Loading transfer_service/retrieve_executor.py +27 −3 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ import sys from checksum import Checksum from config import Config from db_connector import DbConnector from mailer import Mailer from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tape_client import TapeClient Loading Loading @@ -59,6 +60,8 @@ class RetrieveExecutor(TaskExecutor): self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"]) params = config.loadSection("scheduling") self.maxTerminatedJobs = params.getint("max_terminated_jobs") params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] Loading Loading @@ -249,9 +252,9 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) return True def updateJobStatus(self): def update(self): """ Updates the job status. Updates the job status and sends an email to the user. """ results = [{"target": ""}] results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] Loading @@ -260,6 +263,27 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) # Send e-mail notification m = Mailer() m.addRecipient(self.adminEmail) userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) if userEmail != self.adminEmail: m.addRecipient(userEmail) msg = f""" Dear user, your job has been COMPLETED. Job ID: {self.jobObj.jobId} Owner ID: {self.jobObj.ownerId} Your files are available and can be downloaded. """ m.setMessage("VOSpace data retrieve notification: Job COMPLETED", msg) m.send() def cleanup(self): """ Cleanup method. Loading @@ -285,7 +309,7 @@ class RetrieveExecutor(TaskExecutor): self.buildBlocks() result = self.retrieveData() if result: self.updateJobStatus() self.update() self.cleanup() # debug block... Loading transfer_service/retrieve_preprocessor.py +26 −0 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ import os from config import Config from db_connector import DbConnector from mailer import Mailer from redis_log_handler import RedisLogHandler from task_executor import TaskExecutor Loading @@ -23,6 +24,8 @@ class RetrievePreprocessor(TaskExecutor): params["db"], 1, 1) params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] Loading @@ -45,6 +48,28 @@ class RetrievePreprocessor(TaskExecutor): self.nodeList.append(vospacePath.split("!vospace")[1]) self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() def update(self): # Send e-mail notification m = Mailer() m.addRecipient(self.adminEmail) userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) if userEmail != self.adminEmail: m.addRecipient(userEmail) msg = f""" Dear user, your job has been QUEUED. Job ID: {self.jobObj.jobId} Owner ID: {self.jobObj.ownerId} You will be notified by email once the job is completed. """ m.setMessage("VOSpace data retrieve notification: Job QUEUED", msg) m.send() def cleanup(self): self.nodeList.clear() Loading @@ -57,6 +82,7 @@ class RetrievePreprocessor(TaskExecutor): if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.execute() self.update() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.cleanup() Loading transfer_service/store_executor.py +1 −1 Original line number Diff line number Diff line Loading @@ -135,7 +135,7 @@ class StoreExecutor(TaskExecutor): """ if len(self.nodeList) <= 10 ** 5: m.setMessageWithAttachment("VOSpace data storage notification", msg, nodeListFile) m.setMessageWithAttachment("VOSpace data storage notification: job COMPLETED", msg, nodeListFile) else: info = f""" INFO: Loading transfer_service/store_preprocessor.py +27 −1 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ from checksum import Checksum from config import Config from file_grouper import FileGrouper from db_connector import DbConnector from mailer import Mailer from node import Node from redis_log_handler import RedisLogHandler from system_utils import SystemUtils Loading @@ -44,6 +45,8 @@ class StorePreprocessor(TaskExecutor): 1) params = config.loadSection("transfer_node") self.storageStorePath = params["store_path"] params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] Loading Loading @@ -222,6 +225,29 @@ class StorePreprocessor(TaskExecutor): self.jobObj.setPhase("QUEUED") self.dbConn.setPhase(self.jobId, "QUEUED") # Send e-mail notification m = Mailer() m.addRecipient(self.adminEmail) userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) if userEmail != self.adminEmail: m.addRecipient(userEmail) msg = f""" Dear user, your job has been QUEUED. Job ID: {self.jobId} Storage type: {self.storageType} Storage ID: {self.storageId} Owner ID: {self.jobObj.ownerId} You will be notified by email once the job is completed. """ m.setMessage("VOSpace data storage notification: Job QUEUED", msg) m.send() def run(self): self.logger.info("Starting store preprocessor...") self.setSourceQueueName("write_pending") Loading @@ -236,9 +262,9 @@ class StorePreprocessor(TaskExecutor): self.username = self.jobObj.jobInfo["userName"] self.prepare(self.username) self.execute() self.update() self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() self.nodeList.clear() self.update() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") Loading Loading
transfer_service/retrieve_executor.py +27 −3 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ import sys from checksum import Checksum from config import Config from db_connector import DbConnector from mailer import Mailer from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tape_client import TapeClient Loading Loading @@ -59,6 +60,8 @@ class RetrieveExecutor(TaskExecutor): self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"]) params = config.loadSection("scheduling") self.maxTerminatedJobs = params.getint("max_terminated_jobs") params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] Loading Loading @@ -249,9 +252,9 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) return True def updateJobStatus(self): def update(self): """ Updates the job status. Updates the job status and sends an email to the user. """ results = [{"target": ""}] results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] Loading @@ -260,6 +263,27 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) # Send e-mail notification m = Mailer() m.addRecipient(self.adminEmail) userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) if userEmail != self.adminEmail: m.addRecipient(userEmail) msg = f""" Dear user, your job has been COMPLETED. Job ID: {self.jobObj.jobId} Owner ID: {self.jobObj.ownerId} Your files are available and can be downloaded. """ m.setMessage("VOSpace data retrieve notification: Job COMPLETED", msg) m.send() def cleanup(self): """ Cleanup method. Loading @@ -285,7 +309,7 @@ class RetrieveExecutor(TaskExecutor): self.buildBlocks() result = self.retrieveData() if result: self.updateJobStatus() self.update() self.cleanup() # debug block... Loading
transfer_service/retrieve_preprocessor.py +26 −0 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ import os from config import Config from db_connector import DbConnector from mailer import Mailer from redis_log_handler import RedisLogHandler from task_executor import TaskExecutor Loading @@ -23,6 +24,8 @@ class RetrievePreprocessor(TaskExecutor): params["db"], 1, 1) params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] Loading @@ -45,6 +48,28 @@ class RetrievePreprocessor(TaskExecutor): self.nodeList.append(vospacePath.split("!vospace")[1]) self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() def update(self): # Send e-mail notification m = Mailer() m.addRecipient(self.adminEmail) userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) if userEmail != self.adminEmail: m.addRecipient(userEmail) msg = f""" Dear user, your job has been QUEUED. Job ID: {self.jobObj.jobId} Owner ID: {self.jobObj.ownerId} You will be notified by email once the job is completed. """ m.setMessage("VOSpace data retrieve notification: Job QUEUED", msg) m.send() def cleanup(self): self.nodeList.clear() Loading @@ -57,6 +82,7 @@ class RetrievePreprocessor(TaskExecutor): if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.execute() self.update() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.cleanup() Loading
transfer_service/store_executor.py +1 −1 Original line number Diff line number Diff line Loading @@ -135,7 +135,7 @@ class StoreExecutor(TaskExecutor): """ if len(self.nodeList) <= 10 ** 5: m.setMessageWithAttachment("VOSpace data storage notification", msg, nodeListFile) m.setMessageWithAttachment("VOSpace data storage notification: job COMPLETED", msg, nodeListFile) else: info = f""" INFO: Loading
transfer_service/store_preprocessor.py +27 −1 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ from checksum import Checksum from config import Config from file_grouper import FileGrouper from db_connector import DbConnector from mailer import Mailer from node import Node from redis_log_handler import RedisLogHandler from system_utils import SystemUtils Loading @@ -44,6 +45,8 @@ class StorePreprocessor(TaskExecutor): 1) params = config.loadSection("transfer_node") self.storageStorePath = params["store_path"] params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] Loading Loading @@ -222,6 +225,29 @@ class StorePreprocessor(TaskExecutor): self.jobObj.setPhase("QUEUED") self.dbConn.setPhase(self.jobId, "QUEUED") # Send e-mail notification m = Mailer() m.addRecipient(self.adminEmail) userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) if userEmail != self.adminEmail: m.addRecipient(userEmail) msg = f""" Dear user, your job has been QUEUED. Job ID: {self.jobId} Storage type: {self.storageType} Storage ID: {self.storageId} Owner ID: {self.jobObj.ownerId} You will be notified by email once the job is completed. """ m.setMessage("VOSpace data storage notification: Job QUEUED", msg) m.send() def run(self): self.logger.info("Starting store preprocessor...") self.setSourceQueueName("write_pending") Loading @@ -236,9 +262,9 @@ class StorePreprocessor(TaskExecutor): self.username = self.jobObj.jobInfo["userName"] self.prepare(self.username) self.execute() self.update() self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() self.nodeList.clear() self.update() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") Loading