Loading transfer_service/exceptions.py +2 −2 Original line number Diff line number Diff line Loading @@ -29,8 +29,8 @@ class MultipleUsersException(Error): # SystemUtils exceptions class IllegalCharacterException(Error): def __init__(self, name): self.message = "Illegal character found in: " + name def __init__(self): self.message = "Illegal characters found in file/dir names." super(IllegalCharacterException, self).__init__(self.message) Loading transfer_service/import_executor.py +19 −0 Original line number Diff line number Diff line Loading @@ -13,6 +13,7 @@ import re from config import Config from checksum import Checksum from db_connector import DbConnector from exceptions import IllegalCharacterException from mailer import Mailer from node import Node from redis_log_handler import RedisLogHandler Loading Loading @@ -64,6 +65,7 @@ class ImportExecutor(TaskExecutor): self.storageId = None self.storageType = None self.nodeList = [] self.invalidFileAndDirNames = [] super(ImportExecutor, self).__init__() def execute(self): Loading @@ -84,6 +86,22 @@ class ImportExecutor(TaskExecutor): self.tapeClient.recallChecksumFiles(self.path) self.tapeClient.disconnect() self.logger.info(f"Checking for invalid file/dir names in '{self.path}'") self.invalidFileAndDirNames = self.systemUtils.findInvalidFileAndDirNames(self.path) if self.invalidFileAndDirNames: self.logger.warning("Found invalid file/dir names") reportFile = os.path.join(self.resDir, "vos_import_report-" + self.jobId) try: rfp = open(reportFile, "w") except IOError: self.logger.exception("Unable to generate the 'vos_import_report'.") else: rfp.write(tabulate(self.invalidFileAndDirNames, headers = [ "Path list of invalid file/dir names" ], tablefmt = "simple")) rfp.close() raise IllegalCharacterException self.logger.info(f"Recursive scan of '{os.path.dirname(self.path)}'") [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path)) Loading Loading @@ -264,6 +282,7 @@ class ImportExecutor(TaskExecutor): self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") finally: self.nodeList.clear() self.invalidFileAndDirNames.clear() def run(self): Loading transfer_service/store_preprocessor.py +45 −7 Original line number Diff line number Diff line Loading @@ -14,8 +14,11 @@ import shutil import sys import time from tabulate import tabulate from checksum import Checksum from config import Config from exceptions import IllegalCharacterException from file_grouper import FileGrouper from db_connector import DbConnector from mailer import Mailer Loading Loading @@ -47,6 +50,7 @@ class StorePreprocessor(TaskExecutor): self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() redisLogHandler.setFormatter(logFormatter) self.resDir = params["res_dir"] self.logger.addHandler(redisLogHandler) params = config.loadSection("file_catalog") self.dbConn = DbConnector(params["user"], Loading @@ -64,6 +68,7 @@ class StorePreprocessor(TaskExecutor): self.username = None self.userId = None self.nodeList = [] self.invalidFileAndDirNames = [] super(StorePreprocessor, self).__init__() def prepare(self, username): Loading Loading @@ -104,6 +109,24 @@ class StorePreprocessor(TaskExecutor): [ dirs, files ] = self.systemUtils.scan(self.path) timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") # Third scan to check illegal characters in file/dir names (if any) self.logger.info(f"Checking for invalid file/dir names in '{self.path}'") self.invalidFileAndDirNames = self.systemUtils.findInvalidFileAndDirNames(self.path) if self.invalidFileAndDirNames: self.logger.warning("Found invalid file/dir names") reportFile = os.path.join(self.resDir, "vos_data_report-" + self.jobId) try: rfp = open(reportFile, "w") except IOError: self.logger.exception("Unable to generate the 'vos_data_report'.") else: rfp.write(tabulate(self.invalidFileAndDirNames, headers = [ "Path list of invalid file/dir names" ], tablefmt = "simple")) rfp.close() self.cleanup() raise IllegalCharacterException # Check if /home/user/store contains files or dirs if files or dirs: destPath = self.path + '/' + timestamp + "-vos_wrapper" Loading @@ -118,14 +141,10 @@ class StorePreprocessor(TaskExecutor): # /home/user/store is empty (this should be handled by data_rpc_server.py) else: self.logger.error("FATAL: the 'store' directory is empty.") userInfo = self.systemUtils.userInfo(self.username) uid = userInfo[1] gid = userInfo[2] os.chown(self.path, uid, gid) os.chmod(self.path, 0o755) self.cleanup() return False # Third scan after directory structure 'check & repair' # Fourth (and last) recursive scan self.logger.info(f"Recursive scan of '{self.path}'") [ dirs, files ] = self.systemUtils.scanRecursive(self.path) Loading Loading @@ -208,7 +227,7 @@ class StorePreprocessor(TaskExecutor): self.logger.exception("FATAL: unable to update the file catalog.") return False self.logger.info("Overall data size calculation") self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) - os.stat(self.path).st_size except Exception: self.logger.exception("FATAL: something went wrong during the preprocessing phase.") return False Loading Loading @@ -278,6 +297,25 @@ class StorePreprocessor(TaskExecutor): finally: self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() self.nodeList.clear() self.invalidFileAndDirNames.clear() def cleanup(self): try: self.logger.info(f"Restoring user permissions for '{self.path}'...") userInfo = self.systemUtils.userInfo(self.username) uid = userInfo[1] gid = userInfo[2] os.chown(self.path, uid, gid) os.chmod(self.path, 0o755) for root, dirs, files in os.walk(self.path): for d in dirs: os.chown(os.path.join(root, d), uid, gid) os.chmod(os.path.join(root, d), 0o755) for f in files: os.chown(os.path.join(root, f), uid, gid) os.chmod(os.path.join(root, f), 0o755) except Exception: self.logger.exception(f"Unable to restore user permissions for {self.path}.") def run(self): self.logger.info("Starting store preprocessor...") Loading Loading
transfer_service/exceptions.py +2 −2 Original line number Diff line number Diff line Loading @@ -29,8 +29,8 @@ class MultipleUsersException(Error): # SystemUtils exceptions class IllegalCharacterException(Error): def __init__(self, name): self.message = "Illegal character found in: " + name def __init__(self): self.message = "Illegal characters found in file/dir names." super(IllegalCharacterException, self).__init__(self.message) Loading
transfer_service/import_executor.py +19 −0 Original line number Diff line number Diff line Loading @@ -13,6 +13,7 @@ import re from config import Config from checksum import Checksum from db_connector import DbConnector from exceptions import IllegalCharacterException from mailer import Mailer from node import Node from redis_log_handler import RedisLogHandler Loading Loading @@ -64,6 +65,7 @@ class ImportExecutor(TaskExecutor): self.storageId = None self.storageType = None self.nodeList = [] self.invalidFileAndDirNames = [] super(ImportExecutor, self).__init__() def execute(self): Loading @@ -84,6 +86,22 @@ class ImportExecutor(TaskExecutor): self.tapeClient.recallChecksumFiles(self.path) self.tapeClient.disconnect() self.logger.info(f"Checking for invalid file/dir names in '{self.path}'") self.invalidFileAndDirNames = self.systemUtils.findInvalidFileAndDirNames(self.path) if self.invalidFileAndDirNames: self.logger.warning("Found invalid file/dir names") reportFile = os.path.join(self.resDir, "vos_import_report-" + self.jobId) try: rfp = open(reportFile, "w") except IOError: self.logger.exception("Unable to generate the 'vos_import_report'.") else: rfp.write(tabulate(self.invalidFileAndDirNames, headers = [ "Path list of invalid file/dir names" ], tablefmt = "simple")) rfp.close() raise IllegalCharacterException self.logger.info(f"Recursive scan of '{os.path.dirname(self.path)}'") [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path)) Loading Loading @@ -264,6 +282,7 @@ class ImportExecutor(TaskExecutor): self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") finally: self.nodeList.clear() self.invalidFileAndDirNames.clear() def run(self): Loading
transfer_service/store_preprocessor.py +45 −7 Original line number Diff line number Diff line Loading @@ -14,8 +14,11 @@ import shutil import sys import time from tabulate import tabulate from checksum import Checksum from config import Config from exceptions import IllegalCharacterException from file_grouper import FileGrouper from db_connector import DbConnector from mailer import Mailer Loading Loading @@ -47,6 +50,7 @@ class StorePreprocessor(TaskExecutor): self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() redisLogHandler.setFormatter(logFormatter) self.resDir = params["res_dir"] self.logger.addHandler(redisLogHandler) params = config.loadSection("file_catalog") self.dbConn = DbConnector(params["user"], Loading @@ -64,6 +68,7 @@ class StorePreprocessor(TaskExecutor): self.username = None self.userId = None self.nodeList = [] self.invalidFileAndDirNames = [] super(StorePreprocessor, self).__init__() def prepare(self, username): Loading Loading @@ -104,6 +109,24 @@ class StorePreprocessor(TaskExecutor): [ dirs, files ] = self.systemUtils.scan(self.path) timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") # Third scan to check illegal characters in file/dir names (if any) self.logger.info(f"Checking for invalid file/dir names in '{self.path}'") self.invalidFileAndDirNames = self.systemUtils.findInvalidFileAndDirNames(self.path) if self.invalidFileAndDirNames: self.logger.warning("Found invalid file/dir names") reportFile = os.path.join(self.resDir, "vos_data_report-" + self.jobId) try: rfp = open(reportFile, "w") except IOError: self.logger.exception("Unable to generate the 'vos_data_report'.") else: rfp.write(tabulate(self.invalidFileAndDirNames, headers = [ "Path list of invalid file/dir names" ], tablefmt = "simple")) rfp.close() self.cleanup() raise IllegalCharacterException # Check if /home/user/store contains files or dirs if files or dirs: destPath = self.path + '/' + timestamp + "-vos_wrapper" Loading @@ -118,14 +141,10 @@ class StorePreprocessor(TaskExecutor): # /home/user/store is empty (this should be handled by data_rpc_server.py) else: self.logger.error("FATAL: the 'store' directory is empty.") userInfo = self.systemUtils.userInfo(self.username) uid = userInfo[1] gid = userInfo[2] os.chown(self.path, uid, gid) os.chmod(self.path, 0o755) self.cleanup() return False # Third scan after directory structure 'check & repair' # Fourth (and last) recursive scan self.logger.info(f"Recursive scan of '{self.path}'") [ dirs, files ] = self.systemUtils.scanRecursive(self.path) Loading Loading @@ -208,7 +227,7 @@ class StorePreprocessor(TaskExecutor): self.logger.exception("FATAL: unable to update the file catalog.") return False self.logger.info("Overall data size calculation") self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) - os.stat(self.path).st_size except Exception: self.logger.exception("FATAL: something went wrong during the preprocessing phase.") return False Loading Loading @@ -278,6 +297,25 @@ class StorePreprocessor(TaskExecutor): finally: self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() self.nodeList.clear() self.invalidFileAndDirNames.clear() def cleanup(self): try: self.logger.info(f"Restoring user permissions for '{self.path}'...") userInfo = self.systemUtils.userInfo(self.username) uid = userInfo[1] gid = userInfo[2] os.chown(self.path, uid, gid) os.chmod(self.path, 0o755) for root, dirs, files in os.walk(self.path): for d in dirs: os.chown(os.path.join(root, d), uid, gid) os.chmod(os.path.join(root, d), 0o755) for f in files: os.chown(os.path.join(root, f), uid, gid) os.chmod(os.path.join(root, f), 0o755) except Exception: self.logger.exception(f"Unable to restore user permissions for {self.path}.") def run(self): self.logger.info("Starting store preprocessor...") Loading