Loading transfer_service/db_connector.py +10 −2 Original line number Diff line number Diff line Loading @@ -1084,14 +1084,22 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" """ Sets the 'async_trans' flag recursively for a VOSpace node. Local nodes are not considered. """ try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c SET async_trans = %s FROM node n WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s) AND c.location_id NOT IN (SELECT location_id FROM location WHERE storage_src_id = storage_dest_id); """, (value, nodeVOSPath,)) conn.commit() Loading transfer_service/retrieve_cleaner.py +30 −28 Original line number Diff line number Diff line Loading @@ -10,6 +10,7 @@ import json import logging import os import shutil import time from config import Config from db_connector import DbConnector Loading Loading @@ -87,21 +88,22 @@ class RetrieveCleaner(TaskExecutor): numNodes = len(self.nodeList) self.logger.info(f"Number of VOSpace nodes involved: {numNodes}") while numNodes > 0: i = 0 while i < numNodes: vospacePath = self.nodeList[i] destPath = self.destPathList[i] time.sleep(0.2) vospacePath = self.nodeList[numNodes - 1] destPath = self.destPathList[numNodes - 1] try: busy = self.dbConn.nodeIsBusy(vospacePath) except Exception: self.logger.exception(f"FATAL: unable to check the 'busy' flag value for the VOSpace node '{vospacePath}'.") return False else: if not busy: try: self.dbConn.setAsyncTrans(vospacePath, True) except Exception: self.logger.exception(f"FATAL: unable to update the 'async_trans' flag for the VOSpace node '{vospacePath}'.") return False else: try: if os.path.isfile(destPath): os.remove(destPath) Loading @@ -109,6 +111,7 @@ class RetrieveCleaner(TaskExecutor): shutil.rmtree(destPath) except FileNotFoundError: self.logger.exception(f"Cannot find '{destPath}', skip...") # check for empty dirs and remove them basePath = self.storageRetrievePath.replace("{username}", self.username) for root, dirs, files in os.walk(basePath, topdown = False): Loading @@ -116,10 +119,9 @@ class RetrieveCleaner(TaskExecutor): dirPath = os.path.abspath(root) + '/' + dir if not os.listdir(dirPath): os.rmdir(dirPath) self.nodeList.pop(i) self.destPathList.pop(i) self.nodeList.pop(numNodes - 1) self.destPathList.pop(numNodes - 1) numNodes -= 1 i = 0 except Exception: self.logger.exception("FATAL: something went wrong while cleaning the expired data.") return False Loading Loading
transfer_service/db_connector.py +10 −2 Original line number Diff line number Diff line Loading @@ -1084,14 +1084,22 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" """ Sets the 'async_trans' flag recursively for a VOSpace node. Local nodes are not considered. """ try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c SET async_trans = %s FROM node n WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s) AND c.location_id NOT IN (SELECT location_id FROM location WHERE storage_src_id = storage_dest_id); """, (value, nodeVOSPath,)) conn.commit() Loading
transfer_service/retrieve_cleaner.py +30 −28 Original line number Diff line number Diff line Loading @@ -10,6 +10,7 @@ import json import logging import os import shutil import time from config import Config from db_connector import DbConnector Loading Loading @@ -87,21 +88,22 @@ class RetrieveCleaner(TaskExecutor): numNodes = len(self.nodeList) self.logger.info(f"Number of VOSpace nodes involved: {numNodes}") while numNodes > 0: i = 0 while i < numNodes: vospacePath = self.nodeList[i] destPath = self.destPathList[i] time.sleep(0.2) vospacePath = self.nodeList[numNodes - 1] destPath = self.destPathList[numNodes - 1] try: busy = self.dbConn.nodeIsBusy(vospacePath) except Exception: self.logger.exception(f"FATAL: unable to check the 'busy' flag value for the VOSpace node '{vospacePath}'.") return False else: if not busy: try: self.dbConn.setAsyncTrans(vospacePath, True) except Exception: self.logger.exception(f"FATAL: unable to update the 'async_trans' flag for the VOSpace node '{vospacePath}'.") return False else: try: if os.path.isfile(destPath): os.remove(destPath) Loading @@ -109,6 +111,7 @@ class RetrieveCleaner(TaskExecutor): shutil.rmtree(destPath) except FileNotFoundError: self.logger.exception(f"Cannot find '{destPath}', skip...") # check for empty dirs and remove them basePath = self.storageRetrievePath.replace("{username}", self.username) for root, dirs, files in os.walk(basePath, topdown = False): Loading @@ -116,10 +119,9 @@ class RetrieveCleaner(TaskExecutor): dirPath = os.path.abspath(root) + '/' + dir if not os.listdir(dirPath): os.rmdir(dirPath) self.nodeList.pop(i) self.destPathList.pop(i) self.nodeList.pop(numNodes - 1) self.destPathList.pop(numNodes - 1) numNodes -= 1 i = 0 except Exception: self.logger.exception("FATAL: something went wrong while cleaning the expired data.") return False Loading