Loading transfer_service/import_executor.py +194 −124 Original line number Diff line number Diff line Loading @@ -57,27 +57,31 @@ class ImportExecutor(TaskExecutor): self.pathPrefix = None self.storageId = None self.storageType = None self.nodeList = [] super(ImportExecutor, self).__init__() def importVOSpaceNodes(self): def execute(self): """This method performs the VOSpace import operation.""" try: self.logger.info("++++++++++ Start of import phase ++++++++++") self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) start = dt.now() nodeList = [] timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp) nlfp = open(nodeListFile, "w") self.logger.info("Job phase updated to EXECUTING.") if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.recallChecksumFiles(self.path) self.tapeClient.disconnect() self.logger.info(f"Recursive scan of '{os.path.dirname(self.path)}'") [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path)) try: locationId = self.dbConn.getLocationId(self.storageId) except Exception: self.logger.exception("FATAL: unable to obtain the location ID for the storage point.") return False tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") for dir in dirs: if self.path in dir: Loading @@ -94,22 +98,25 @@ class ImportExecutor(TaskExecutor): else: vospacePath = parentPath + '/' + nodeName cnode.setParentPath(parentPath) locationId = self.dbConn.getLocationId(self.storageId) cnode.setLocationId(locationId) cnode.setCreatorId(self.userId) cnode.setContentLength(0) cnode.setAsyncTrans(True) cnode.setSticky(True) try: if os.path.islink(dir): now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) self.nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) elif self.dbConn.insertNode(cnode): now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) self.nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False for flist in files: for file in flist: Loading @@ -123,8 +130,6 @@ class ImportExecutor(TaskExecutor): dnode.setWrapperDir(tstampWrapperDir) vospacePath = parentPath + '/' + nodeName dnode.setParentPath(parentPath) self.storageId = self.dbConn.getStorageId(self.pathPrefix) locationId = self.dbConn.getLocationId(self.storageId) dnode.setLocationId(locationId) dnode.setCreatorId(self.userId) dnode.setContentLength(os.path.getsize(file)) Loading @@ -132,32 +137,52 @@ class ImportExecutor(TaskExecutor): dnode.setAsyncTrans(True) dnode.setSticky(True) try: if os.path.islink(file): now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) self.nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) elif self.dbConn.insertNode(dnode): now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "DONE" ]) self.nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False except Exception: self.logger.exception("FATAL: something went wrong during the import phase.") return False else: self.logger.info("++++++++++ End of import phase ++++++++++") return True def update(self, status): try: results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) m = Mailer(self.logger) m.addRecipient(self.adminEmail) nlfp.write(tabulate(nodeList, if status == "OK": timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp) try: nlfp = open(nodeListFile, "w") except IOError: self.logger.exception("Unable to generate the 'vos_import_report'.") else: nlfp.write(tabulate(self.nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"], tablefmt = "simple")) nlfp.close() end = dt.now() # Update job status (to be moved) results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to COMPLETED.") # Send e-mail notification m = Mailer(self.logger) m.addRecipient(self.adminEmail) msg = f""" ########## VOSpace import procedure summary ########## Loading @@ -166,16 +191,14 @@ class ImportExecutor(TaskExecutor): Storage type: {self.storageType} Storage ID: {self.storageId} Creator ID: {self.userId} Start time: {start} End time: {end} Processed nodes: {len(nodeList)} Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)} Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in nodeList)} Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in nodeList)} Processed nodes: {len(self.nodeList)} Imported nodes: {sum(res[-1] == 'DONE' for res in self.nodeList)} Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in self.nodeList)} Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in self.nodeList)} """ if len(nodeList) <= 10 ** 5: if len(self.nodeList) <= 10 ** 5: m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile) else: info = f""" Loading @@ -186,7 +209,41 @@ class ImportExecutor(TaskExecutor): """ msg += info m.setMessage("VOSpace import notification", msg) else: self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage("FATAL: something went wrong during the import phase.") self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to ERROR.") msg = f""" ########## VOSpace import procedure summary ########## Job ID: {self.jobId} Job type {self.jobObj.type} Storage type: {self.storageType} Storage ID: {self.storageId} Creator ID: {self.userId} """ info = f""" ERROR: the job was terminated due to an error that occurred while importing the VOSpace nodes in the file catalog. This issue will be automatically reported to the administrator. """ msg += info m.setMessage("VOSpace data storage notification: Job ERROR", msg) # Send e-mail notification m.send() except Exception: self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") finally: self.nodeList.clear() def run(self): self.logger.info("Starting import executor...") Loading @@ -194,7 +251,13 @@ class ImportExecutor(TaskExecutor): self.setDestinationQueueName("import_terminated") while True: self.wait() if self.srcQueue.len() > 0: try: srcQueueLen = self.srcQueue.len() destQueueLen = self.destQueue.len() except Exception: self.logger.exception("Cache error: failed to retrieve queue length.") else: if srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.userId = self.jobObj.ownerId Loading @@ -202,9 +265,16 @@ class ImportExecutor(TaskExecutor): self.pathPrefix = self.jobObj.jobInfo["pathPrefix"] self.storageId = self.jobObj.jobInfo["storageId"] self.storageType = self.jobObj.jobInfo["storageType"] self.importVOSpaceNodes() if self.destQueue.len() == self.maxTerminatedJobs: if self.execute(): self.update("OK") else: self.update("ERROR") try: if destQueueLen == self.maxTerminatedJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") Loading
transfer_service/import_executor.py +194 −124 Original line number Diff line number Diff line Loading @@ -57,27 +57,31 @@ class ImportExecutor(TaskExecutor): self.pathPrefix = None self.storageId = None self.storageType = None self.nodeList = [] super(ImportExecutor, self).__init__() def importVOSpaceNodes(self): def execute(self): """This method performs the VOSpace import operation.""" try: self.logger.info("++++++++++ Start of import phase ++++++++++") self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) start = dt.now() nodeList = [] timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp) nlfp = open(nodeListFile, "w") self.logger.info("Job phase updated to EXECUTING.") if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.recallChecksumFiles(self.path) self.tapeClient.disconnect() self.logger.info(f"Recursive scan of '{os.path.dirname(self.path)}'") [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path)) try: locationId = self.dbConn.getLocationId(self.storageId) except Exception: self.logger.exception("FATAL: unable to obtain the location ID for the storage point.") return False tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") for dir in dirs: if self.path in dir: Loading @@ -94,22 +98,25 @@ class ImportExecutor(TaskExecutor): else: vospacePath = parentPath + '/' + nodeName cnode.setParentPath(parentPath) locationId = self.dbConn.getLocationId(self.storageId) cnode.setLocationId(locationId) cnode.setCreatorId(self.userId) cnode.setContentLength(0) cnode.setAsyncTrans(True) cnode.setSticky(True) try: if os.path.islink(dir): now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) self.nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) elif self.dbConn.insertNode(cnode): now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) self.nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False for flist in files: for file in flist: Loading @@ -123,8 +130,6 @@ class ImportExecutor(TaskExecutor): dnode.setWrapperDir(tstampWrapperDir) vospacePath = parentPath + '/' + nodeName dnode.setParentPath(parentPath) self.storageId = self.dbConn.getStorageId(self.pathPrefix) locationId = self.dbConn.getLocationId(self.storageId) dnode.setLocationId(locationId) dnode.setCreatorId(self.userId) dnode.setContentLength(os.path.getsize(file)) Loading @@ -132,32 +137,52 @@ class ImportExecutor(TaskExecutor): dnode.setAsyncTrans(True) dnode.setSticky(True) try: if os.path.islink(file): now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) self.nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) elif self.dbConn.insertNode(dnode): now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "DONE" ]) self.nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False except Exception: self.logger.exception("FATAL: something went wrong during the import phase.") return False else: self.logger.info("++++++++++ End of import phase ++++++++++") return True def update(self, status): try: results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) m = Mailer(self.logger) m.addRecipient(self.adminEmail) nlfp.write(tabulate(nodeList, if status == "OK": timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp) try: nlfp = open(nodeListFile, "w") except IOError: self.logger.exception("Unable to generate the 'vos_import_report'.") else: nlfp.write(tabulate(self.nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"], tablefmt = "simple")) nlfp.close() end = dt.now() # Update job status (to be moved) results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to COMPLETED.") # Send e-mail notification m = Mailer(self.logger) m.addRecipient(self.adminEmail) msg = f""" ########## VOSpace import procedure summary ########## Loading @@ -166,16 +191,14 @@ class ImportExecutor(TaskExecutor): Storage type: {self.storageType} Storage ID: {self.storageId} Creator ID: {self.userId} Start time: {start} End time: {end} Processed nodes: {len(nodeList)} Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)} Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in nodeList)} Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in nodeList)} Processed nodes: {len(self.nodeList)} Imported nodes: {sum(res[-1] == 'DONE' for res in self.nodeList)} Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in self.nodeList)} Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in self.nodeList)} """ if len(nodeList) <= 10 ** 5: if len(self.nodeList) <= 10 ** 5: m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile) else: info = f""" Loading @@ -186,7 +209,41 @@ class ImportExecutor(TaskExecutor): """ msg += info m.setMessage("VOSpace import notification", msg) else: self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage("FATAL: something went wrong during the import phase.") self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to ERROR.") msg = f""" ########## VOSpace import procedure summary ########## Job ID: {self.jobId} Job type {self.jobObj.type} Storage type: {self.storageType} Storage ID: {self.storageId} Creator ID: {self.userId} """ info = f""" ERROR: the job was terminated due to an error that occurred while importing the VOSpace nodes in the file catalog. This issue will be automatically reported to the administrator. """ msg += info m.setMessage("VOSpace data storage notification: Job ERROR", msg) # Send e-mail notification m.send() except Exception: self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") finally: self.nodeList.clear() def run(self): self.logger.info("Starting import executor...") Loading @@ -194,7 +251,13 @@ class ImportExecutor(TaskExecutor): self.setDestinationQueueName("import_terminated") while True: self.wait() if self.srcQueue.len() > 0: try: srcQueueLen = self.srcQueue.len() destQueueLen = self.destQueue.len() except Exception: self.logger.exception("Cache error: failed to retrieve queue length.") else: if srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.userId = self.jobObj.ownerId Loading @@ -202,9 +265,16 @@ class ImportExecutor(TaskExecutor): self.pathPrefix = self.jobObj.jobInfo["pathPrefix"] self.storageId = self.jobObj.jobInfo["storageId"] self.storageType = self.jobObj.jobInfo["storageType"] self.importVOSpaceNodes() if self.destQueue.len() == self.maxTerminatedJobs: if self.execute(): self.update("OK") else: self.update("ERROR") try: if destQueueLen == self.maxTerminatedJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")