Loading transfer_service/import_executor.py +9 −7 Original line number Diff line number Diff line Loading @@ -70,7 +70,8 @@ class ImportExecutor(TaskExecutor): start = dt.now() nodeList = [] timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = self.resDir + '/' + "vos_import_report-" + timestamp nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp) #nodeListFile = self.resDir + '/' + "vos_import_report-" + timestamp nlfp = open(nodeListFile, "w") #out = open("import_amqp_server_log.txt", "a") Loading Loading @@ -112,13 +113,13 @@ class ImportExecutor(TaskExecutor): cnode.setSticky(True) if os.path.islink(dir): now = dt.now() now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) elif self.dbConn.insertNode(cnode): now = dt.now() now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: now = dt.now() now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) for flist in files: Loading Loading @@ -152,13 +153,13 @@ class ImportExecutor(TaskExecutor): dnode.setSticky(True) if os.path.islink(file): now = dt.now() now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) elif self.dbConn.insertNode(dnode): now = dt.now() now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: now = dt.now() now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) #out.close() Loading Loading @@ -189,6 +190,7 @@ class ImportExecutor(TaskExecutor): Processed nodes: {len(nodeList)} Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)} Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in nodeList)} Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in nodeList)} """ Loading transfer_service/store_executor.py +55 −5 Original line number Diff line number Diff line Loading @@ -7,9 +7,12 @@ import subprocess import sys from config import Config from datetime import datetime as dt from db_connector import DbConnector from mailer import Mailer from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tabulate import tabulate from tape_client import TapeClient from task_executor import TaskExecutor Loading @@ -36,6 +39,8 @@ class StoreExecutor(TaskExecutor): 1) params = config.loadSection("scheduling") self.maxTerminatedJobs = params.getint("max_terminated_jobs") params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] Loading @@ -48,9 +53,12 @@ class StoreExecutor(TaskExecutor): redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] self.jobObj = None self.jobId = None self.username = None self.userId = None self.userEmail = None self.requestType = None self.storageId = None self.storageType = None Loading Loading @@ -88,17 +96,58 @@ class StoreExecutor(TaskExecutor): os.chmod(srcPathPrefix, 0o755) def update(self): out = open("store_executor_log.txt", "a") results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) for nodeVOSPath in self.nodeList: out.write(f"nodeListElement: {nodeVOSPath}\n") for el in self.nodeList: nodeVOSPath = el[2] self.dbConn.setAsyncTrans(nodeVOSPath, True) self.dbConn.setJobId(nodeVOSPath, None) timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = os.path.join(self.resDir, "vos_data_report-" + timestamp) nlfp = open(nodeListFile, "w") nlfp.write(tabulate(self.nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"], tablefmt = "simple")) nlfp.close() self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) out.close() # Send e-mail notification m = Mailer() m.addRecipient(self.adminEmail) self.userEmail = self.dbConn.getUserEmail(self.userId) if self.userEmail != self.adminEmail: m.addRecipient(self.userEmail) msg = f""" [VOSpace data storage procedure summary] Storage type: {self.storageType} Storage ID: {self.storageId} Creator ID: {self.userId} Processed nodes: {len(self.nodeList)} Stored nodes: {sum(res[-1] == 'DONE' for res in self.nodeList)} Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in self.nodeList)} Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in self.nodeList)} """ if len(self.nodeList) <= 10 ** 5: m.setMessageWithAttachment("VOSpace data storage notification", msg, nodeListFile) else: info = f""" INFO: this operation involved a number of nodes greater than 10^5, you will find the results in {self.resDir}. """ msg += info m.setMessage("VOSpace data storage notification", msg) m.send() self.nodeList.clear() def run(self): self.logger.info("Starting store executor...") Loading @@ -110,10 +159,11 @@ class StoreExecutor(TaskExecutor): self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.username = self.jobObj.jobInfo["userName"] self.userId = self.jobObj.ownerId self.requestType = self.jobObj.jobInfo["requestType"] self.storageId = self.jobObj.jobInfo["storageId"] self.storageType = self.jobObj.jobInfo["storageType"] self.nodeList = self.jobObj.jobInfo["nodeList"] self.nodeList = self.jobObj.jobInfo["nodeList"].copy() # TODO # 1) Controlla il tipo di destinazione: hot (server) o cold (tape) # *) HOT Loading transfer_service/store_preprocessor.py +20 −7 Original line number Diff line number Diff line Loading @@ -129,7 +129,10 @@ class StorePreprocessor(TaskExecutor): # File catalog update #out = open("store_preprocessor_log.txt", "a") self.userId = self.dbConn.getUserId(self.username) #self.userId = self.dbConn.getUserId(self.username) self.userId = self.jobObj.ownerId #out.write(f"USER: {self.username}\n") #out.write(f"USER_ID: {self.userId}\n") pathPrefix = self.storageStorePath.replace("{username}", self.username) Loading Loading @@ -157,14 +160,19 @@ class StorePreprocessor(TaskExecutor): cnode.setContentLength(0) cnode.setSticky(True) vospacePath = basePath + '/' + nodeName if os.path.islink(dir): # node is a symlink, do not import it... pass now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) elif self.dbConn.insertNode(cnode): self.nodeList.append(basePath + '/' + nodeName) now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: # node already exists, skip it... pass now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) #out.write("\n\n") Loading Loading @@ -193,14 +201,19 @@ class StorePreprocessor(TaskExecutor): dnode.setContentMD5(self.md5calc.getMD5(file)) dnode.setSticky(True) vospacePath = basePath + '/' + nodeName if os.path.islink(file): # node is a symlink, do not import it... pass now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) elif self.dbConn.insertNode(dnode): self.nodeList.append(basePath + '/' + nodeName) now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: # node already exists, skip it... pass now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) #out.write("\n") #out.close() Loading Loading
transfer_service/import_executor.py +9 −7 Original line number Diff line number Diff line Loading @@ -70,7 +70,8 @@ class ImportExecutor(TaskExecutor): start = dt.now() nodeList = [] timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = self.resDir + '/' + "vos_import_report-" + timestamp nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp) #nodeListFile = self.resDir + '/' + "vos_import_report-" + timestamp nlfp = open(nodeListFile, "w") #out = open("import_amqp_server_log.txt", "a") Loading Loading @@ -112,13 +113,13 @@ class ImportExecutor(TaskExecutor): cnode.setSticky(True) if os.path.islink(dir): now = dt.now() now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) elif self.dbConn.insertNode(cnode): now = dt.now() now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: now = dt.now() now = dt.now().isoformat() nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) for flist in files: Loading Loading @@ -152,13 +153,13 @@ class ImportExecutor(TaskExecutor): dnode.setSticky(True) if os.path.islink(file): now = dt.now() now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) elif self.dbConn.insertNode(dnode): now = dt.now() now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: now = dt.now() now = dt.now().isoformat() nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) #out.close() Loading Loading @@ -189,6 +190,7 @@ class ImportExecutor(TaskExecutor): Processed nodes: {len(nodeList)} Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)} Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in nodeList)} Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in nodeList)} """ Loading
transfer_service/store_executor.py +55 −5 Original line number Diff line number Diff line Loading @@ -7,9 +7,12 @@ import subprocess import sys from config import Config from datetime import datetime as dt from db_connector import DbConnector from mailer import Mailer from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tabulate import tabulate from tape_client import TapeClient from task_executor import TaskExecutor Loading @@ -36,6 +39,8 @@ class StoreExecutor(TaskExecutor): 1) params = config.loadSection("scheduling") self.maxTerminatedJobs = params.getint("max_terminated_jobs") params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] Loading @@ -48,9 +53,12 @@ class StoreExecutor(TaskExecutor): redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] self.jobObj = None self.jobId = None self.username = None self.userId = None self.userEmail = None self.requestType = None self.storageId = None self.storageType = None Loading Loading @@ -88,17 +96,58 @@ class StoreExecutor(TaskExecutor): os.chmod(srcPathPrefix, 0o755) def update(self): out = open("store_executor_log.txt", "a") results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) for nodeVOSPath in self.nodeList: out.write(f"nodeListElement: {nodeVOSPath}\n") for el in self.nodeList: nodeVOSPath = el[2] self.dbConn.setAsyncTrans(nodeVOSPath, True) self.dbConn.setJobId(nodeVOSPath, None) timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = os.path.join(self.resDir, "vos_data_report-" + timestamp) nlfp = open(nodeListFile, "w") nlfp.write(tabulate(self.nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"], tablefmt = "simple")) nlfp.close() self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) out.close() # Send e-mail notification m = Mailer() m.addRecipient(self.adminEmail) self.userEmail = self.dbConn.getUserEmail(self.userId) if self.userEmail != self.adminEmail: m.addRecipient(self.userEmail) msg = f""" [VOSpace data storage procedure summary] Storage type: {self.storageType} Storage ID: {self.storageId} Creator ID: {self.userId} Processed nodes: {len(self.nodeList)} Stored nodes: {sum(res[-1] == 'DONE' for res in self.nodeList)} Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in self.nodeList)} Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in self.nodeList)} """ if len(self.nodeList) <= 10 ** 5: m.setMessageWithAttachment("VOSpace data storage notification", msg, nodeListFile) else: info = f""" INFO: this operation involved a number of nodes greater than 10^5, you will find the results in {self.resDir}. """ msg += info m.setMessage("VOSpace data storage notification", msg) m.send() self.nodeList.clear() def run(self): self.logger.info("Starting store executor...") Loading @@ -110,10 +159,11 @@ class StoreExecutor(TaskExecutor): self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.username = self.jobObj.jobInfo["userName"] self.userId = self.jobObj.ownerId self.requestType = self.jobObj.jobInfo["requestType"] self.storageId = self.jobObj.jobInfo["storageId"] self.storageType = self.jobObj.jobInfo["storageType"] self.nodeList = self.jobObj.jobInfo["nodeList"] self.nodeList = self.jobObj.jobInfo["nodeList"].copy() # TODO # 1) Controlla il tipo di destinazione: hot (server) o cold (tape) # *) HOT Loading
transfer_service/store_preprocessor.py +20 −7 Original line number Diff line number Diff line Loading @@ -129,7 +129,10 @@ class StorePreprocessor(TaskExecutor): # File catalog update #out = open("store_preprocessor_log.txt", "a") self.userId = self.dbConn.getUserId(self.username) #self.userId = self.dbConn.getUserId(self.username) self.userId = self.jobObj.ownerId #out.write(f"USER: {self.username}\n") #out.write(f"USER_ID: {self.userId}\n") pathPrefix = self.storageStorePath.replace("{username}", self.username) Loading Loading @@ -157,14 +160,19 @@ class StorePreprocessor(TaskExecutor): cnode.setContentLength(0) cnode.setSticky(True) vospacePath = basePath + '/' + nodeName if os.path.islink(dir): # node is a symlink, do not import it... pass now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) elif self.dbConn.insertNode(cnode): self.nodeList.append(basePath + '/' + nodeName) now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: # node already exists, skip it... pass now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) #out.write("\n\n") Loading Loading @@ -193,14 +201,19 @@ class StorePreprocessor(TaskExecutor): dnode.setContentMD5(self.md5calc.getMD5(file)) dnode.setSticky(True) vospacePath = basePath + '/' + nodeName if os.path.islink(file): # node is a symlink, do not import it... pass now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) elif self.dbConn.insertNode(dnode): self.nodeList.append(basePath + '/' + nodeName) now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: # node already exists, skip it... pass now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) #out.write("\n") #out.close() Loading