Commit 626568e0 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added store_amqp_server.

parent 03eefc10
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -24,5 +24,8 @@ RUN dnf install -y xrootd-client xrootd-client-devel xrootd python3-xrootd
# Run commands as transfer_service user
USER transfer_service

# Create a 'store' directory with a file
RUN mkdir /home/transfer_service/store && touch /home/transfer_service/store/foo.txt

# Run a shell
CMD /bin/bash
+3 −0
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ import sys
from start_job_amqp_server import StartJobAMQPServer
from get_job_amqp_server import GetJobAMQPServer
from abort_job_amqp_server import AbortJobAMQPServer
from store_amqp_server import StoreAMQPServer

class JobHandler(object):

@@ -18,6 +19,8 @@ class JobHandler(object):
            self.amqpServerList.append(GetJobAMQPServer(self.host, rpcQueue))
        elif srvType == 'abort':
            self.amqpServerList.append(AbortJobAMQPServer(self.host, rpcQueue))
        elif srvType == 'store':
            self.amqpServerList.append(StoreAMQPServer(self.host, rpcQueue))
        else:
            sys.exit(f"FATAL: unknown server type {srvType}.")

+81 −0
Original line number Diff line number Diff line
#  TODO:
#  - error codes and status codes list and description
#
#


import os
import sys
import json

from amqp_server import AMQPServer
from job import Job

class StoreAMQPServer(AMQPServer):

    def __init__(self, host, queue):
        self.type = "store"
        self.storeAck = False
        super(StoreAMQPServer, self).__init__(host, queue)

    def execute_callback(self, requestBody):
        # requestType and userName attributes are mandatory
        if "requestType" not in requestBody or "userName" not in requestBody:
            response = { "errorCode": 1, "errorMsg": "Malformed request." }
        elif requestBody["requestType"] == "STATUS":
            folderPath = "/home/" + requestBody["userName"] + "/store"
            uid = os.stat(folderPath).st_uid
            gid = os.stat(folderPath).st_gid
            if os.access(folderPath, os.W_OK) and uid and gid:
                response = { "responseType": "STATUS", "status": "READY" }
            else:
                response = { "responseType": "STATUS", "status": "BUSY" }
        elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE":
            # check if the user dir is not empty and write permissions are enabled
            # if so, generate a Job object, store it in the cache and push it
            # also to the pending queue
            folderPath = "/home/" + requestBody["userName"] + "/store"
            uid = os.stat(folderPath).st_uid
            gid = os.stat(folderPath).st_gid
            if os.access(folderPath, os.W_OK) and os.listdir(folderPath) and uid and gid:
                response = { "responseType": "STORE_ACK" }
                self.storeAck = True
            else:
                response = { "responseType": "ERROR",
                             "errorCode": 3,
                             "errorMsg": "FATAL: permission denied." }
        elif requestBody["requestType"] == "STORE_CON":
            if self.storeAck:
                self.storeAck = False
                self.job = Job()
                self.job.setInfo(requestBody)
                self.job.setPhase("PENDING")
                print("Job generated.")
                response = { "responseType": "STORE_RUN" }
                # push data on redis...
            else:
                response = { "responseType": "ERROR",
                             "errorCode": 4,
                             "errorMsg": "FATAL: store request not acknowledged." }
        else:
            response = { "responseType": "ERROR",
                         "errorCode": 2,
                         "errorMsg": "Unkown request type." }

        return response

        #self.jobCache.set(self.job)
        #redis_res = self.jobCache.get(self.job.jobID)
        #print(f"Redis response: {redis_res}")
        #return redis_res

    def userExists(self, username):
        fp = open("/etc/passwd", 'r')
        for line in fp:
            if line.split(':')[0] == username:
                return True
        return False

    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        super(StoreAMQPServer, self).run()
+5 −0
Original line number Diff line number Diff line
@@ -9,10 +9,15 @@ class TransferService(object):

    def __init__(self):
        self.jobHandler = JobHandler('rabbitmq')

        # PullFromVOSpace
        self.jobHandler.addAMQPServer('start', 'start_job_queue')
        self.jobHandler.addAMQPServer('poll', 'poll_job_queue')
        self.jobHandler.addAMQPServer('abort', 'abort_job_queue')

        # Push
        self.jobHandler.addAMQPServer('store', 'store_job_queue')

    def run(self):
        self.jobHandler.run()