Loading transfer_service/import_executor.py +29 −13 Original line number Diff line number Diff line #!/usr/bin/env python import logging import os import re Loading @@ -20,19 +21,32 @@ class ImportExecutor(TaskExecutor): def __init__(self): self.md5calc = Checksum() config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") self.dbConn = DbConnector(self.params["user"], self.params["password"], self.params["host"], self.params.getint("port"), self.params["db"], params = config.loadSection("file_catalog") self.dbConn = DbConnector(params["user"], params["password"], params["host"], params.getint("port"), params["db"], 1, 1) self.params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(self.params["host"], self.params.getint("port"), self.params["user"], self.params["pkey_file_path"]) params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(params["host"], params.getint("port"), params["user"], params["pkey_file_path"]) params = config.loadSection("logging") self.logger = logging.getLogger("import_executor") logLevel = "logging." + params["log_level"] logDir = params["log_dir"] logFile = logDir + '/' + "import_executor.log" self.logger.setLevel(eval(logLevel)) logFileHandler = logging.FileHandler(logFile) logStreamHandler = logging.StreamHandler() logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) self.logger.addHandler(logFileHandler) self.logger.addHandler(logStreamHandler) self.systemUtils = SystemUtils() self.jobObj = None self.jobId = None Loading Loading @@ -181,7 +195,8 @@ class ImportExecutor(TaskExecutor): def run(self): print("Starting import executor...") #print("Starting import executor...") self.logger.info("Starting import executor...") self.setSourceQueueName("import_ready") self.setDestinationQueueName("import_terminated") while True: Loading @@ -199,4 +214,5 @@ class ImportExecutor(TaskExecutor): self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") #print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") transfer_service/mailer.py +6 −15 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ import smtplib from config import Config from email.message import EmailMessage from email.policy import SMTP from smtplib import SMTPConnectError from smtplib import SMTPException Loading @@ -18,16 +19,7 @@ class Mailer(object): self.smtpServer = params["smtp_server"] self.smtpPort = params.getint("smtp_port") self.sender = params["no_reply_email"] params = config.loadSection("logging") self.logger = logging.getLogger("Mailer") logLevel = "logging." + params["log_level"] logDir = params["log_dir"] logFile = logDir + '/' + "mailer.log" self.logger.setLevel(eval(logLevel)) logFileHandler = logging.FileHandler(logFile) logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") logFileHandler.setFormatter(logFormatter) self.logger.addHandler(logFileHandler) self.logger = logging.getLogger("import_executor.mailer") self.recipients = [] self.message = None Loading Loading @@ -66,14 +58,13 @@ class Mailer(object): try: smtpObj = smtplib.SMTP(self.smtpServer, self.smtpPort) smtpObj.send_message(self.message) print("Message sent!") self.logger.debug("E-mail message sent successfully!") except OSError: self.logger.exception("Error: cannot connect to SMTP server!") except SMTPConnectError: self.logger.exception("Cannot connect to SMTP server.") except TimeoutError: self.logger.exception("Error: connection timeout!") self.logger.exception("Connection timeout.") except SMTPException: self.logger.exception("Error: cannot send email message!") self.logger.exception("Cannot send email message.") # Test 1 Loading Loading
transfer_service/import_executor.py +29 −13 Original line number Diff line number Diff line #!/usr/bin/env python import logging import os import re Loading @@ -20,19 +21,32 @@ class ImportExecutor(TaskExecutor): def __init__(self): self.md5calc = Checksum() config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") self.dbConn = DbConnector(self.params["user"], self.params["password"], self.params["host"], self.params.getint("port"), self.params["db"], params = config.loadSection("file_catalog") self.dbConn = DbConnector(params["user"], params["password"], params["host"], params.getint("port"), params["db"], 1, 1) self.params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(self.params["host"], self.params.getint("port"), self.params["user"], self.params["pkey_file_path"]) params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(params["host"], params.getint("port"), params["user"], params["pkey_file_path"]) params = config.loadSection("logging") self.logger = logging.getLogger("import_executor") logLevel = "logging." + params["log_level"] logDir = params["log_dir"] logFile = logDir + '/' + "import_executor.log" self.logger.setLevel(eval(logLevel)) logFileHandler = logging.FileHandler(logFile) logStreamHandler = logging.StreamHandler() logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) self.logger.addHandler(logFileHandler) self.logger.addHandler(logStreamHandler) self.systemUtils = SystemUtils() self.jobObj = None self.jobId = None Loading Loading @@ -181,7 +195,8 @@ class ImportExecutor(TaskExecutor): def run(self): print("Starting import executor...") #print("Starting import executor...") self.logger.info("Starting import executor...") self.setSourceQueueName("import_ready") self.setDestinationQueueName("import_terminated") while True: Loading @@ -199,4 +214,5 @@ class ImportExecutor(TaskExecutor): self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") #print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")
transfer_service/mailer.py +6 −15 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ import smtplib from config import Config from email.message import EmailMessage from email.policy import SMTP from smtplib import SMTPConnectError from smtplib import SMTPException Loading @@ -18,16 +19,7 @@ class Mailer(object): self.smtpServer = params["smtp_server"] self.smtpPort = params.getint("smtp_port") self.sender = params["no_reply_email"] params = config.loadSection("logging") self.logger = logging.getLogger("Mailer") logLevel = "logging." + params["log_level"] logDir = params["log_dir"] logFile = logDir + '/' + "mailer.log" self.logger.setLevel(eval(logLevel)) logFileHandler = logging.FileHandler(logFile) logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") logFileHandler.setFormatter(logFormatter) self.logger.addHandler(logFileHandler) self.logger = logging.getLogger("import_executor.mailer") self.recipients = [] self.message = None Loading Loading @@ -66,14 +58,13 @@ class Mailer(object): try: smtpObj = smtplib.SMTP(self.smtpServer, self.smtpPort) smtpObj.send_message(self.message) print("Message sent!") self.logger.debug("E-mail message sent successfully!") except OSError: self.logger.exception("Error: cannot connect to SMTP server!") except SMTPConnectError: self.logger.exception("Cannot connect to SMTP server.") except TimeoutError: self.logger.exception("Error: connection timeout!") self.logger.exception("Connection timeout.") except SMTPException: self.logger.exception("Error: cannot send email message!") self.logger.exception("Cannot send email message.") # Test 1 Loading