Commit c73f71df authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Some changes on how we move jobs from one queue to another + added status...


Some changes on how we move jobs from one queue to another + added status update on db and redis for storage operations.

Signed-off-by: default avatarCristiano Urban <cristiano.urban@inaf.it>
parent cb0591c3
Loading
Loading
Loading
Loading
+5 −1
Original line number Diff line number Diff line
@@ -25,6 +25,8 @@ class RetrieveExecutor(TaskExecutor):
                                  self.params.getint("port"),
                                  self.params["db"])
        self.jobObj = None
        self.jobId = None
        self.nodeList = []
        super(RetrieveExecutor, self).__init__()
        
    def retrieveData(self):
@@ -66,6 +68,7 @@ class RetrieveExecutor(TaskExecutor):
        results = [{"target": ""}]
        results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"]        
        self.dbConn.setResults(self.jobId, results)
        self.jobObj.setPhase("COMPLETED")
        self.dbConn.setPhase(self.jobId, "COMPLETED")
        self.dbConn.disconnect()
        
@@ -87,6 +90,7 @@ class RetrieveExecutor(TaskExecutor):
                    self.updateJobStatus()
                else:
                    sys.exit("Failed to retrieve data!")
                self.srcQueue.moveJobTo(self.destQueue.name())
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")
    
+3 −3
Original line number Diff line number Diff line
@@ -37,6 +37,6 @@ class RetrievePreprocessor(TaskExecutor):
            if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()                
                self.execute()
                self.srcQueue.extractJob()
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")
+8 −3
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ class StoreExecutor(TaskExecutor):
                                  self.params.getint("port"),
                                  self.params["db"])
        self.jobObj = None
        self.jobId = None
        self.username = None
        self.requestType = None
        self.storageId = None
@@ -35,10 +36,11 @@ class StoreExecutor(TaskExecutor):
        super(StoreExecutor, self).__init__()

    def copyData(self):
        self.dbConn.connect()
        self.dbConn.setPhase(self.jobId, "EXECUTING")
        srcPathPrefix = self.storageStorePath.replace("{username}", self.username)
        srcData = os.listdir(srcPathPrefix)
        if self.requestType == "CSTORE":
            self.dbConn.connect()
            destPathPrefix = self.dbConn.getStorageBasePath(self.storageId) + '/' + self.username
            self.dbConn.disconnect()
            #destPathPrefix = self.tapeStorageBasePath.replace("{username}", self.username)
@@ -47,7 +49,6 @@ class StoreExecutor(TaskExecutor):
                self.tapeClient.copy(srcPathPrefix + '/' + el, destPathPrefix + '/' + el)
            self.tapeClient.disconnect()
        else:
            self.dbConn.connect()
            destPathPrefix = self.dbConn.getStorageBasePath(self.storageId) + '/' + self.username
            self.dbConn.disconnect()
            #destPathPrefix = self.serverStorageBasePath.replace("{username}", self.username)
@@ -80,6 +81,8 @@ class StoreExecutor(TaskExecutor):
            self.dbConn.setAsyncTrans(nodeVOSPath, True);
            self.dbConn.setSticky(nodeVOSPath, True);
            self.dbConn.setBusyState(nodeVOSPath, False);
        self.jobObj.setPhase("COMPLETED")
        self.dbConn.setPhase(self.jobId, "COMPLETED")
        self.dbConn.disconnect()

    def run(self):
@@ -90,6 +93,7 @@ class StoreExecutor(TaskExecutor):
            self.wait()
            if self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()
                self.jobId = self.jobObj.jobId
                self.username = self.jobObj.jobInfo["userName"]
                self.requestType = self.jobObj.jobInfo["requestType"]
                self.storageId = self.jobObj.jobInfo["storageId"]
@@ -107,5 +111,6 @@ class StoreExecutor(TaskExecutor):
                self.copyData()
                self.cleanup()
                self.update()
                self.srcQueue.moveJobTo(self.destQueue.name())
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")
+10 −1
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@ class StorePreprocessor(TaskExecutor):
        self.storageStorePath = self.params["store_path"]
        self.storageId = None
        self.jobObj = None
        self.jobId = None
        self.username = None
        self.userId = None
        self.nodeList = []
@@ -202,6 +203,12 @@ class StorePreprocessor(TaskExecutor):
        out.close()
        self.dbConn.disconnect()
        
    def update(self):
        self.jobObj.setPhase("QUEUED")
        self.dbConn.connect()
        self.dbConn.setPhase(self.jobId, "QUEUED")
        self.dbConn.disconnect()

    def run(self):
        print("Starting store preprocessor...")
        self.setSourceQueueName("write_pending")
@@ -210,13 +217,15 @@ class StorePreprocessor(TaskExecutor):
            self.wait()
            if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()
                self.jobId = self.jobObj.jobId
                self.storageId = self.jobObj.jobInfo["storageId"]
                self.username = self.jobObj.jobInfo["userName"]
                self.prepare(self.username)
                self.execute()
                self.update()
                self.jobObj.jobInfo["nodeList"] = self.nodeList
                self.srcQueue.extractJob()
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()                
                print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")

# Test