Loading transfer_service/retrieve_executor.py +46 −1 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ import os import logging import subprocess import sys import tarfile from checksum import Checksum from config import Config Loading @@ -45,6 +46,8 @@ class RetrieveExecutor(TaskExecutor): self.type = "retrieve_executor" self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("general") self.vospaceBaseUrl = params["vospace_base_url"] params = config.loadSection("transfer_node") self.storageRetrievePath = params["retrieve_path"] params = config.loadSection("transfer") Loading @@ -62,6 +65,7 @@ class RetrieveExecutor(TaskExecutor): redisLogHandler = RedisLogHandler() redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) self.resDir = params["res_dir"] params = config.loadSection("file_catalog") self.dbConn = DbConnector(params["user"], params["password"], Loading @@ -83,6 +87,8 @@ class RetrieveExecutor(TaskExecutor): self.jobId = None self.nodeList = [] self.fileList = [] self.urlList = [] self.urlListFileName = None self.destPathList = [] self.numBlocks = 0 self.procBlocks = 0 Loading Loading @@ -129,6 +135,16 @@ class RetrieveExecutor(TaskExecutor): except Exception: self.logger.exception(f"FATAL: unable to obtain the OS path for the VOSpace path '{vospacePath}'.") return False try: vospaceChildNodes = self.dbConn.getVOSpaceChildNodes(vospacePath, "data") if not vospaceChildNodes: self.urlList.append(self.vospaceBaseUrl + "/download" + vospacePath) else: for el in vospaceChildNodes: self.urlList.append(self.vospaceBaseUrl + "/download" + el["vospace_path"]) except Exception: self.logger.exception(f"FATAL: unable to obtain the VOSpace data nodes download URL list.") return False baseSrcPath = nodeInfo["baseSrcPath"] srcPath = nodeInfo["fullPath"] username = nodeInfo["username"] Loading Loading @@ -362,6 +378,23 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to COMPLETED.") self.urlListFileName = "url_list-" + self.jobId urlListFilePath = os.path.join(self.resDir, self.urlListFileName) try: urlfp = open(urlListFilePath, "w") except IOError: self.logger.exception(f"Unable to generate {self.urlListFileName}") else: for url in self.urlList: urlfp.write(url + '\n') urlfp.close() try: tar = tarfile.open(f"{urlListFilePath}.tar.gz", "w:gz") tar.add(urlListFilePath, arcname = self.urlListFileName) tar.close() except tarfile.TarError: self.logger.exception(f"Unable to generate {self.urlListFileName}.tar.gz") msg = f""" ########## VOSpace data retrieval procedure summary ########## Loading @@ -373,9 +406,10 @@ class RetrieveExecutor(TaskExecutor): Owner ID: {self.jobObj.ownerId} Your files are available and can be downloaded. Here below is attached a file containing the URL list of all the VOSpace data nodes. """ m.setMessage("VOSpace data retrieve notification: COMPLETED", msg) m.setMessageWithAttachment("VOSpace data retrieve notification: COMPLETED", msg, urlListFilePath + ".tar.gz") else: self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") Loading Loading @@ -416,11 +450,22 @@ class RetrieveExecutor(TaskExecutor): self.logger.info("Cleanup...") self.fileList.clear() self.nodeList.clear() self.urlList.clear() self.destPathList.clear() self.storageType = None self.numBlocks = 0 self.procBlocks = 0 self.totalSize = 0 urlListFilePath = os.path.join(self.resDir, self.urlListFileName) try: os.remove(urlListFilePath) except OSError: self.logger.exception(f"FATAL: unable to remove {self.urlListFileName}") try: os.remove(urlListFilePath + ".tar.gz") except OSError: self.logger.exception(f"FATAL: unable to remove {self.urlListFileName}.tar.gz") self.urlListFileName = None def run(self): self.logger.info("Starting retrieve executor...") Loading Loading
transfer_service/retrieve_executor.py +46 −1 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ import os import logging import subprocess import sys import tarfile from checksum import Checksum from config import Config Loading @@ -45,6 +46,8 @@ class RetrieveExecutor(TaskExecutor): self.type = "retrieve_executor" self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("general") self.vospaceBaseUrl = params["vospace_base_url"] params = config.loadSection("transfer_node") self.storageRetrievePath = params["retrieve_path"] params = config.loadSection("transfer") Loading @@ -62,6 +65,7 @@ class RetrieveExecutor(TaskExecutor): redisLogHandler = RedisLogHandler() redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) self.resDir = params["res_dir"] params = config.loadSection("file_catalog") self.dbConn = DbConnector(params["user"], params["password"], Loading @@ -83,6 +87,8 @@ class RetrieveExecutor(TaskExecutor): self.jobId = None self.nodeList = [] self.fileList = [] self.urlList = [] self.urlListFileName = None self.destPathList = [] self.numBlocks = 0 self.procBlocks = 0 Loading Loading @@ -129,6 +135,16 @@ class RetrieveExecutor(TaskExecutor): except Exception: self.logger.exception(f"FATAL: unable to obtain the OS path for the VOSpace path '{vospacePath}'.") return False try: vospaceChildNodes = self.dbConn.getVOSpaceChildNodes(vospacePath, "data") if not vospaceChildNodes: self.urlList.append(self.vospaceBaseUrl + "/download" + vospacePath) else: for el in vospaceChildNodes: self.urlList.append(self.vospaceBaseUrl + "/download" + el["vospace_path"]) except Exception: self.logger.exception(f"FATAL: unable to obtain the VOSpace data nodes download URL list.") return False baseSrcPath = nodeInfo["baseSrcPath"] srcPath = nodeInfo["fullPath"] username = nodeInfo["username"] Loading Loading @@ -362,6 +378,23 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to COMPLETED.") self.urlListFileName = "url_list-" + self.jobId urlListFilePath = os.path.join(self.resDir, self.urlListFileName) try: urlfp = open(urlListFilePath, "w") except IOError: self.logger.exception(f"Unable to generate {self.urlListFileName}") else: for url in self.urlList: urlfp.write(url + '\n') urlfp.close() try: tar = tarfile.open(f"{urlListFilePath}.tar.gz", "w:gz") tar.add(urlListFilePath, arcname = self.urlListFileName) tar.close() except tarfile.TarError: self.logger.exception(f"Unable to generate {self.urlListFileName}.tar.gz") msg = f""" ########## VOSpace data retrieval procedure summary ########## Loading @@ -373,9 +406,10 @@ class RetrieveExecutor(TaskExecutor): Owner ID: {self.jobObj.ownerId} Your files are available and can be downloaded. Here below is attached a file containing the URL list of all the VOSpace data nodes. """ m.setMessage("VOSpace data retrieve notification: COMPLETED", msg) m.setMessageWithAttachment("VOSpace data retrieve notification: COMPLETED", msg, urlListFilePath + ".tar.gz") else: self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") Loading Loading @@ -416,11 +450,22 @@ class RetrieveExecutor(TaskExecutor): self.logger.info("Cleanup...") self.fileList.clear() self.nodeList.clear() self.urlList.clear() self.destPathList.clear() self.storageType = None self.numBlocks = 0 self.procBlocks = 0 self.totalSize = 0 urlListFilePath = os.path.join(self.resDir, self.urlListFileName) try: os.remove(urlListFilePath) except OSError: self.logger.exception(f"FATAL: unable to remove {self.urlListFileName}") try: os.remove(urlListFilePath + ".tar.gz") except OSError: self.logger.exception(f"FATAL: unable to remove {self.urlListFileName}.tar.gz") self.urlListFileName = None def run(self): self.logger.info("Starting retrieve executor...") Loading