Loading transfer_service/db_connector.py +10 −7 Original line number Diff line number Diff line Loading @@ -427,7 +427,8 @@ class DbConnector(object): 'pullToVoSpace', 'pushToVoSpace', 'vos_data', 'vos_group', 'vos_grpr', 'vos_grpw', 'vos_import') ORDER BY creation_time DESC; """) Loading Loading @@ -467,7 +468,8 @@ class DbConnector(object): 'pullToVoSpace', 'pushToVoSpace', 'vos_data', 'vos_group', 'vos_grpr', 'vos_grpw', 'vos_import') ORDER BY creation_time DESC; """, Loading @@ -487,7 +489,8 @@ class DbConnector(object): 'pullToVoSpace', 'pushToVoSpace', 'vos_data', 'vos_group', 'vos_grpr', 'vos_grpw', 'vos_import') ORDER BY creation_time DESC; """, Loading transfer_service/group_rw_executor.py +7 −5 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ # SPDX-License-Identifier: GPL-3.0-or-later # import datetime import logging import os Loading Loading @@ -57,8 +58,9 @@ class GroupRwExecutor(TaskExecutor): """This method adds/removes groups to/from 'group_read' and 'group_write'.""" try: self.logger.info("++++++++++ Start of execution phase ++++++++++") self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) if self.requestType == "GRPR_ADD": self.logger.info(f"User command: vos_group read add {self.groupName[0]} {self.vospacePath}") Loading Loading @@ -92,8 +94,8 @@ class GroupRwExecutor(TaskExecutor): if status == "OK": self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to COMPLETED.") if self.requestType == "GRPR_ADD": Loading @@ -117,8 +119,8 @@ Removed '{self.realGroupName}' from 'group_write' for {self.vospacePath} and any self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage(msg) self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to ERROR.") # Send e-mail notification m.setMessage("VOSpace group_rw notification", msg) Loading transfer_service/import_executor.py +10 −13 Original line number Diff line number Diff line Loading @@ -5,13 +5,13 @@ # SPDX-License-Identifier: GPL-3.0-or-later # import datetime import logging import os import re from config import Config from checksum import Checksum from datetime import datetime as dt from db_connector import DbConnector from mailer import Mailer from node import Node Loading Loading @@ -69,8 +69,9 @@ class ImportExecutor(TaskExecutor): """This method performs the VOSpace import operation.""" try: self.logger.info("++++++++++ Start of import phase ++++++++++") self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to EXECUTING.") if self.storageType == "cold": Loading Loading @@ -110,14 +111,12 @@ class ImportExecutor(TaskExecutor): cnode.setSticky(True) try: now = datetime.datetime.now().isoformat() if os.path.islink(dir): now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) elif self.dbConn.insertNode(cnode): now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") Loading @@ -143,14 +142,12 @@ class ImportExecutor(TaskExecutor): dnode.setSticky(True) try: now = datetime.datetime.now().isoformat() if os.path.islink(file): now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) elif self.dbConn.insertNode(dnode): now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") Loading @@ -171,7 +168,7 @@ class ImportExecutor(TaskExecutor): m.addRecipient(self.adminEmail) if status == "OK": timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp) try: nlfp = open(nodeListFile, "w") Loading @@ -184,8 +181,8 @@ class ImportExecutor(TaskExecutor): nlfp.close() self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to COMPLETED.") msg = f""" Loading Loading @@ -218,8 +215,8 @@ class ImportExecutor(TaskExecutor): self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage("FATAL: something went wrong during the import phase.") self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to ERROR.") msg = f""" Loading transfer_service/retrieve_executor.py +5 −6 Original line number Diff line number Diff line Loading @@ -95,8 +95,9 @@ class RetrieveExecutor(TaskExecutor): """ try: try: self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False Loading Loading @@ -329,9 +330,8 @@ class RetrieveExecutor(TaskExecutor): if status == ("OK"): self.jobObj.setPhase("COMPLETED") self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.jobObj.endTime = datetime.datetime.now().isoformat() self.logger.info("Job phase updated to COMPLETED.") # Add a list of physical destination paths for each VOSpace node in the node list Loading Loading @@ -364,9 +364,8 @@ class RetrieveExecutor(TaskExecutor): self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage("FATAL: something went wrong during the execution phase.") self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.jobObj.endTime = datetime.datetime.now().isoformat() self.logger.info("Job phase updated to ERROR.") msg = f""" Loading transfer_service/store_executor.py +8 −6 Original line number Diff line number Diff line Loading @@ -5,8 +5,9 @@ # SPDX-License-Identifier: GPL-3.0-or-later # import os import datetime import logging import os import shutil import subprocess import sys Loading Loading @@ -81,8 +82,9 @@ class StoreExecutor(TaskExecutor): self.logger.info("++++++++++ Start of execution phase ++++++++++") self.logger.info(f"Data size: {self.dataSize} B") try: self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False Loading Loading @@ -157,8 +159,8 @@ class StoreExecutor(TaskExecutor): nlfp.close() self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to COMPLETED.") msg = f""" Loading Loading @@ -191,8 +193,8 @@ class StoreExecutor(TaskExecutor): self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage("FATAL: something went wrong during the execution phase.") self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to ERROR.") msg = f""" Loading Loading
transfer_service/db_connector.py +10 −7 Original line number Diff line number Diff line Loading @@ -427,7 +427,8 @@ class DbConnector(object): 'pullToVoSpace', 'pushToVoSpace', 'vos_data', 'vos_group', 'vos_grpr', 'vos_grpw', 'vos_import') ORDER BY creation_time DESC; """) Loading Loading @@ -467,7 +468,8 @@ class DbConnector(object): 'pullToVoSpace', 'pushToVoSpace', 'vos_data', 'vos_group', 'vos_grpr', 'vos_grpw', 'vos_import') ORDER BY creation_time DESC; """, Loading @@ -487,7 +489,8 @@ class DbConnector(object): 'pullToVoSpace', 'pushToVoSpace', 'vos_data', 'vos_group', 'vos_grpr', 'vos_grpw', 'vos_import') ORDER BY creation_time DESC; """, Loading
transfer_service/group_rw_executor.py +7 −5 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ # SPDX-License-Identifier: GPL-3.0-or-later # import datetime import logging import os Loading Loading @@ -57,8 +58,9 @@ class GroupRwExecutor(TaskExecutor): """This method adds/removes groups to/from 'group_read' and 'group_write'.""" try: self.logger.info("++++++++++ Start of execution phase ++++++++++") self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) if self.requestType == "GRPR_ADD": self.logger.info(f"User command: vos_group read add {self.groupName[0]} {self.vospacePath}") Loading Loading @@ -92,8 +94,8 @@ class GroupRwExecutor(TaskExecutor): if status == "OK": self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to COMPLETED.") if self.requestType == "GRPR_ADD": Loading @@ -117,8 +119,8 @@ Removed '{self.realGroupName}' from 'group_write' for {self.vospacePath} and any self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage(msg) self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to ERROR.") # Send e-mail notification m.setMessage("VOSpace group_rw notification", msg) Loading
transfer_service/import_executor.py +10 −13 Original line number Diff line number Diff line Loading @@ -5,13 +5,13 @@ # SPDX-License-Identifier: GPL-3.0-or-later # import datetime import logging import os import re from config import Config from checksum import Checksum from datetime import datetime as dt from db_connector import DbConnector from mailer import Mailer from node import Node Loading Loading @@ -69,8 +69,9 @@ class ImportExecutor(TaskExecutor): """This method performs the VOSpace import operation.""" try: self.logger.info("++++++++++ Start of import phase ++++++++++") self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to EXECUTING.") if self.storageType == "cold": Loading Loading @@ -110,14 +111,12 @@ class ImportExecutor(TaskExecutor): cnode.setSticky(True) try: now = datetime.datetime.now().isoformat() if os.path.islink(dir): now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) elif self.dbConn.insertNode(cnode): now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") Loading @@ -143,14 +142,12 @@ class ImportExecutor(TaskExecutor): dnode.setSticky(True) try: now = datetime.datetime.now().isoformat() if os.path.islink(file): now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) elif self.dbConn.insertNode(dnode): now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") Loading @@ -171,7 +168,7 @@ class ImportExecutor(TaskExecutor): m.addRecipient(self.adminEmail) if status == "OK": timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp) try: nlfp = open(nodeListFile, "w") Loading @@ -184,8 +181,8 @@ class ImportExecutor(TaskExecutor): nlfp.close() self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to COMPLETED.") msg = f""" Loading Loading @@ -218,8 +215,8 @@ class ImportExecutor(TaskExecutor): self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage("FATAL: something went wrong during the import phase.") self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to ERROR.") msg = f""" Loading
transfer_service/retrieve_executor.py +5 −6 Original line number Diff line number Diff line Loading @@ -95,8 +95,9 @@ class RetrieveExecutor(TaskExecutor): """ try: try: self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False Loading Loading @@ -329,9 +330,8 @@ class RetrieveExecutor(TaskExecutor): if status == ("OK"): self.jobObj.setPhase("COMPLETED") self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.jobObj.endTime = datetime.datetime.now().isoformat() self.logger.info("Job phase updated to COMPLETED.") # Add a list of physical destination paths for each VOSpace node in the node list Loading Loading @@ -364,9 +364,8 @@ class RetrieveExecutor(TaskExecutor): self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage("FATAL: something went wrong during the execution phase.") self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.jobObj.endTime = datetime.datetime.now().isoformat() self.logger.info("Job phase updated to ERROR.") msg = f""" Loading
transfer_service/store_executor.py +8 −6 Original line number Diff line number Diff line Loading @@ -5,8 +5,9 @@ # SPDX-License-Identifier: GPL-3.0-or-later # import os import datetime import logging import os import shutil import subprocess import sys Loading Loading @@ -81,8 +82,9 @@ class StoreExecutor(TaskExecutor): self.logger.info("++++++++++ Start of execution phase ++++++++++") self.logger.info(f"Data size: {self.dataSize} B") try: self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False Loading Loading @@ -157,8 +159,8 @@ class StoreExecutor(TaskExecutor): nlfp.close() self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to COMPLETED.") msg = f""" Loading Loading @@ -191,8 +193,8 @@ class StoreExecutor(TaskExecutor): self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage("FATAL: something went wrong during the execution phase.") self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to ERROR.") msg = f""" Loading