Commit 8253e468 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added ImportAMQPServer class.

parent 0f868c62
Loading
Loading
Loading
Loading
+135 −0
Original line number Diff line number Diff line
import os
import sys
import json
import re

from amqp_server import AMQPServer
from checksum import Checksum
from db_connector import DbConnector
from node import Node
from system_utils import SystemUtils
from config import Config


class ImportAMQPServer(AMQPServer):

    def __init__(self, host, port, queue):
        self.type = "import"
        self.md5calc = Checksum()
        self.storeAck = False
        config = Config("vos_ts.conf")       
        self.params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(self.params["user"], 
                                  self.params["password"], 
                                  self.params["host"], 
                                  self.params.getint("port"), 
                                  self.params["db"])
        self.params = config.loadSection("transfer_node")
        self.storageStorePath = self.params["store_path"]
        self.path = None
        self.username = None
        super(ImportAMQPServer, self).__init__(host, port, queue)

    def execute_callback(self, requestBody):
        # 'requestType' and 'path' attributes are mandatory
        if "requestType" not in requestBody or "path" not in requestBody:
            response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." }
        elif requestBody["requestType"] == "NODE_IMPORT":
            self.path = requestBody["path"]
            self.username = requestBody["userName"]
            out = open("import_amqp_server_log.txt", "a")
            self.dbConn.connect()
            #if os.path.isdir(self.path):
            userId = self.dbConn.getRapId(self.username)
            pathPrefix = self.dbConn.storageBasePathIsValid(self.path)
            storageId = self.dbConn.getStorageId(pathPrefix)
            [ dirs, files ] = self.scanRecursive()
            tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")
            for dir in dirs:            
                #pathPrefix = self.dbConn.storageBasePathIsValid(dir)
                out.write(f"DIR dir: {dir}\n")
                out.write(f"DIR pathPrefix: {pathPrefix}\n\n")
                if pathPrefix + '/' + self.username in dir and len(pathPrefix + '/' + self.username) < len(dir):
                    parentPath = os.path.dirname(dir).split(pathPrefix)[1]
                    nodeName = os.path.basename(dir)
                    vospacePath = parentPath + '/' + nodeName
                    cnode = Node(nodeName, "container")
                    if not tstampWrapperDirPattern.match("/" + nodeName):
                        if tstampWrapperDirPattern.search(vospacePath):
                            tstampWrapperDir = tstampWrapperDirPattern.search(vospacePath).group(0).lstrip('/')
                            vospacePath = tstampWrapperDirPattern.sub("", vospacePath)
                            cnode.setWrapperDir(tstampWrapperDir)
                        cnode.setParentPath(parentPath)
                        locationId = self.dbConn.getLocationId(storageId)
                        cnode.setLocationId(locationId)
                        cnode.setOwnerID(userId)
                        cnode.setCreatorID(userId)
                        cnode.setContentLength(0)
                        if not self.dbConn.nodeExists(cnode):
                            self.dbConn.insertNode(cnode)

            for flist in files:
                for file in flist:                
                    if self.md5calc.fileIsValid(file):
                        out.write(f"FILE files: {files}\n")
                        out.write(f"FILE flist: {flist}\n")
                        out.write(f"FILE file: {file}\n")
                        #pathPrefix = self.dbConn.storageBasePathIsValid(file)
                        out.write(f"FILE pathPrefix: {pathPrefix}\n")
                        parentPath = os.path.dirname(file).split(pathPrefix)[1]
                        out.write(f"FILE parentPath: {parentPath}\n")
                        nodeName = os.path.basename(file)
                        out.write(f"FILE nodeName: {nodeName}\n")
                        vospacePath = parentPath + '/' + nodeName                        
                        dnode = Node(nodeName, "data")
                        if tstampWrapperDirPattern.search(vospacePath):
                            tstampWrapperDir = tstampWrapperDirPattern.search(vospacePath).group(0).lstrip('/')
                            vospacePath = tstampWrapperDirPattern.sub("", vospacePath)
                            dnode.setWrapperDir(tstampWrapperDir)
                        out.write(f"FILE vospacePath: {vospacePath}\n")
                        dnode.setParentPath(parentPath)
                        storageId = self.dbConn.getStorageId(pathPrefix)
                        locationId = self.dbConn.getLocationId(storageId)
                        dnode.setLocationId(locationId)
                        dnode.setOwnerID(userId)
                        dnode.setCreatorID(userId)
                        dnode.setContentLength(os.path.getsize(file))
                        dnode.setContentMD5(self.md5calc.getMD5(file))
                        if not self.dbConn.nodeExists(dnode):
                            self.dbConn.insertNode(dnode)
            self.dbConn.disconnect()
            # add a counter to track the number of nodes (files and dirs) + log file
            response = { "responseType": "IMPORT_DONE" }
        else:
            response = { "responseType": "ERROR",
                         "errorCode": 2,
                         "errorMsg": "Unkown request type." }
        return response

    def scanRecursive(self):
        dirList = []
        fileList = []
        if os.path.isfile(self.path):
            p = self.path
            while p != '/':
                p = os.path.dirname(p)
                dirList.append(p)
            dirList.reverse()
            fileList.append([os.path.abspath(self.path)])
            return [ dirList, fileList ]
        for folder, subfolders, files in os.walk(self.path, topdown = True):
            cwd = os.path.basename(folder)
            if folder != self.path:
                parent = os.path.dirname(folder)
                dirList.append(parent + '/' + cwd)
                i = 0
                for f in files:
                    files[i] = parent + '/' + cwd + '/' + f
                    i += 1
                fileList.append(files)
        return [ dirList, fileList ]

    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        super(ImportAMQPServer, self).run()
 
+3 −0
Original line number Diff line number Diff line
import pika
import sys

from import_amqp_server import ImportAMQPServer
from start_job_amqp_server import StartJobAMQPServer
from get_job_amqp_server import GetJobAMQPServer
from abort_job_amqp_server import AbortJobAMQPServer
@@ -23,6 +24,8 @@ class JobHandler(object):
            self.amqpServerList.append(AbortJobAMQPServer(self.host, self.port, rpcQueue))
        elif srvType == 'store':
            self.amqpServerList.append(StoreAMQPServer(self.host, self.port, rpcQueue))
        elif srvType == 'import':
            self.amqpServerList.append(ImportAMQPServer(self.host, self.port, rpcQueue))
        else:
            sys.exit(f"FATAL: unknown server type {srvType}.")

+4 −1
Original line number Diff line number Diff line
@@ -22,6 +22,9 @@ class TransferService(object):
        # PushToVOSpace (via dataArchiver, the 'unofficial' command line client)
        self.jobHandler.addAMQPServer('store', 'store_job_queue')       
        
        # Import
        self.jobHandler.addAMQPServer('import', 'import_queue')

    def start(self):
        self.jobScheduler.start()
        self.jobHandler.start()