Loading transfer_service/retrieve_executor.py +54 −65 Original line number Diff line number Diff line Loading @@ -41,16 +41,20 @@ class RetrieveExecutor(TaskExecutor): super(RetrieveExecutor, self).__init__() def buildFileList(self): self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) if os.path.exists("nodeList.txt"): os.remove("nodeList.txt") nl = open("nodeList.txt", 'w') for vospacePath in self.nodeList: nl.write(vospacePath + '\n') nl.close() self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] for vospacePath in self.nodeList: nodeInfo = self.dbConn.getOSPath(vospacePath) baseSrcPath = nodeInfo["baseSrcPath"] srcPath = nodeInfo["fullPath"] username = nodeInfo["username"] md5calc = Checksum() if os.path.isdir(srcPath): for root, dirs, files in os.walk(srcPath, topdown = False): Loading @@ -59,7 +63,9 @@ class RetrieveExecutor(TaskExecutor): if md5calc.fileIsValid(fullPath): fileSize = os.stat(fullPath).st_size fileInfo = { "baseSrcPath": baseSrcPath, "fullPath": fullPath, "username": username, "fileSize": fileSize, "vospaceRootParent": vospacePath } Loading @@ -68,7 +74,9 @@ class RetrieveExecutor(TaskExecutor): if md5calc.fileIsValid(srcPath): fileSize = nodeInfo["contentLength"] fileInfo = { "baseSrcPath": baseSrcPath, "fullPath": srcPath, "username": username, "fileSize": fileSize, "vospaceRootParent": vospacePath } Loading @@ -82,7 +90,6 @@ class RetrieveExecutor(TaskExecutor): fl.close() def buildBlocks(self): if self.fileList: blockIdx = 0 blockSize = 0 Loading Loading @@ -135,67 +142,48 @@ class RetrieveExecutor(TaskExecutor): fl = open("blocks.txt", 'w') fl.write(json.dumps(self.fileList, indent = 4)) fl.close() #blockList = [ block0 = [file0 = [], ..., fileJ = [] ], # ..., # blockN = [file0 = [], ..., fileK = [] ] # ] def prepareData(self): fileList = [] self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) for vospacePath in self.nodeList: #[srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) fileInfo = self.dbConn.getOSPath(vospacePath) srcPath = fileInfo["fullPath"] storageType = fileInfo["storageType"] username = fileInfo["username"] osRelPath = fileInfo["osPath"] fileSize = fileInfo["contentLength"] if storageType == "cold": if os.path.isdir(srcPath): for root, dirs, files in os.walk(srcPath): for f in files: fileList.append(os.path.join(root, f)) else: fileList.append(srcPath) if fileList: def retrieveCompleted(self, vospacePath): return not any(vospacePath in f for f in self.fileList) def retrieveData(self): for blockIdx in range(self.numBlocks): blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ] if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.recall(fileList) self.tapeClient.recall([ f["fullPath"] for f in blockFileList ]) self.tapeClient.disconnect() def retrieveData(self): for vospacePath in self.nodeList: #[srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) fileInfo = self.dbConn.getOSPath(vospacePath) for fileInfo in blockFileList: srcPath = fileInfo["fullPath"] storageType = fileInfo["storageType"] username = fileInfo["username"] osRelPath = fileInfo["osPath"] fileSize = fileInfo["contentLength"] osRelParentPath = os.path.dirname(osRelPath) baseSrcPath = fileInfo["baseSrcPath"] osRelParentPath = os.path.dirname(srcPath) osRelParentPath = osRelParentPath.replace(baseSrcPath, "") if osRelParentPath != "/": osRelParentPath += "/" out = open("retrieve_executor_log.txt", "a") out.write(f"srcPath: {srcPath}\n") out.write(f"osRelPath: {osRelPath}\n") out.write(f"osRelParentPath: {osRelParentPath}\n") destPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath out.write(f"destPath: {destPath}\n\n") os.makedirs(destPath, exist_ok = True) sp = subprocess.run(["rsync", "-av", srcPath, destPath], capture_output = True) out.write(f"rsync stdout: {sp.stdout}") if(sp.returncode or sp.stderr): out.write(f"rsync stderr: {sp.stderr}") self.dbConn.disconnect() return False else: self.updateAsyncTrans(vospacePath) return True def updateAsyncTrans(self, vospacePath): self.dbConn.setAsyncTrans(vospacePath, False); for fileInfo in blockFileList: if fileInfo in self.fileList: self.fileList.remove(fileInfo) for vospacePath in self.nodeList: if self.retrieveCompleted(vospacePath): self.dbConn.setAsyncTrans(vospacePath, False) if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ]) self.tapeClient.disconnect() blockFileList.clear() return True def updateJobStatus(self): results = [{"target": ""}] Loading @@ -212,6 +200,8 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.deleteTmpDataNode(vospacePath) self.fileList.clear() self.nodeList.clear() self.storageType = None self.numBlocks = None def run(self): print("Starting retrieve executor...") Loading @@ -225,7 +215,6 @@ class RetrieveExecutor(TaskExecutor): self.nodeList = self.jobObj.jobInfo["nodeList"] self.buildFileList() self.buildBlocks() self.prepareData() result = self.retrieveData() if result: self.updateJobStatus() Loading Loading
transfer_service/retrieve_executor.py +54 −65 Original line number Diff line number Diff line Loading @@ -41,16 +41,20 @@ class RetrieveExecutor(TaskExecutor): super(RetrieveExecutor, self).__init__() def buildFileList(self): self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) if os.path.exists("nodeList.txt"): os.remove("nodeList.txt") nl = open("nodeList.txt", 'w') for vospacePath in self.nodeList: nl.write(vospacePath + '\n') nl.close() self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] for vospacePath in self.nodeList: nodeInfo = self.dbConn.getOSPath(vospacePath) baseSrcPath = nodeInfo["baseSrcPath"] srcPath = nodeInfo["fullPath"] username = nodeInfo["username"] md5calc = Checksum() if os.path.isdir(srcPath): for root, dirs, files in os.walk(srcPath, topdown = False): Loading @@ -59,7 +63,9 @@ class RetrieveExecutor(TaskExecutor): if md5calc.fileIsValid(fullPath): fileSize = os.stat(fullPath).st_size fileInfo = { "baseSrcPath": baseSrcPath, "fullPath": fullPath, "username": username, "fileSize": fileSize, "vospaceRootParent": vospacePath } Loading @@ -68,7 +74,9 @@ class RetrieveExecutor(TaskExecutor): if md5calc.fileIsValid(srcPath): fileSize = nodeInfo["contentLength"] fileInfo = { "baseSrcPath": baseSrcPath, "fullPath": srcPath, "username": username, "fileSize": fileSize, "vospaceRootParent": vospacePath } Loading @@ -82,7 +90,6 @@ class RetrieveExecutor(TaskExecutor): fl.close() def buildBlocks(self): if self.fileList: blockIdx = 0 blockSize = 0 Loading Loading @@ -135,67 +142,48 @@ class RetrieveExecutor(TaskExecutor): fl = open("blocks.txt", 'w') fl.write(json.dumps(self.fileList, indent = 4)) fl.close() #blockList = [ block0 = [file0 = [], ..., fileJ = [] ], # ..., # blockN = [file0 = [], ..., fileK = [] ] # ] def prepareData(self): fileList = [] self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) for vospacePath in self.nodeList: #[srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) fileInfo = self.dbConn.getOSPath(vospacePath) srcPath = fileInfo["fullPath"] storageType = fileInfo["storageType"] username = fileInfo["username"] osRelPath = fileInfo["osPath"] fileSize = fileInfo["contentLength"] if storageType == "cold": if os.path.isdir(srcPath): for root, dirs, files in os.walk(srcPath): for f in files: fileList.append(os.path.join(root, f)) else: fileList.append(srcPath) if fileList: def retrieveCompleted(self, vospacePath): return not any(vospacePath in f for f in self.fileList) def retrieveData(self): for blockIdx in range(self.numBlocks): blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ] if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.recall(fileList) self.tapeClient.recall([ f["fullPath"] for f in blockFileList ]) self.tapeClient.disconnect() def retrieveData(self): for vospacePath in self.nodeList: #[srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) fileInfo = self.dbConn.getOSPath(vospacePath) for fileInfo in blockFileList: srcPath = fileInfo["fullPath"] storageType = fileInfo["storageType"] username = fileInfo["username"] osRelPath = fileInfo["osPath"] fileSize = fileInfo["contentLength"] osRelParentPath = os.path.dirname(osRelPath) baseSrcPath = fileInfo["baseSrcPath"] osRelParentPath = os.path.dirname(srcPath) osRelParentPath = osRelParentPath.replace(baseSrcPath, "") if osRelParentPath != "/": osRelParentPath += "/" out = open("retrieve_executor_log.txt", "a") out.write(f"srcPath: {srcPath}\n") out.write(f"osRelPath: {osRelPath}\n") out.write(f"osRelParentPath: {osRelParentPath}\n") destPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath out.write(f"destPath: {destPath}\n\n") os.makedirs(destPath, exist_ok = True) sp = subprocess.run(["rsync", "-av", srcPath, destPath], capture_output = True) out.write(f"rsync stdout: {sp.stdout}") if(sp.returncode or sp.stderr): out.write(f"rsync stderr: {sp.stderr}") self.dbConn.disconnect() return False else: self.updateAsyncTrans(vospacePath) return True def updateAsyncTrans(self, vospacePath): self.dbConn.setAsyncTrans(vospacePath, False); for fileInfo in blockFileList: if fileInfo in self.fileList: self.fileList.remove(fileInfo) for vospacePath in self.nodeList: if self.retrieveCompleted(vospacePath): self.dbConn.setAsyncTrans(vospacePath, False) if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ]) self.tapeClient.disconnect() blockFileList.clear() return True def updateJobStatus(self): results = [{"target": ""}] Loading @@ -212,6 +200,8 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.deleteTmpDataNode(vospacePath) self.fileList.clear() self.nodeList.clear() self.storageType = None self.numBlocks = None def run(self): print("Starting retrieve executor...") Loading @@ -225,7 +215,6 @@ class RetrieveExecutor(TaskExecutor): self.nodeList = self.jobObj.jobInfo["nodeList"] self.buildFileList() self.buildBlocks() self.prepareData() result = self.retrieveData() if result: self.updateJobStatus() Loading