Commit b1752bcd authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added 'StorageAMQPServer' class.

parent 6e56fe30
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@ import sys

from import_amqp_server import ImportAMQPServer
from start_job_amqp_server import StartJobAMQPServer
from storage_amqp_server import StorageAMQPServer
from get_job_amqp_server import GetJobAMQPServer
from abort_job_amqp_server import AbortJobAMQPServer
from store_amqp_server import StoreAMQPServer
@@ -26,6 +27,8 @@ class JobHandler(object):
            self.amqpServerList.append(StoreAMQPServer(self.host, self.port, rpcQueue))
        elif srvType == 'import':
            self.amqpServerList.append(ImportAMQPServer(self.host, self.port, rpcQueue))
        elif srvType == 'storage':
            self.amqpServerList.append(StorageAMQPServer(self.host, self.port, rpcQueue))
        else:
            sys.exit(f"FATAL: unknown server type {srvType}.")

+61 −0
Original line number Diff line number Diff line
import os

from amqp_server import AMQPServer
from config import Config
from db_connector import DbConnector


class StorageAMQPServer(AMQPServer):

    def __init__(self, host, port, queue):
        self.type = "storage"
        config = Config("vos_ts.conf")       
        self.params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(self.params["user"], 
                                  self.params["password"], 
                                  self.params["host"], 
                                  self.params.getint("port"), 
                                  self.params["db"])
        self.storageType = None
        self.storageBasePath = None
        self.storageHostname = None
        super(StorageAMQPServer, self).__init__(host, port, queue)
    
    def execute_callback(self, requestBody):
        # 'requestType', 'mountPoint', 'hostname' and 'storageType' attributes are mandatory
        if "requestType" not in requestBody:
            response = { "responseType": "ERROR",
                         "errorCode": 1, 
                         "errorMsg": "Malformed request, missing parameters." }
        elif requestBody["requestType"] == "STORAGE_ADD":
            self.storageType = requestBody["storageType"]
            self.storageBasePath = requestBody["basePath"]
            self.storageHostname = requestBody["hostname"]
            
            if not os.path.exists(self.storageBasePath):
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": "Base path doesn't exist."}
                return response
            self.dbConn.connect()
            result = self.dbConn.insertStorage(self.storageType, 
                                               self.storageBasePath, 
                                               self.storageHostname)
            self.dbConn.disconnect()

            if result:
                response = { "responseType": "STORAGE_ADD_DONE" }
                return response
            else:
                response = { "responseType": "ERROR",
                             "errorCode": 3,
                             "errorMsg": "Storage point already exists." }
                return response
        elif requestBody["requestType"] == "STORAGE_RMV":
            pass
        elif requestBody["requestType"] == "STORAGE_LST":
            pass
    
    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        super(StorageAMQPServer, self).run()
+3 −0
Original line number Diff line number Diff line
@@ -25,6 +25,9 @@ class TransferService(object):
        # Import
        self.jobHandler.addAMQPServer('import', 'import_queue')
        
        # Storage
        self.jobHandler.addAMQPServer('storage', 'storage_queue')

    def start(self):
        self.jobScheduler.start()
        self.jobHandler.start()