Commit 69f3da72 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added hanlder for 'vos_job'.

parent 3105f5bb
Loading
Loading
Loading
Loading
+51 −0
Original line number Diff line number Diff line
import os

from amqp_server import AMQPServer
from config import Config
from db_connector import DbConnector


class JobAMQPServer(AMQPServer):

    def __init__(self, host, port, queue):
        self.type = "job"
        config = Config("vos_ts.conf")
        self.params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(self.params["user"],
                                  self.params["password"],
                                  self.params["host"],
                                  self.params.getint("port"),
                                  self.params["db"])
        self.jobPhase = None
        super(JobAMQPServer, self).__init__(host, port, queue)

    def execute_callback(self, requestBody):
        # 'requestType' attribute is mandatory
        if "requestType" not in requestBody:
            response = { "responseType": "ERROR",
                         "errorCode": 1, 
                         "errorMsg": "Malformed request, missing parameters." }
        elif requestBody["requestType"] == "JOB_LIST":
            self.dbConn.connect()
            result = self.dbConn.listActiveJobs()
            self.dbConn.disconnect()
            response = { "responseType": "LST_DONE", "jobList": result }
        elif requestBody["requestType"] == "JOB_BY_PHASE":
            self.jobPhase = requestBody["jobPhase"]
            self.dbConn.connect()
            result = self.dbConn.listJobsByPhase(self.jobPhase)
            self.dbConn.disconnect()
            response = { "responseType": "LST_BY_PHASE_DONE", "jobList": result }
        elif requestBody["requestType"] == "JOB_INFO":
            pass
        elif requestBody["requestType"] == "JOB_RESULTS":
            pass                
        else:
            response = { "responseType": "ERROR",
                         "errorCode": 2,
                         "errorMsg": "Unkown request type." }
        return response
      
    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        super(JobAMQPServer, self).run()
+3 −0
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ import pika
import sys

from import_amqp_server import ImportAMQPServer
from job_amqp_server import JobAMQPServer
from start_job_amqp_server import StartJobAMQPServer
from storage_amqp_server import StorageAMQPServer
from get_job_amqp_server import GetJobAMQPServer
@@ -29,6 +30,8 @@ class JobHandler(object):
            self.amqpServerList.append(ImportAMQPServer(self.host, self.port, rpcQueue))
        elif srvType == 'storage':
            self.amqpServerList.append(StorageAMQPServer(self.host, self.port, rpcQueue))
        elif srvType == 'job':
            self.amqpServerList.append(JobAMQPServer(self.host, self.port, rpcQueue))
        else:
            sys.exit(f"FATAL: unknown server type {srvType}.")

+4 −1
Original line number Diff line number Diff line
@@ -19,12 +19,15 @@ class TransferService(object):
        self.jobHandler.addAMQPServer('poll', 'poll_job_queue')
        self.jobHandler.addAMQPServer('abort', 'abort_job_queue')

        # PushToVOSpace (via dataArchiver, the 'unofficial' command line client)
        # PushToVOSpace (via vos_data, the 'unofficial' command line client)
        self.jobHandler.addAMQPServer('store', 'store_job_queue')       
        
        # Import
        self.jobHandler.addAMQPServer('import', 'import_queue')
        
        # Job
        self.jobHandler.addAMQPServer('job', 'job_queue')
        
        # Storage
        self.jobHandler.addAMQPServer('storage', 'storage_queue')