Loading client/vos_import +2 −2 Original line number Diff line number Diff line Loading @@ -23,8 +23,8 @@ class VOSImport(AMQPClient): importResponse = self.call(importRequest) if "responseType" not in importResponse: sys.exit("FATAL: Malformed response, storage acknowledge expected.\n") elif importResponse["responseType"] == "IMPORT_DONE": print("\nImport procedure completed!\n") elif importResponse["responseType"] == "IMPORT_STARTED": print("\nImport procedure started. You'll receive an email at the end of the operation.\n") elif importResponse["responseType"] == "ERROR": errorCode = importResponse["errorCode"] errorMsg = importResponse["errorMsg"] Loading transfer_service/import_amqp_server.py +26 −7 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ from amqp_server import AMQPServer from config import Config from checksum import Checksum from db_connector import DbConnector from mailer import Mailer from node import Node from system_utils import SystemUtils from tape_client import TapeClient Loading Loading @@ -85,10 +86,20 @@ class ImportAMQPServer(AMQPServer): "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username } return response else: p = Process(target = self.load, args = (self.tapeClient, self.dbConn, self.md5calc, self.systemUtils, path, pathPrefix, storageType, storageId, userId,), daemon = True) p = Process(target = self.load, args = (self.tapeClient, self.dbConn, self.md5calc, self.systemUtils, path, pathPrefix, storageType, storageId, userId,), daemon = True) p.start() # add a counter to track the number of nodes (files and dirs) + log file response = { "responseType": "IMPORT_DONE" } response = { "responseType": "IMPORT_STARTED" } else: response = { "responseType": "ERROR", "errorCode": 8, Loading @@ -96,11 +107,11 @@ class ImportAMQPServer(AMQPServer): return response def run(self): print(f"Starting AMQP server of type {self.type}...") super(ImportAMQPServer, self).run() def load(self, tapeClient, dbConn, md5calc, systemUtils, path, pathPrefix, storageType, storageId, userId): """ This method performs an import and is executed from a separate process in order to allow the 'execute_callback' to return quickly. """ out = open("import_amqp_server_log.txt", "a") if storageType == "cold": tapeClient.connect() Loading Loading @@ -176,3 +187,11 @@ class ImportAMQPServer(AMQPServer): dbConn.setAsyncTrans(vospacePath, True) dbConn.setSticky(vospacePath, True) m = Mailer() m.setMessage("Nodes successfully imported.") m.addReceiver("cristiano.urban@inaf.it") m.send() def run(self): print(f"Starting AMQP server of type {self.type}...") super(ImportAMQPServer, self).run() No newline at end of file Loading
client/vos_import +2 −2 Original line number Diff line number Diff line Loading @@ -23,8 +23,8 @@ class VOSImport(AMQPClient): importResponse = self.call(importRequest) if "responseType" not in importResponse: sys.exit("FATAL: Malformed response, storage acknowledge expected.\n") elif importResponse["responseType"] == "IMPORT_DONE": print("\nImport procedure completed!\n") elif importResponse["responseType"] == "IMPORT_STARTED": print("\nImport procedure started. You'll receive an email at the end of the operation.\n") elif importResponse["responseType"] == "ERROR": errorCode = importResponse["errorCode"] errorMsg = importResponse["errorMsg"] Loading
transfer_service/import_amqp_server.py +26 −7 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ from amqp_server import AMQPServer from config import Config from checksum import Checksum from db_connector import DbConnector from mailer import Mailer from node import Node from system_utils import SystemUtils from tape_client import TapeClient Loading Loading @@ -85,10 +86,20 @@ class ImportAMQPServer(AMQPServer): "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username } return response else: p = Process(target = self.load, args = (self.tapeClient, self.dbConn, self.md5calc, self.systemUtils, path, pathPrefix, storageType, storageId, userId,), daemon = True) p = Process(target = self.load, args = (self.tapeClient, self.dbConn, self.md5calc, self.systemUtils, path, pathPrefix, storageType, storageId, userId,), daemon = True) p.start() # add a counter to track the number of nodes (files and dirs) + log file response = { "responseType": "IMPORT_DONE" } response = { "responseType": "IMPORT_STARTED" } else: response = { "responseType": "ERROR", "errorCode": 8, Loading @@ -96,11 +107,11 @@ class ImportAMQPServer(AMQPServer): return response def run(self): print(f"Starting AMQP server of type {self.type}...") super(ImportAMQPServer, self).run() def load(self, tapeClient, dbConn, md5calc, systemUtils, path, pathPrefix, storageType, storageId, userId): """ This method performs an import and is executed from a separate process in order to allow the 'execute_callback' to return quickly. """ out = open("import_amqp_server_log.txt", "a") if storageType == "cold": tapeClient.connect() Loading Loading @@ -176,3 +187,11 @@ class ImportAMQPServer(AMQPServer): dbConn.setAsyncTrans(vospacePath, True) dbConn.setSticky(vospacePath, True) m = Mailer() m.setMessage("Nodes successfully imported.") m.addReceiver("cristiano.urban@inaf.it") m.send() def run(self): print(f"Starting AMQP server of type {self.type}...") super(ImportAMQPServer, self).run() No newline at end of file