Loading transfer_service/retrieve_cleaner.py +59 −19 Original line number Diff line number Diff line Loading @@ -45,7 +45,7 @@ class RetrieveCleaner(TaskExecutor): self.destPathList = [] super(RetrieveCleaner, self).__init__() def execute(self): def dataHasExpired(self): # Avoid "all zero" condition if self.days <= 0 and self.seconds < 30: self.days = 0 Loading @@ -58,20 +58,43 @@ class RetrieveCleaner(TaskExecutor): currentTime = datetime.datetime.now() delta = currentTime - jobEndTime if delta.days >= self.days and delta.seconds > self.seconds: return True else: return False def execute(self): # while dim lists > 0: # loop over the two lists (nodeList and destPathList): # if the vospace node is not busy: # set 'async_trans' = True # delete the file/dir in the 'retrieve' directory # remove the corresponding elements on the two lists try: self.logger.info("++++++++++ Start of execution phase ++++++++++") try: nodeInfo = self.dbConn.getOSPath(self.nodeList[0]) self.username = nodeInfo["username"] except Exception: self.logger.exception("FATAL: unable to obtain info about VOSpace nodes.") return False numNodes = len(self.nodeList) self.logger.info(f"Number of VOSpace nodes involved: {numNodes}") while numNodes > 0: i = 0 while i < numNodes: vospacePath = self.nodeList[i] destPath = self.destPathList[i] if not self.dbConn.nodeIsBusy(vospacePath): try: busy = self.dbConn.nodeIsBusy(vospacePath) except Exception: self.logger.exception(f"FATAL: unable to check the 'busy' flag value for the VOSpace node '{vospacePath}'.") return False if not busy: try: self.dbConn.setAsyncTrans(vospacePath, True) except Exception: self.logger.exception(f"FATAL: unable to update the 'async_trans' flag for the VOSpace node '{vospacePath}'.") return False if os.path.isfile(destPath): os.remove(destPath) else: Loading @@ -86,9 +109,12 @@ class RetrieveCleaner(TaskExecutor): self.destPathList.pop(i) numNodes -= 1 i = 0 self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") except Exception: self.logger.exception("FATAL: something went wrong while cleaning the expired data.") return False else: self.logger.info("++++++++++ End of execution phase ++++++++++") return True def run(self): self.logger.info("Starting retrieve cleaner...") Loading @@ -96,10 +122,24 @@ class RetrieveCleaner(TaskExecutor): self.setDestinationQueueName("read_clean") while True: self.wait() if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: try: srcQueueLen = self.srcQueue.len() destQueueLen = self.destQueue.len() except Exception: self.logger.exception("Cache error: failed to retrieve queue length.") else: if destQueueLen < self.maxReadyJobs and srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() self.nodeList = self.jobObj.nodeList.copy() self.destPathList = self.jobObj.jobInfo["destPathList"].copy() nodeInfo = self.dbConn.getOSPath(self.nodeList[0]) self.username = nodeInfo["username"] self.execute() if self.dataHasExpired(): if self.execute(): try: if destQueueLen >= self.maxCleanJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") Loading
transfer_service/retrieve_cleaner.py +59 −19 Original line number Diff line number Diff line Loading @@ -45,7 +45,7 @@ class RetrieveCleaner(TaskExecutor): self.destPathList = [] super(RetrieveCleaner, self).__init__() def execute(self): def dataHasExpired(self): # Avoid "all zero" condition if self.days <= 0 and self.seconds < 30: self.days = 0 Loading @@ -58,20 +58,43 @@ class RetrieveCleaner(TaskExecutor): currentTime = datetime.datetime.now() delta = currentTime - jobEndTime if delta.days >= self.days and delta.seconds > self.seconds: return True else: return False def execute(self): # while dim lists > 0: # loop over the two lists (nodeList and destPathList): # if the vospace node is not busy: # set 'async_trans' = True # delete the file/dir in the 'retrieve' directory # remove the corresponding elements on the two lists try: self.logger.info("++++++++++ Start of execution phase ++++++++++") try: nodeInfo = self.dbConn.getOSPath(self.nodeList[0]) self.username = nodeInfo["username"] except Exception: self.logger.exception("FATAL: unable to obtain info about VOSpace nodes.") return False numNodes = len(self.nodeList) self.logger.info(f"Number of VOSpace nodes involved: {numNodes}") while numNodes > 0: i = 0 while i < numNodes: vospacePath = self.nodeList[i] destPath = self.destPathList[i] if not self.dbConn.nodeIsBusy(vospacePath): try: busy = self.dbConn.nodeIsBusy(vospacePath) except Exception: self.logger.exception(f"FATAL: unable to check the 'busy' flag value for the VOSpace node '{vospacePath}'.") return False if not busy: try: self.dbConn.setAsyncTrans(vospacePath, True) except Exception: self.logger.exception(f"FATAL: unable to update the 'async_trans' flag for the VOSpace node '{vospacePath}'.") return False if os.path.isfile(destPath): os.remove(destPath) else: Loading @@ -86,9 +109,12 @@ class RetrieveCleaner(TaskExecutor): self.destPathList.pop(i) numNodes -= 1 i = 0 self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") except Exception: self.logger.exception("FATAL: something went wrong while cleaning the expired data.") return False else: self.logger.info("++++++++++ End of execution phase ++++++++++") return True def run(self): self.logger.info("Starting retrieve cleaner...") Loading @@ -96,10 +122,24 @@ class RetrieveCleaner(TaskExecutor): self.setDestinationQueueName("read_clean") while True: self.wait() if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: try: srcQueueLen = self.srcQueue.len() destQueueLen = self.destQueue.len() except Exception: self.logger.exception("Cache error: failed to retrieve queue length.") else: if destQueueLen < self.maxReadyJobs and srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() self.nodeList = self.jobObj.nodeList.copy() self.destPathList = self.jobObj.jobInfo["destPathList"].copy() nodeInfo = self.dbConn.getOSPath(self.nodeList[0]) self.username = nodeInfo["username"] self.execute() if self.dataHasExpired(): if self.execute(): try: if destQueueLen >= self.maxCleanJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")