Commit 35e390d0 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added daemonized process to handle long import procedures.

parent 5c6dfda0
Loading
Loading
Loading
Loading
+81 −75
Original line number Diff line number Diff line
@@ -11,6 +11,7 @@ from node import Node
from system_utils import SystemUtils
from tape_client import TapeClient

from multiprocessing import Process

class ImportAMQPServer(AMQPServer):

@@ -43,7 +44,7 @@ class ImportAMQPServer(AMQPServer):
            username = requestBody["userName"]
            userInDb = self.dbConn.userExists(username)
            userInfo = self.systemUtils.userInfo(username)
            out = open("import_amqp_server_log.txt", "a")
            #out = open("import_amqp_server_log.txt", "a")
            
            if not userInfo or not userInDb:
                response = { "responseType": "ERROR",
@@ -84,12 +85,29 @@ class ImportAMQPServer(AMQPServer):
                             "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username  }
                return response            
            else:
                p = Process(target = self.load, args = (self.tapeClient, self.dbConn, self.md5calc, self.systemUtils, path, pathPrefix, storageType, storageId, userId,), daemon = True)
                p.start()
            # add a counter to track the number of nodes (files and dirs) + log file
            response = { "responseType": "IMPORT_DONE" }
        else:
            response = { "responseType": "ERROR",
                         "errorCode": 8,
                         "errorMsg": "Unkown request type." }

        return response

    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        super(ImportAMQPServer, self).run()
        
    def load(self, tapeClient, dbConn, md5calc, systemUtils, path, pathPrefix, storageType, storageId, userId):
        out = open("import_amqp_server_log.txt", "a")
        if storageType == "cold":
                    self.tapeClient.connect()
                    self.tapeClient.recallChecksumFiles(path)
                    self.tapeClient.disconnect()
                tapeClient.connect()
                tapeClient.recallChecksumFiles(path)
                tapeClient.disconnect()
                
                [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(path))
        [ dirs, files ] = systemUtils.scanRecursive(os.path.dirname(path))
                
        tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")
        for dir in dirs:            
@@ -114,19 +132,19 @@ class ImportAMQPServer(AMQPServer):
                        vospacePath = parentPath + '/' + nodeName
                        
                    cnode.setParentPath(parentPath)
                            locationId = self.dbConn.getLocationId(storageId)
                    locationId = dbConn.getLocationId(storageId)
                    cnode.setLocationId(locationId)
                    cnode.setOwnerID(userId)
                    cnode.setCreatorID(userId)
                    cnode.setContentLength(0)
                            if not self.dbConn.nodeExists(cnode):
                                self.dbConn.insertNode(cnode)
                                self.dbConn.setAsyncTrans(vospacePath, True)
                                self.dbConn.setSticky(vospacePath, True)
                    if not dbConn.nodeExists(cnode):
                        dbConn.insertNode(cnode)
                        dbConn.setAsyncTrans(vospacePath, True)
                        dbConn.setSticky(vospacePath, True)

        for flist in files:
            for file in flist:                
                        if self.md5calc.fileIsValid(file) and path in os.path.dirname(file):
                if md5calc.fileIsValid(file) and path in os.path.dirname(file):
                    out.write(f"FILE files: {files}\n")
                    out.write(f"FILE flist: {flist}\n")
                    out.write(f"FILE file: {file}\n")
@@ -145,28 +163,16 @@ class ImportAMQPServer(AMQPServer):
                    vospacePath = parentPath + '/' + nodeName
                    out.write(f"FILE vospacePath: {vospacePath}\n")
                    dnode.setParentPath(parentPath)
                            storageId = self.dbConn.getStorageId(pathPrefix)
                            locationId = self.dbConn.getLocationId(storageId)
                    storageId = dbConn.getStorageId(pathPrefix)
                    locationId = dbConn.getLocationId(storageId)
                    dnode.setLocationId(locationId)
                    dnode.setOwnerID(userId)
                    dnode.setCreatorID(userId)
                    dnode.setContentLength(os.path.getsize(file))
                            dnode.setContentMD5(self.md5calc.getMD5(file))
                            
                            if not self.dbConn.nodeExists(dnode):
                                self.dbConn.insertNode(dnode)
                                self.dbConn.setAsyncTrans(vospacePath, True)
                                self.dbConn.setSticky(vospacePath, True)

            # add a counter to track the number of nodes (files and dirs) + log file
            response = { "responseType": "IMPORT_DONE" }
        else:
            response = { "responseType": "ERROR",
                         "errorCode": 8,
                         "errorMsg": "Unkown request type." }
                    dnode.setContentMD5(md5calc.getMD5(file))
                            
        return response
                    if not dbConn.nodeExists(dnode):
                        dbConn.insertNode(dnode)
                        dbConn.setAsyncTrans(vospacePath, True)
                        dbConn.setSticky(vospacePath, True)
    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        super(ImportAMQPServer, self).run()