Commit c957a5b1 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Fixed 'read_clean' queue saturation + added 'cleanup()' method.

parent 9bf3e315
Loading
Loading
Loading
Loading
Loading
+6 −1
Original line number Diff line number Diff line
@@ -127,6 +127,10 @@ class RetrieveCleaner(TaskExecutor):
            self.logger.info("++++++++++ End of execution phase ++++++++++")
            return True

    def cleanup(self):
        self.nodeList.clear()
        self.destPathList.clear()
        
    def run(self):
        self.logger.info("Starting retrieve cleaner...")
        self.setSourceQueueName("read_terminated")
@@ -139,7 +143,7 @@ class RetrieveCleaner(TaskExecutor):
            except Exception:
                self.logger.exception("Cache error: failed to retrieve queue length.")
            else:
                if destQueueLen < self.maxCleanJobs and srcQueueLen > 0:
                if srcQueueLen > 0:
                    self.jobObj = self.srcQueue.getJob()
                    self.nodeList = self.jobObj.nodeList.copy()
                    self.destPathList = self.jobObj.jobInfo["destPathList"].copy()
@@ -154,3 +158,4 @@ class RetrieveCleaner(TaskExecutor):
                                self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
                            else:
                                self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
                    self.cleanup()