Commit 07c94856 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Changed way to import VOSpace nodes.

parent 2e1759d0
Loading
Loading
Loading
Loading
+31 −150
Original line number Original line Diff line number Diff line
#!/usr/bin/env python
#!/usr/bin/env python


import os
import os
import re


from redis_rpc_server import RedisRpcServer
from config import Config
from config import Config
from checksum import Checksum
from datetime import datetime as dt
from db_connector import DbConnector
from db_connector import DbConnector
from mailer import Mailer
from import_job_queue import ImportJobQueue
from node import Node
from redis_rpc_server import RedisRpcServer
from system_utils import SystemUtils
from system_utils import SystemUtils
from tabulate import tabulate
from tape_client import TapeClient


from multiprocessing import Process


class ImportAMQPServer(RedisRpcServer):
class ImportAMQPServer(RedisRpcServer):


    def __init__(self, host, port, db, rpcQueue):
    def __init__(self, host, port, db, rpcQueue):
        self.type = "import"
        self.type = "import"
        self.md5calc = Checksum()
        config = Config("/etc/vos_ts/vos_ts.conf")       
        config = Config("/etc/vos_ts/vos_ts.conf")       
        self.params = config.loadSection("file_catalog")
        self.params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(self.params["user"], 
        self.dbConn = DbConnector(self.params["user"], 
@@ -30,11 +22,9 @@ class ImportAMQPServer(RedisRpcServer):
                                  self.params["db"],
                                  self.params["db"],
                                  1,
                                  1,
                                  1)
                                  1)
        self.params = config.loadSection("spectrum_archive")
        self.params = config.loadSection("scheduling")
        self.tapeClient = TapeClient(self.params["host"],
        self.maxReadyJobs = self.params.getint("max_ready_jobs")
                                     self.params.getint("port"),
        self.importReadyQueue = ImportJobQueue("import_ready")
                                     self.params["user"],
                                     self.params["pkey_file_path"])
        self.systemUtils = SystemUtils()
        self.systemUtils = SystemUtils()
        super(ImportAMQPServer, self).__init__(host, port, db, rpcQueue)
        super(ImportAMQPServer, self).__init__(host, port, db, rpcQueue)


@@ -73,163 +63,54 @@ class ImportAMQPServer(RedisRpcServer):
                response = { "responseType": "ERROR",
                response = { "responseType": "ERROR",
                             "errorCode": 4,
                             "errorCode": 4,
                             "errorMsg": "Path not found." }
                             "errorMsg": "Path not found." }
                return response
            elif not os.path.isdir(path):
            elif not os.path.isdir(path):
                response = { "responseType": "ERROR",
                response = { "responseType": "ERROR",
                             "errorCode": 5,
                             "errorCode": 5,
                             "errorMsg": "Directory path expected." }
                             "errorMsg": "Directory path expected." }
                return response
            elif username not in path:
            elif username not in path:
                response = { "responseType": "ERROR",
                response = { "responseType": "ERROR",
                             "errorCode": 6,
                             "errorCode": 6,
                             "errorMsg": "Directory path does not contain the username." }
                             "errorMsg": "Directory path does not contain the username." }
                return response
            elif os.path.dirname(path) != pathPrefix + '/' + username:
            elif os.path.dirname(path) != pathPrefix + '/' + username:
                response = { "responseType": "ERROR",
                response = { "responseType": "ERROR",
                             "errorCode": 7,
                             "errorCode": 7,
                             "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username  }
                             "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username  }
                return response            
            elif self.importReadyQueue.len() >= self.maxReadyJobs:
                response = { "responseType": "ERROR",
                             "errorCode": 8, 
                             "errorMsg": "Import queue is full, please, retry later." }
            else:
            else:
                p = Process(target = self.load, 
                job = dict()
                            args = (self.tapeClient, 
                job["userId"] = userId
                                    self.dbConn, 
                job["path"] = path
                                    self.md5calc, 
                job["pathPrefix"] = pathPrefix
                                    self.systemUtils, 
                job["storageId"] = storageId
                                    path, 
                job["storageType"] = storageType
                                    pathPrefix, 
                self.importReadyQueue.insertJob(job)
                                    storageType, 
                
                                    storageId,
                #p = Process(target = self.load, 
                                    userId,), 
                #            args = (self.tapeClient, 
                            daemon = True)
                #                    self.dbConn, 
                p.start()
                #                    self.md5calc, 
                #                    self.systemUtils, 
                #                    path, 
                #                    pathPrefix, 
                #                    storageType, 
                #                    storageId,
                #                    userId,), 
                #            daemon = True)
                #p.start()
            # add a counter to track the number of nodes (files and dirs) + log file
            # add a counter to track the number of nodes (files and dirs) + log file
            response = { "responseType": "IMPORT_STARTED" }
            response = { "responseType": "IMPORT_STARTED" }
        else:
        else:
            response = { "responseType": "ERROR",
            response = { "responseType": "ERROR",
                         "errorCode": 8,
                         "errorCode": 9,
                         "errorMsg": "Unkown request type." }
                         "errorMsg": "Unkown request type." }


        return response    
        return response    
        
        
    def load(self, tapeClient, dbConn, md5calc, systemUtils, path, pathPrefix, storageType, storageId, userId):
    #def load(self, tapeClient, dbConn, md5calc, systemUtils, path, pathPrefix, storageType, storageId, userId):
        """
        This method performs an import and is executed from a separate process in order to allow the 
        'execute_callback' to return quickly.
        """
        start = dt.now()
        nodeList = []
        timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S")
        nodeListFile = "vos_import_report-" + timestamp
        nlfp = open(nodeListFile, "w")
        
        out = open("import_amqp_server_log.txt", "a")
        if storageType == "cold":                
                tapeClient.connect()
                tapeClient.recallChecksumFiles(path)
                tapeClient.disconnect()                
                
        [ dirs, files ] = systemUtils.scanRecursive(os.path.dirname(path))
                
        tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")
        for dir in dirs:            
            out.write(f"DIR dir: {dir}\n")
            out.write(f"DIR pathPrefix: {pathPrefix}\n\n")
                    
            if path in dir:
                parentPath = os.path.dirname(dir).split(pathPrefix)[1]
                nodeName = os.path.basename(dir)

                cnode = Node(nodeName, "container")
                        
                if not tstampWrapperDirPattern.match("/" + nodeName):
                    if tstampWrapperDirPattern.search(parentPath):
                        tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                        parentPath = tstampWrapperDirPattern.sub("", parentPath)
                        cnode.setWrapperDir(tstampWrapperDir)
                                
                    if parentPath == '/':
                        vospacePath = parentPath + nodeName
                    else:
                        vospacePath = parentPath + '/' + nodeName
                        
                    cnode.setParentPath(parentPath)
                    locationId = dbConn.getLocationId(storageId)
                    cnode.setLocationId(locationId)
                    cnode.setCreatorID(userId)
                    cnode.setContentLength(0)
                    if not dbConn.nodeExists(cnode):
                        dbConn.insertNode(cnode)
                        dbConn.setAsyncTrans(vospacePath, True)
                        dbConn.setSticky(vospacePath, True)
                        now = dt.now()
                        nodeList.append([ now, dir, vospacePath, "container", "DONE" ])
                    else:
                        now = dt.now()
                        nodeList.append([ now, dir, vospacePath, "container", "SKIP" ])

        for flist in files:
            for file in flist:                
                if md5calc.fileIsValid(file) and path in os.path.dirname(file):
                    out.write(f"FILE files: {files}\n")
                    out.write(f"FILE flist: {flist}\n")
                    out.write(f"FILE file: {file}\n")
                    out.write(f"FILE pathPrefix: {pathPrefix}\n")
                    parentPath = os.path.dirname(file).split(pathPrefix)[1]
                    out.write(f"FILE parentPath: {parentPath}\n")
                    nodeName = os.path.basename(file)
                    out.write(f"FILE nodeName: {nodeName}\n")                                                    
                    dnode = Node(nodeName, "data")
                            
                    if tstampWrapperDirPattern.search(parentPath):
                        tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                        parentPath = tstampWrapperDirPattern.sub("", parentPath)
                        dnode.setWrapperDir(tstampWrapperDir)
                            
                    vospacePath = parentPath + '/' + nodeName
                    out.write(f"FILE vospacePath: {vospacePath}\n")
                    dnode.setParentPath(parentPath)
                    storageId = dbConn.getStorageId(pathPrefix)
                    locationId = dbConn.getLocationId(storageId)
                    dnode.setLocationId(locationId)
                    dnode.setCreatorID(userId)
                    dnode.setContentLength(os.path.getsize(file))
                    dnode.setContentMD5(md5calc.getMD5(file))
                            
                    if not dbConn.nodeExists(dnode):
                        dbConn.insertNode(dnode)
                        dbConn.setAsyncTrans(vospacePath, True)
                        dbConn.setSticky(vospacePath, True)
                        now = dt.now()
                        nodeList.append([ now, file, vospacePath, "data", "DONE" ])
                    else:
                        now = dt.now()
                        nodeList.append([ now, file, vospacePath, "data", "SKIP" ])
                        
        nlfp.write(tabulate(nodeList, 
                            headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
                            tablefmt = "simple"))
        nlfp.close()
        end = dt.now()
                        
        m = Mailer()
        m.addRecipient("cristiano.urban@inaf.it")
        msg = f"""
        [VOSpace import procedure summary]
        
        Storage type: {storageType}
        Storage ID: {storageId}
        Creator ID: {userId}               
        Start time: {start}
        End time: {end}
        Processed nodes: {len(nodeList)}
        Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)}
        Skipped nodes: {sum(res[-1] == 'SKIP' for res in nodeList)}
        
        """
        m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile)
        m.send()                        
        
        
        os.remove(nodeListFile)


    def run(self):
    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        print(f"Starting AMQP server of type {self.type}...")
+181 −0
Original line number Original line Diff line number Diff line
#!/usr/bin/env python

import os
import re

from config import Config
from checksum import Checksum
from datetime import datetime as dt
from db_connector import DbConnector
from import_task_executor import ImportTaskExecutor 
from mailer import Mailer
from node import Node
from system_utils import SystemUtils
from tabulate import tabulate
from tape_client import TapeClient


class ImportExecutor(ImportTaskExecutor):
    """Executor that imports OS directory trees into the VOSpace file catalog.

    Jobs are consumed one at a time from the 'import_ready' Redis queue;
    each job is a dict with keys 'userId', 'path', 'pathPrefix',
    'storageId' and 'storageType'. Once an import completes, the job is
    moved to the 'import_terminated' queue.
    """

    def __init__(self):
        # Checksum helper used to validate files and compute their MD5.
        self.md5calc = Checksum()
        config = Config("/etc/vos_ts/vos_ts.conf")
        self.params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(self.params["user"],
                                  self.params["password"],
                                  self.params["host"],
                                  self.params.getint("port"),
                                  self.params["db"],
                                  1,
                                  1)
        self.params = config.loadSection("spectrum_archive")
        self.tapeClient = TapeClient(self.params["host"],
                                     self.params.getint("port"),
                                     self.params["user"],
                                     self.params["pkey_file_path"])
        self.systemUtils = SystemUtils()
        # Per-job state, populated by run() before each import.
        self.job = None
        self.userId = None
        self.path = None
        self.pathPrefix = None
        self.storageId = None
        self.storageType = None
        super(ImportExecutor, self).__init__()

    def importVOSpaceNodes(self):
        """Perform the VOSpace import operation for the current job.

        Scans the parent of self.path recursively: directories become
        'container' nodes, checksum-valid files become 'data' nodes;
        nodes already present in the catalog are skipped. A tabulated
        report is e-mailed at the end and the report file is removed.
        """
        start = dt.now()
        nodeList = []
        timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S")
        nodeListFile = "vos_import_report-" + timestamp

        # Cold storage: checksum files live on tape and must be recalled
        # before they can be read from disk.
        if self.storageType == "cold":
            self.tapeClient.connect()
            self.tapeClient.recallChecksumFiles(self.path)
            self.tapeClient.disconnect()

        [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path))

        # Matches timestamped wrapper dirs, e.g. '/2021_01_31-23_59_59-vos_wrapper'.
        tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")

        # FIX: the debug log handle was previously opened and never closed;
        # the 'with' block guarantees it is flushed and closed even on errors.
        with open("import_amqp_server_log.txt", "a") as out:
            for dir in dirs:
                out.write(f"DIR dir: {dir}\n")
                out.write(f"DIR pathPrefix: {self.pathPrefix}\n\n")

                if self.path in dir:
                    parentPath = os.path.dirname(dir).split(self.pathPrefix)[1]
                    nodeName = os.path.basename(dir)

                    cnode = Node(nodeName, "container")

                    # Wrapper directories themselves are not imported as nodes;
                    # their name is recorded on the child node instead.
                    if not tstampWrapperDirPattern.match("/" + nodeName):
                        if tstampWrapperDirPattern.search(parentPath):
                            tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
                            cnode.setWrapperDir(tstampWrapperDir)

                        if parentPath == '/':
                            vospacePath = parentPath + nodeName
                        else:
                            vospacePath = parentPath + '/' + nodeName

                        cnode.setParentPath(parentPath)
                        locationId = self.dbConn.getLocationId(self.storageId)
                        cnode.setLocationId(locationId)
                        cnode.setCreatorID(self.userId)
                        cnode.setContentLength(0)
                        if not self.dbConn.nodeExists(cnode):
                            self.dbConn.insertNode(cnode)
                            self.dbConn.setAsyncTrans(vospacePath, True)
                            self.dbConn.setSticky(vospacePath, True)
                            nodeList.append([ dt.now(), dir, vospacePath, "container", "DONE" ])
                        else:
                            nodeList.append([ dt.now(), dir, vospacePath, "container", "SKIP" ])

            for flist in files:
                for file in flist:
                    # Import only files with a valid checksum located below self.path.
                    if self.md5calc.fileIsValid(file) and self.path in os.path.dirname(file):
                        out.write(f"FILE files: {files}\n")
                        out.write(f"FILE flist: {flist}\n")
                        out.write(f"FILE file: {file}\n")
                        out.write(f"FILE pathPrefix: {self.pathPrefix}\n")
                        parentPath = os.path.dirname(file).split(self.pathPrefix)[1]
                        out.write(f"FILE parentPath: {parentPath}\n")
                        nodeName = os.path.basename(file)
                        out.write(f"FILE nodeName: {nodeName}\n")
                        dnode = Node(nodeName, "data")

                        if tstampWrapperDirPattern.search(parentPath):
                            tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
                            dnode.setWrapperDir(tstampWrapperDir)

                        vospacePath = parentPath + '/' + nodeName
                        out.write(f"FILE vospacePath: {vospacePath}\n")
                        dnode.setParentPath(parentPath)
                        self.storageId = self.dbConn.getStorageId(self.pathPrefix)
                        locationId = self.dbConn.getLocationId(self.storageId)
                        dnode.setLocationId(locationId)
                        dnode.setCreatorID(self.userId)
                        dnode.setContentLength(os.path.getsize(file))
                        dnode.setContentMD5(self.md5calc.getMD5(file))

                        if not self.dbConn.nodeExists(dnode):
                            self.dbConn.insertNode(dnode)
                            self.dbConn.setAsyncTrans(vospacePath, True)
                            self.dbConn.setSticky(vospacePath, True)
                            nodeList.append([ dt.now(), file, vospacePath, "data", "DONE" ])
                        else:
                            nodeList.append([ dt.now(), file, vospacePath, "data", "SKIP" ])

        # Write the tabulated report only once the scan is complete.
        with open(nodeListFile, "w") as nlfp:
            nlfp.write(tabulate(nodeList,
                                headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
                                tablefmt = "simple"))
        end = dt.now()

        m = Mailer()
        m.addRecipient("cristiano.urban@inaf.it")
        msg = f"""
        [VOSpace import procedure summary]
        
        Storage type: {self.storageType}
        Storage ID: {self.storageId}
        Creator ID: {self.userId}               
        Start time: {start}
        End time: {end}
        Processed nodes: {len(nodeList)}
        Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)}
        Skipped nodes: {sum(res[-1] == 'SKIP' for res in nodeList)}
        
        """
        m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile)
        m.send()

        os.remove(nodeListFile)

    def run(self):
        """Main loop: poll the source queue and process jobs one at a time."""
        print("Starting import executor...")
        self.setSourceQueueName("import_ready")
        self.setDestinationQueueName("import_terminated")
        while True:
            # wait() is provided by TaskExecutor — presumably a polling
            # delay between iterations; confirm against the parent class.
            self.wait()
            if self.srcQueue.len() > 0:
                # Peek the oldest job; it is removed from the source queue
                # only after the import has completed.
                self.job = self.srcQueue.getJob()
                self.userId = self.job["userId"]
                self.path = self.job["path"]
                self.pathPrefix = self.job["pathPrefix"]
                self.storageId = self.job["storageId"]
                self.storageType = self.job["storageType"]
                self.importVOSpaceNodes()
                # Keep the terminated queue bounded ('>=' also recovers if
                # the queue is somehow already over the limit).
                if self.destQueue.len() >= self.maxTerminatedJobs:
                    self.destQueue.extractJob()
                # BUG FIX: the original referenced 'self.jobObj', which is
                # never assigned (the job is stored in 'self.job'), and
                # 'self.jobObj.jobId', which does not exist on the job dict.
                self.destQueue.insertJob(self.job)
                self.srcQueue.extractJob()
                print(f"Job for path {self.job['path']} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")
+25 −0
Original line number Original line Diff line number Diff line
#!/usr/bin/env python

import json

from job_queue import JobQueue


class ImportJobQueue(JobQueue):
    """Redis-backed FIFO queue of import jobs serialized as JSON.

    Jobs are pushed at the head of the Redis list (LPUSH) and consumed
    from the tail (RPOP side), so the oldest job always sits at index -1.
    """

    def __init__(self, queueName):
        super(ImportJobQueue, self).__init__(queueName)

    def getJob(self):
        """Return a copy of the oldest job without removing it from the queue.

        FIX: addresses the tail with the negative index -1 in a single
        LRANGE call; the original computed self.len() - 1 twice, costing
        two extra LLEN round trips and racing against concurrent pushes.
        Raises IndexError if the queue is empty.
        """
        raw = self.redisCli.lrange(self.queueName, -1, -1)[0]
        return json.loads(raw.decode("utf-8"))

    def insertJob(self, job):
        """Serialize *job* to JSON and push it at the head of the queue."""
        self.redisCli.lpush(self.queueName, json.dumps(job))

    def extractJob(self):
        """Pop and return the oldest job (blocks until one is available)."""
        # brpop returns a (key, value) pair; the payload is the second item.
        raw = self.redisCli.brpop(self.queueName)[1]
        return json.loads(raw.decode("utf-8"))
+15 −0
Original line number Original line Diff line number Diff line
#!/usr/bin/env python

from import_job_queue import ImportJobQueue
from task_executor import TaskExecutor

class ImportTaskExecutor(TaskExecutor):
    """Task executor specialized to use ImportJobQueue for its work queues."""

    def __init__(self):
        super(ImportTaskExecutor, self).__init__()

    def setSourceQueueName(self, srcQueueName):
        """Bind the source queue to the import queue named *srcQueueName*."""
        self.srcQueue = ImportJobQueue(srcQueueName)

    def setDestinationQueueName(self, destQueueName):
        """Bind the destination queue to the import queue named *destQueueName*."""
        self.destQueue = ImportJobQueue(destQueueName)
+1 −0
Original line number Original line Diff line number Diff line
@@ -10,6 +10,7 @@ import redis
from config import Config
from config import Config
from job import Job
from job import Job



class JobQueue(object):
class JobQueue(object):


    def __init__(self, queueName):
    def __init__(self, queueName):
Loading