Loading transfer_service/retrieve_executor.py +284 −184 Original line number Diff line number Diff line Loading @@ -87,22 +87,39 @@ class RetrieveExecutor(TaskExecutor): def buildFileList(self): """ Generates the list of all files to retrieve. This method returns 'True' on success, 'False' on failure. """ try: try: self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False else: self.logger.info("Job phase updated to EXECUTING.") self.logger.info("Building the list of the files to be retrieved...") # debug block... if os.path.exists("nodeList.txt"): os.remove("nodeList.txt") nl = open("nodeList.txt", 'w') for vospacePath in self.nodeList: nl.write(vospacePath + '\n') nl.close() #if os.path.exists("nodeList.txt"): # os.remove("nodeList.txt") #nl = open("nodeList.txt", 'w') #for vospacePath in self.nodeList: # nl.write(vospacePath + '\n') #nl.close() # Obtain the storage type try: self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] except Exception: self.logger.exception("FATAL: unable to obtain the storage type.") return False for vospacePath in self.nodeList: try: nodeInfo = self.dbConn.getOSPath(vospacePath) except Exception: self.logger.exception(f"FATAL: unable to obtain the OS path for the VOSpace path '{vospacePath}'.") return False baseSrcPath = nodeInfo["baseSrcPath"] srcPath = nodeInfo["fullPath"] username = nodeInfo["username"] Loading Loading @@ -134,16 +151,24 @@ class RetrieveExecutor(TaskExecutor): self.fileList.append(fileInfo.copy()) # debug block... if os.path.exists("fileList.txt"): os.remove("fileList.txt") fl = open("fileList.txt", 'w') fl.write(json.dumps(self.fileList, indent = 4)) fl.close() #if os.path.exists("fileList.txt"): # os.remove("fileList.txt") #fl = open("fileList.txt", 'w') #fl.write(json.dumps(self.fileList, indent = 4)) #fl.close() except Exception: self.logger.exception("FATAL: something went wrong while building the list of the files to be retrieved.") return False else: return True def buildBlocks(self): """ Algorithm to split data in blocks of a well known size. This method returns 'True' on success, 'False' on failure. """ try: self.logger.info("Building the blocks data structure... ") if self.fileList: blockIdx = 0 blockSize = 0 Loading Loading @@ -180,15 +205,24 @@ class RetrieveExecutor(TaskExecutor): blockSize = fileSize if self.fileList: self.numBlocks = blockIdx + 1 try: self.dbConn.setTotalBlocks(self.jobId, self.numBlocks) except Exception: self.logger.exception("FATAL: unable to set the total number of blocks in the database.") return False # debug block... print(f"numBlocks = {self.numBlocks}") if os.path.exists("blocks.txt"): os.remove("blocks.txt") fl = open("blocks.txt", 'w') fl.write(json.dumps(self.fileList, indent = 4)) fl.close() #print(f"numBlocks = {self.numBlocks}") #if os.path.exists("blocks.txt"): # os.remove("blocks.txt") #fl = open("blocks.txt", 'w') #fl.write(json.dumps(self.fileList, indent = 4)) #fl.close() except Exception: self.logger.exception("FATAL: something went wrong while building the blocks data structure.") return False else: return True def retrieveCompleted(self, vospacePath): """ Loading @@ -201,7 +235,8 @@ class RetrieveExecutor(TaskExecutor): """ Retrieves data from a generic storage point (hot or cold). """ try: self.logger.info("Starting data retrieval...") # Loop on blocks for blockIdx in range(self.numBlocks): blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ] Loading @@ -226,6 +261,7 @@ class RetrieveExecutor(TaskExecutor): os.makedirs(destDirPath, exist_ok = True) sp = subprocess.run(["rsync", "-av", srcPath, destDirPath], capture_output = True) if(sp.returncode or sp.stderr): self.logger.error(f"FATAL: error during the copy process, returnCode = {sp.returncode}, stderr: {sp.stderr}") return False # Remove files from file list at the end of the copy Loading @@ -238,7 +274,11 @@ class RetrieveExecutor(TaskExecutor): # flag for vospacePath in self.nodeList: if self.retrieveCompleted(vospacePath): try: self.dbConn.setAsyncTrans(vospacePath, False) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False # Empty the tape library frontend if the storage type # is 'cold' Loading @@ -250,22 +290,48 @@ class RetrieveExecutor(TaskExecutor): blockFileList.clear() self.procBlocks += 1 self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) except Exception: self.logger.exception("FATAL: something went wrong while retrieving the data.") return False else: return True def update(self): def execute(self): success = True self.logger.info("++++++++++ Start of execution phase ++++++++++") success &= self.buildFileList() & self.buildBlocks() & self.retrieveData() if success: self.logger.info("++++++++++ End of execution phase ++++++++++") return True else: self.logger.info("FATAL: something went wrong during the execution phase.") return False def update(self, status): """ Updates the job status and sends an email to the user. """ try: results = [{"target": ""}] results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] #self.dbConn.setResults(self.jobId, results) m = Mailer(self.logger) m.addRecipient(self.adminEmail) userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) if userEmail != self.adminEmail: m.addRecipient(userEmail) self.jobObj.setResults(results) if status == ("OK"): self.jobObj.setPhase("COMPLETED") self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.jobObj.endTime = datetime.datetime.now().isoformat() self.logger.info("Job phase updated to COMPLETED.") # Add a list of physical destination paths for each VOSpace node in the node list self.logger.info("Generating physical destination paths for VOSpace nodes...") for vospacePath in self.nodeList: nodeInfo = self.dbConn.getOSPath(vospacePath) baseSrcPath = nodeInfo["baseSrcPath"] Loading @@ -276,13 +342,9 @@ class RetrieveExecutor(TaskExecutor): self.destPathList.append(destPath) self.jobObj.jobInfo["destPathList"] = self.destPathList.copy() m = Mailer(self.logger) m.addRecipient(self.adminEmail) userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) if userEmail != self.adminEmail: m.addRecipient(userEmail) msg = f""" ########## VOSpace data retrieval procedure summary ########## Dear user, your job has been COMPLETED. Loading @@ -293,15 +355,46 @@ class RetrieveExecutor(TaskExecutor): Your files are available and can be downloaded. """ m.setMessage("VOSpace data retrieve notification: COMPLETED", msg) else: self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage("FATAL: something went wrong during the execution phase.") self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.jobObj.endTime = datetime.datetime.now().isoformat() self.logger.info("Job phase updated to ERROR.") msg = f""" ########## VOSpace data retrieval procedure summary ########## Dear user, your job has FAILED. Job ID: {self.jobId} Job type: {self.jobObj.type} Owner ID: {self.jobObj.ownerId} """ info = f""" ERROR: the job was terminated due to an error that occurred while retrieveing the data from the storage point. This issue will be automatically reported to the administrator. """ msg += info m.setMessage("VOSpace data retrieve notification: ERROR", msg) # Send e-mail notification m.setMessage("VOSpace data retrieve notification: Job COMPLETED", msg) m.send() except Exception: self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") def cleanup(self): """ Cleanup method. """ self.logger.info("Cleanup...") self.fileList.clear() self.nodeList.clear() self.destPathList.clear() Loading @@ -316,24 +409,31 @@ class RetrieveExecutor(TaskExecutor): self.setDestinationQueueName("read_terminated") while True: self.wait() if self.srcQueue.len() > 0: try: srcQueueLen = self.srcQueue.len() destQueueLen = self.destQueue.len() except Exception: self.logger.exception("Cache error: failed to retrieve queue length.") else: if srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.nodeList.copy() self.buildFileList() self.buildBlocks() result = self.retrieveData() if result: self.update() self.cleanup() if self.execute(): self.update("OK") # debug block... print(f"fileList = {self.fileList}") print(f"nodeList = {self.nodeList}") #print(f"fileList = {self.fileList}") #print(f"nodeList = {self.nodeList}") else: sys.exit("Failed to retrieve data!") if self.destQueue.len() == self.maxTerminatedJobs: self.update("ERROR") self.cleanup() try: if destQueueLen >= self.maxTerminatedJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") Loading
transfer_service/retrieve_executor.py +284 −184 Original line number Diff line number Diff line Loading @@ -87,22 +87,39 @@ class RetrieveExecutor(TaskExecutor): def buildFileList(self): """ Generates the list of all files to retrieve. This method returns 'True' on success, 'False' on failure. """ try: try: self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False else: self.logger.info("Job phase updated to EXECUTING.") self.logger.info("Building the list of the files to be retrieved...") # debug block... if os.path.exists("nodeList.txt"): os.remove("nodeList.txt") nl = open("nodeList.txt", 'w') for vospacePath in self.nodeList: nl.write(vospacePath + '\n') nl.close() #if os.path.exists("nodeList.txt"): # os.remove("nodeList.txt") #nl = open("nodeList.txt", 'w') #for vospacePath in self.nodeList: # nl.write(vospacePath + '\n') #nl.close() # Obtain the storage type try: self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] except Exception: self.logger.exception("FATAL: unable to obtain the storage type.") return False for vospacePath in self.nodeList: try: nodeInfo = self.dbConn.getOSPath(vospacePath) except Exception: self.logger.exception(f"FATAL: unable to obtain the OS path for the VOSpace path '{vospacePath}'.") return False baseSrcPath = nodeInfo["baseSrcPath"] srcPath = nodeInfo["fullPath"] username = nodeInfo["username"] Loading Loading @@ -134,16 +151,24 @@ class RetrieveExecutor(TaskExecutor): self.fileList.append(fileInfo.copy()) # debug block... if os.path.exists("fileList.txt"): os.remove("fileList.txt") fl = open("fileList.txt", 'w') fl.write(json.dumps(self.fileList, indent = 4)) fl.close() #if os.path.exists("fileList.txt"): # os.remove("fileList.txt") #fl = open("fileList.txt", 'w') #fl.write(json.dumps(self.fileList, indent = 4)) #fl.close() except Exception: self.logger.exception("FATAL: something went wrong while building the list of the files to be retrieved.") return False else: return True def buildBlocks(self): """ Algorithm to split data in blocks of a well known size. This method returns 'True' on success, 'False' on failure. """ try: self.logger.info("Building the blocks data structure... ") if self.fileList: blockIdx = 0 blockSize = 0 Loading Loading @@ -180,15 +205,24 @@ class RetrieveExecutor(TaskExecutor): blockSize = fileSize if self.fileList: self.numBlocks = blockIdx + 1 try: self.dbConn.setTotalBlocks(self.jobId, self.numBlocks) except Exception: self.logger.exception("FATAL: unable to set the total number of blocks in the database.") return False # debug block... print(f"numBlocks = {self.numBlocks}") if os.path.exists("blocks.txt"): os.remove("blocks.txt") fl = open("blocks.txt", 'w') fl.write(json.dumps(self.fileList, indent = 4)) fl.close() #print(f"numBlocks = {self.numBlocks}") #if os.path.exists("blocks.txt"): # os.remove("blocks.txt") #fl = open("blocks.txt", 'w') #fl.write(json.dumps(self.fileList, indent = 4)) #fl.close() except Exception: self.logger.exception("FATAL: something went wrong while building the blocks data structure.") return False else: return True def retrieveCompleted(self, vospacePath): """ Loading @@ -201,7 +235,8 @@ class RetrieveExecutor(TaskExecutor): """ Retrieves data from a generic storage point (hot or cold). """ try: self.logger.info("Starting data retrieval...") # Loop on blocks for blockIdx in range(self.numBlocks): blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ] Loading @@ -226,6 +261,7 @@ class RetrieveExecutor(TaskExecutor): os.makedirs(destDirPath, exist_ok = True) sp = subprocess.run(["rsync", "-av", srcPath, destDirPath], capture_output = True) if(sp.returncode or sp.stderr): self.logger.error(f"FATAL: error during the copy process, returnCode = {sp.returncode}, stderr: {sp.stderr}") return False # Remove files from file list at the end of the copy Loading @@ -238,7 +274,11 @@ class RetrieveExecutor(TaskExecutor): # flag for vospacePath in self.nodeList: if self.retrieveCompleted(vospacePath): try: self.dbConn.setAsyncTrans(vospacePath, False) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False # Empty the tape library frontend if the storage type # is 'cold' Loading @@ -250,22 +290,48 @@ class RetrieveExecutor(TaskExecutor): blockFileList.clear() self.procBlocks += 1 self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) except Exception: self.logger.exception("FATAL: something went wrong while retrieving the data.") return False else: return True def update(self): def execute(self): success = True self.logger.info("++++++++++ Start of execution phase ++++++++++") success &= self.buildFileList() & self.buildBlocks() & self.retrieveData() if success: self.logger.info("++++++++++ End of execution phase ++++++++++") return True else: self.logger.info("FATAL: something went wrong during the execution phase.") return False def update(self, status): """ Updates the job status and sends an email to the user. """ try: results = [{"target": ""}] results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] #self.dbConn.setResults(self.jobId, results) m = Mailer(self.logger) m.addRecipient(self.adminEmail) userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) if userEmail != self.adminEmail: m.addRecipient(userEmail) self.jobObj.setResults(results) if status == ("OK"): self.jobObj.setPhase("COMPLETED") self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.jobObj.endTime = datetime.datetime.now().isoformat() self.logger.info("Job phase updated to COMPLETED.") # Add a list of physical destination paths for each VOSpace node in the node list self.logger.info("Generating physical destination paths for VOSpace nodes...") for vospacePath in self.nodeList: nodeInfo = self.dbConn.getOSPath(vospacePath) baseSrcPath = nodeInfo["baseSrcPath"] Loading @@ -276,13 +342,9 @@ class RetrieveExecutor(TaskExecutor): self.destPathList.append(destPath) self.jobObj.jobInfo["destPathList"] = self.destPathList.copy() m = Mailer(self.logger) m.addRecipient(self.adminEmail) userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) if userEmail != self.adminEmail: m.addRecipient(userEmail) msg = f""" ########## VOSpace data retrieval procedure summary ########## Dear user, your job has been COMPLETED. Loading @@ -293,15 +355,46 @@ class RetrieveExecutor(TaskExecutor): Your files are available and can be downloaded. """ m.setMessage("VOSpace data retrieve notification: COMPLETED", msg) else: self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage("FATAL: something went wrong during the execution phase.") self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.jobObj.endTime = datetime.datetime.now().isoformat() self.logger.info("Job phase updated to ERROR.") msg = f""" ########## VOSpace data retrieval procedure summary ########## Dear user, your job has FAILED. Job ID: {self.jobId} Job type: {self.jobObj.type} Owner ID: {self.jobObj.ownerId} """ info = f""" ERROR: the job was terminated due to an error that occurred while retrieveing the data from the storage point. This issue will be automatically reported to the administrator. """ msg += info m.setMessage("VOSpace data retrieve notification: ERROR", msg) # Send e-mail notification m.setMessage("VOSpace data retrieve notification: Job COMPLETED", msg) m.send() except Exception: self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") def cleanup(self): """ Cleanup method. """ self.logger.info("Cleanup...") self.fileList.clear() self.nodeList.clear() self.destPathList.clear() Loading @@ -316,24 +409,31 @@ class RetrieveExecutor(TaskExecutor): self.setDestinationQueueName("read_terminated") while True: self.wait() if self.srcQueue.len() > 0: try: srcQueueLen = self.srcQueue.len() destQueueLen = self.destQueue.len() except Exception: self.logger.exception("Cache error: failed to retrieve queue length.") else: if srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.nodeList.copy() self.buildFileList() self.buildBlocks() result = self.retrieveData() if result: self.update() self.cleanup() if self.execute(): self.update("OK") # debug block... print(f"fileList = {self.fileList}") print(f"nodeList = {self.nodeList}") #print(f"fileList = {self.fileList}") #print(f"nodeList = {self.nodeList}") else: sys.exit("Failed to retrieve data!") if self.destQueue.len() == self.maxTerminatedJobs: self.update("ERROR") self.cleanup() try: if destQueueLen >= self.maxTerminatedJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")