Loading transfer_service/retrieve_executor.py +18 −6 Original line number Diff line number Diff line Loading @@ -79,6 +79,7 @@ class RetrieveExecutor(TaskExecutor): self.jobId = None self.nodeList = [] self.fileList = [] self.destPathList = [] self.numBlocks = 0 self.procBlocks = 0 self.totalSize = 0 Loading Loading @@ -222,9 +223,9 @@ class RetrieveExecutor(TaskExecutor): osRelParentPath = osRelParentPath.replace(baseSrcPath, "") if osRelParentPath != "/": osRelParentPath += "/" destPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath os.makedirs(destPath, exist_ok = True) sp = subprocess.run(["rsync", "-av", srcPath, destPath], capture_output = True) destDirPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath os.makedirs(destDirPath, exist_ok = True) sp = subprocess.run(["rsync", "-av", srcPath, destDirPath], capture_output = True) if(sp.returncode or sp.stderr): return False Loading Loading @@ -263,6 +264,17 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) # Add a list of physical destination paths for each VOSpace node in the node list for vospacePath in self.nodeList: nodeInfo = self.dbConn.getOSPath(vospacePath) baseSrcPath = nodeInfo["baseSrcPath"] username = nodeInfo["username"] srcPath = nodeInfo["fullPath"] baseDestPath = self.storageRetrievePath.replace("{username}", username) destPath = srcPath.replace(baseSrcPath, baseDestPath) self.destPathList.append(destPath) self.jobObj.jobInfo["destPathList"] = self.destPathList.copy() # Send e-mail notification m = Mailer() Loading Loading @@ -290,6 +302,7 @@ class RetrieveExecutor(TaskExecutor): """ self.fileList.clear() self.nodeList.clear() self.destPathList.clear() self.storageType = None self.numBlocks = 0 self.procBlocks = 0 Loading @@ -304,7 +317,7 @@ class RetrieveExecutor(TaskExecutor): if self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.jobInfo["nodeList"] self.nodeList = self.jobObj.jobInfo["nodeList"].copy() self.buildFileList() self.buildBlocks() result = self.retrieveData() Loading @@ -321,5 +334,4 @@ class RetrieveExecutor(TaskExecutor): self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") Loading
transfer_service/retrieve_executor.py +18 −6 Original line number Diff line number Diff line Loading @@ -79,6 +79,7 @@ class RetrieveExecutor(TaskExecutor): self.jobId = None self.nodeList = [] self.fileList = [] self.destPathList = [] self.numBlocks = 0 self.procBlocks = 0 self.totalSize = 0 Loading Loading @@ -222,9 +223,9 @@ class RetrieveExecutor(TaskExecutor): osRelParentPath = osRelParentPath.replace(baseSrcPath, "") if osRelParentPath != "/": osRelParentPath += "/" destPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath os.makedirs(destPath, exist_ok = True) sp = subprocess.run(["rsync", "-av", srcPath, destPath], capture_output = True) destDirPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath os.makedirs(destDirPath, exist_ok = True) sp = subprocess.run(["rsync", "-av", srcPath, destDirPath], capture_output = True) if(sp.returncode or sp.stderr): return False Loading Loading @@ -263,6 +264,17 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) # Add a list of physical destination paths for each VOSpace node in the node list for vospacePath in self.nodeList: nodeInfo = self.dbConn.getOSPath(vospacePath) baseSrcPath = nodeInfo["baseSrcPath"] username = nodeInfo["username"] srcPath = nodeInfo["fullPath"] baseDestPath = self.storageRetrievePath.replace("{username}", username) destPath = srcPath.replace(baseSrcPath, baseDestPath) self.destPathList.append(destPath) self.jobObj.jobInfo["destPathList"] = self.destPathList.copy() # Send e-mail notification m = Mailer() Loading Loading @@ -290,6 +302,7 @@ class RetrieveExecutor(TaskExecutor): """ self.fileList.clear() self.nodeList.clear() self.destPathList.clear() self.storageType = None self.numBlocks = 0 self.procBlocks = 0 Loading @@ -304,7 +317,7 @@ class RetrieveExecutor(TaskExecutor): if self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.jobInfo["nodeList"] self.nodeList = self.jobObj.jobInfo["nodeList"].copy() self.buildFileList() self.buildBlocks() result = self.retrieveData() Loading @@ -321,5 +334,4 @@ class RetrieveExecutor(TaskExecutor): self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")