Commit 27568ba4 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Modified 'retrieveData()' method + added 'updateAsyncTrans()' and 'updateJobStatus()'.

parent ca33db07
Loading
Loading
Loading
Loading
+20 −21
Original line number Diff line number Diff line
@@ -17,10 +17,6 @@ class RetrieveExecutor(TaskExecutor):
                                     self.params["password"])
        self.params = config.loadSection("transfer_node")
        self.storageRetrievePath = self.params["retrieve_path"]
        self.params = config.loadSection("tape_library")
        self.tapeStorageBasePath = self.params["base_path"]
        self.params = config.loadSection("ia2_server")
        self.serverStorageBasePath = self.params["base_path"]
        self.params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(self.params["user"],
                                  self.params["password"],
@@ -32,29 +28,33 @@ class RetrieveExecutor(TaskExecutor):
        
    def retrieveData(self):
        self.dbConn.connect()
        self.username = self.dbConn.getUserName()
        destPathPrefix = self.storageRetrievePath.replace("{username}", self.username)
        srcData = os.listdir(srcPathPrefix)
        if self.requestType == "CSTORE": # TO BE CHANGED!!!
            srcPathPrefix = self.tapeStorageBasePath.replace("{username}", self.username)
        self.dbConn.setPhase(self.jobId, "EXECUTING")
        for vospacePath in self.nodeList:
            [srcPath, storageType, username] = self.dbConn.getOSPath(vospacePath)
            destPath = self.storageRetrievePath.replace("{username}", username)
            if storageType == "cold":
                #srcPathPrefix = self.tapeStorageBasePath.replace("{username}", self.username)
                # TO BE DONE 
                pass
            else:
            srcPathPrefix = self.serverStorageBasePath.replace("{username}", self.username)
            for vospacePath in self.nodeList:
                osRelPath = self.dbConn.getOSPath(vospacePath)
                sp = subprocess.run(["rsync", "-av", srcPathPrefix + osRelPath, destPathPrefix + '/'], capture_output = True)
                sp = subprocess.run(["rsync", "-av", srcPath, destPath + "/"], capture_output = True)
                if(sp.returncode or sp.stderr):
                    self.dbConn.disconnect()
                    return False
                else:
                    self.updateAsyncTrans(vospacePath)
        self.dbConn.disconnect()    
        return True

    def updateDb(self):
    def updateAsyncTrans(self, vospacePath):
        self.dbConn.setAsyncTrans(vospacePath, False);

    def updateJobStatus(self):
        self.dbConn.connect()
        self.dbConn.setPhase(self.jobId, "COMPLETED")
        for vospacePath in self.nodeList:
            osRelPath = self.dbConn.getOSPath(vospacePath)            
            self.dbConn.setAsyncTrans(osRelPath, False);
        results = [{"target": ""}]
        results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"]        
        self.dbConn.setResults(self.jobId, results)
        self.dbConn.disconnect()
        
    def cleanup(self):
@@ -69,11 +69,10 @@ class RetrieveExecutor(TaskExecutor):
            if self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()
                self.jobId = self.jobObj.jobId                
                self.requestType = self.jobObj.jobInfo["requestType"]
                self.nodeList = self.jobObj.jobInfo["nodeList"]
                result = self.retrieveData()
                if result:
                    self.updateDb()
                    self.updateJobStatus()
                else:
                    sys.exit("Failed to retrieve data!")
                self.srcQueue.moveJobTo(self.destQueue.name())