Loading transfer_service/import_amqp_server.py +32 −11 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ import re from amqp_server import AMQPServer from config import Config from checksum import Checksum from datetime import datetime as dt from db_connector import DbConnector from mailer import Mailer from node import Node Loading Loading @@ -113,9 +114,11 @@ class ImportAMQPServer(AMQPServer): This method performs an import and is executed from a separate process in order to allow the 'execute_callback' to return quickly. """ start = dt.now() nodeList = [] nodes = open("node_list.txt", "w") timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = "vos_import_report-" + timestamp nlfp = open(nodeListFile, "w") out = open("import_amqp_server_log.txt", "a") if storageType == "cold": Loading Loading @@ -157,9 +160,11 @@ class ImportAMQPServer(AMQPServer): dbConn.insertNode(cnode) dbConn.setAsyncTrans(vospacePath, True) dbConn.setSticky(vospacePath, True) nodeList.append([ dir, nodeName, "DONE" ]) now = dt.now() nodeList.append([ now, dir, vospacePath, "DONE" ]) else: nodeList.append([ dir, nodeName, "SKIP" ]) now = dt.now() nodeList.append([ now, dir, vospacePath, "SKIP" ]) for flist in files: for file in flist: Loading Loading @@ -194,18 +199,34 @@ class ImportAMQPServer(AMQPServer): dbConn.insertNode(dnode) dbConn.setAsyncTrans(vospacePath, True) dbConn.setSticky(vospacePath, True) nodeList.append([ file, nodeName, "DONE" ]) now = dt.now() nodeList.append([ now, file, vospacePath, "DONE" ]) else: nodeList.append([ file, nodeName, "SKIP" ]) now = dt.now() nodeList.append([ now, file, vospacePath, "SKIP" ]) nodes.write(tabulate(nodeList)) nodes.close() nlfp.write(tabulate(nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Result"], tablefmt = "simple")) nlfp.close() end = dt.now() m = Mailer() m.addRecipient("cristiano.urban@inaf.it") m.setMessageWithAttachment("import", "Import procedure completed", "node_list.txt") msg = f""" [VOSpace import procedure summary] Storage type: {storageType} Storage ID: {storageId} Creator ID: {userId} Start time: {start} End time: {end} """ m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile) m.send() os.remove(nodeListFile) def run(self): print(f"Starting AMQP server of type {self.type}...") super(ImportAMQPServer, self).run() Loading
transfer_service/import_amqp_server.py +32 −11 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ import re from amqp_server import AMQPServer from config import Config from checksum import Checksum from datetime import datetime as dt from db_connector import DbConnector from mailer import Mailer from node import Node Loading Loading @@ -113,9 +114,11 @@ class ImportAMQPServer(AMQPServer): This method performs an import and is executed from a separate process in order to allow the 'execute_callback' to return quickly. """ start = dt.now() nodeList = [] nodes = open("node_list.txt", "w") timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = "vos_import_report-" + timestamp nlfp = open(nodeListFile, "w") out = open("import_amqp_server_log.txt", "a") if storageType == "cold": Loading Loading @@ -157,9 +160,11 @@ class ImportAMQPServer(AMQPServer): dbConn.insertNode(cnode) dbConn.setAsyncTrans(vospacePath, True) dbConn.setSticky(vospacePath, True) nodeList.append([ dir, nodeName, "DONE" ]) now = dt.now() nodeList.append([ now, dir, vospacePath, "DONE" ]) else: nodeList.append([ dir, nodeName, "SKIP" ]) now = dt.now() nodeList.append([ now, dir, vospacePath, "SKIP" ]) for flist in files: for file in flist: Loading Loading @@ -194,18 +199,34 @@ class ImportAMQPServer(AMQPServer): dbConn.insertNode(dnode) dbConn.setAsyncTrans(vospacePath, True) dbConn.setSticky(vospacePath, True) nodeList.append([ file, nodeName, "DONE" ]) now = dt.now() nodeList.append([ now, file, vospacePath, "DONE" ]) else: nodeList.append([ file, nodeName, "SKIP" ]) now = dt.now() nodeList.append([ now, file, vospacePath, "SKIP" ]) nodes.write(tabulate(nodeList)) nodes.close() nlfp.write(tabulate(nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Result"], tablefmt = "simple")) nlfp.close() end = dt.now() m = Mailer() m.addRecipient("cristiano.urban@inaf.it") m.setMessageWithAttachment("import", "Import procedure completed", "node_list.txt") msg = f""" [VOSpace import procedure summary] Storage type: {storageType} Storage ID: {storageId} Creator ID: {userId} Start time: {start} End time: {end} """ m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile) m.send() os.remove(nodeListFile) def run(self): print(f"Starting AMQP server of type {self.type}...") super(ImportAMQPServer, self).run()