Loading transfer_service/checksum.py +8 −4 Original line number Diff line number Diff line Loading @@ -52,9 +52,13 @@ class Checksum(object): def md5sum(self, filePath): """Calculates the MD5 checksum of a file.""" md5Hash = hashlib.md5() try: with open(filePath, "rb") as f: for chunk in iter(lambda: f.read(self.fileBufferSize), b""): md5Hash.update(chunk) except (FileNotFoundError, IOError): raise else: return md5Hash.hexdigest() def recursive(self, folderPath): Loading transfer_service/mailer.py +12 −10 Original line number Diff line number Diff line Loading @@ -45,16 +45,18 @@ class Mailer(object): def setMessageWithAttachment(self, subject, msg, filePath): """Set a message with attachment.""" self.setMessage(subject, msg) if os.path.exists(filePath): ctype, encoding = mimetypes.guess_type(filePath) if ctype is None or encoding is not None: ctype = "application/octet-stream" maintype, subtype = ctype.split('/', 1) with open(filePath, "rb") as fp: self.message.add_attachment(fp.read(), maintype = maintype, subtype = subtype, filename = os.path.basename(filePath)) else: self.logger.error("File not found: the e-mail message will be sent without the attachment.") def send(self): """Send email message.""" Loading transfer_service/store_executor.py +73 −59 Original line number Diff line number Diff line Loading @@ -122,24 +122,8 @@ class StoreExecutor(TaskExecutor): self.logger.info("++++++++++ End of execution phase ++++++++++") return True def cleanup(self): srcPathPrefix = self.storageStorePath.replace("{username}", self.username) srcData = os.listdir(srcPathPrefix) for el in srcData: nodeOSPath = srcPathPrefix + '/' + el if os.path.isdir(nodeOSPath): shutil.rmtree(nodeOSPath) elif os.path.isfile(nodeOSPath): os.remove(nodeOSPath) else: sys.exit("Unable to remove file/dir on the transfer node!!!") userInfo = self.systemUtils.userInfo(self.username) uid = userInfo[1] gid = userInfo[2] os.chown(srcPathPrefix, uid, gid) os.chmod(srcPathPrefix, 0o755) def update(self, status): try: results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) Loading @@ -151,6 +135,7 @@ class StoreExecutor(TaskExecutor): m.addRecipient(self.userEmail) if status == "OK": self.logger.info("Updating VOSpace nodes status...") for el in self.nodeList: nodeVOSPath = el[2] self.dbConn.setAsyncTrans(nodeVOSPath, True) Loading @@ -158,12 +143,18 @@ class StoreExecutor(TaskExecutor): timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = os.path.join(self.resDir, "vos_data_report-" + timestamp) try: nlfp = open(nodeListFile, "w") except IOError: self.logger.exception("Unable to generate the 'vos_data_report'.") else: nlfp.write(tabulate(self.nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"], tablefmt = "simple")) finally: nlfp.close() self.logger.info("Updating job phase to COMPLETED...") self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) Loading Loading @@ -196,6 +187,7 @@ class StoreExecutor(TaskExecutor): m.setMessage("VOSpace data storage notification", msg) m.send() else: self.logger.info("Updating job phase to ERROR") self.jobObj.setPhase("ERROR") self.dbConn.setPhase(self.jobId, "ERROR") self.dbConn.setEndTime(self.jobId) Loading @@ -220,8 +212,30 @@ class StoreExecutor(TaskExecutor): """ msg += info m.setMessage("VOSpace data storage notification: Job ERROR", msg) except Exception: self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") finally: self.nodeList.clear() def cleanup(self): try: srcPathPrefix = self.storageStorePath.replace("{username}", self.username) self.logger.info(f"Cleanup of {srcPathPrefix}...") srcData = os.listdir(srcPathPrefix) for el in srcData: nodeOSPath = srcPathPrefix + '/' + el if os.path.isdir(nodeOSPath): shutil.rmtree(nodeOSPath) else: os.remove(nodeOSPath) userInfo = self.systemUtils.userInfo(self.username) uid = userInfo[1] gid = userInfo[2] os.chown(srcPathPrefix, uid, gid) os.chmod(srcPathPrefix, 0o755) except Exception: self.logger.exception(f"Unable to perform the cleanup of {srcPathPrefix}.") def run(self): self.logger.info("Starting store executor...") self.setSourceQueueName("write_ready") Loading Loading
transfer_service/checksum.py +8 −4 Original line number Diff line number Diff line Loading @@ -52,9 +52,13 @@ class Checksum(object): def md5sum(self, filePath): """Calculates the MD5 checksum of a file.""" md5Hash = hashlib.md5() try: with open(filePath, "rb") as f: for chunk in iter(lambda: f.read(self.fileBufferSize), b""): md5Hash.update(chunk) except (FileNotFoundError, IOError): raise else: return md5Hash.hexdigest() def recursive(self, folderPath): Loading
transfer_service/mailer.py +12 −10 Original line number Diff line number Diff line Loading @@ -45,16 +45,18 @@ class Mailer(object): def setMessageWithAttachment(self, subject, msg, filePath): """Set a message with attachment.""" self.setMessage(subject, msg) if os.path.exists(filePath): ctype, encoding = mimetypes.guess_type(filePath) if ctype is None or encoding is not None: ctype = "application/octet-stream" maintype, subtype = ctype.split('/', 1) with open(filePath, "rb") as fp: self.message.add_attachment(fp.read(), maintype = maintype, subtype = subtype, filename = os.path.basename(filePath)) else: self.logger.error("File not found: the e-mail message will be sent without the attachment.") def send(self): """Send email message.""" Loading
transfer_service/store_executor.py +73 −59 Original line number Diff line number Diff line Loading @@ -122,24 +122,8 @@ class StoreExecutor(TaskExecutor): self.logger.info("++++++++++ End of execution phase ++++++++++") return True def cleanup(self): srcPathPrefix = self.storageStorePath.replace("{username}", self.username) srcData = os.listdir(srcPathPrefix) for el in srcData: nodeOSPath = srcPathPrefix + '/' + el if os.path.isdir(nodeOSPath): shutil.rmtree(nodeOSPath) elif os.path.isfile(nodeOSPath): os.remove(nodeOSPath) else: sys.exit("Unable to remove file/dir on the transfer node!!!") userInfo = self.systemUtils.userInfo(self.username) uid = userInfo[1] gid = userInfo[2] os.chown(srcPathPrefix, uid, gid) os.chmod(srcPathPrefix, 0o755) def update(self, status): try: results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) Loading @@ -151,6 +135,7 @@ class StoreExecutor(TaskExecutor): m.addRecipient(self.userEmail) if status == "OK": self.logger.info("Updating VOSpace nodes status...") for el in self.nodeList: nodeVOSPath = el[2] self.dbConn.setAsyncTrans(nodeVOSPath, True) Loading @@ -158,12 +143,18 @@ class StoreExecutor(TaskExecutor): timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = os.path.join(self.resDir, "vos_data_report-" + timestamp) try: nlfp = open(nodeListFile, "w") except IOError: self.logger.exception("Unable to generate the 'vos_data_report'.") else: nlfp.write(tabulate(self.nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"], tablefmt = "simple")) finally: nlfp.close() self.logger.info("Updating job phase to COMPLETED...") self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) Loading Loading @@ -196,6 +187,7 @@ class StoreExecutor(TaskExecutor): m.setMessage("VOSpace data storage notification", msg) m.send() else: self.logger.info("Updating job phase to ERROR") self.jobObj.setPhase("ERROR") self.dbConn.setPhase(self.jobId, "ERROR") self.dbConn.setEndTime(self.jobId) Loading @@ -220,8 +212,30 @@ class StoreExecutor(TaskExecutor): """ msg += info m.setMessage("VOSpace data storage notification: Job ERROR", msg) except Exception: self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") finally: self.nodeList.clear() def cleanup(self): try: srcPathPrefix = self.storageStorePath.replace("{username}", self.username) self.logger.info(f"Cleanup of {srcPathPrefix}...") srcData = os.listdir(srcPathPrefix) for el in srcData: nodeOSPath = srcPathPrefix + '/' + el if os.path.isdir(nodeOSPath): shutil.rmtree(nodeOSPath) else: os.remove(nodeOSPath) userInfo = self.systemUtils.userInfo(self.username) uid = userInfo[1] gid = userInfo[2] os.chown(srcPathPrefix, uid, gid) os.chmod(srcPathPrefix, 0o755) except Exception: self.logger.exception(f"Unable to perform the cleanup of {srcPathPrefix}.") def run(self): self.logger.info("Starting store executor...") self.setSourceQueueName("write_ready") Loading