Commit 65595a2b authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added client for internal use (push).

parent 626568e0
Loading
Loading
Loading
Loading
+118 −0
Original line number Diff line number Diff line
import pika
import uuid
import json
import sys


class AMQPClient(object):

    def __init__(self):
        self.rpcQueue = "store_job_queue"
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = "rabbitmq"))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue = '', exclusive = True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(queue = self.callback_queue, on_message_callback = self.on_response, auto_ack = True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)

    def call(self, msg):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',
                                   routing_key = self.rpcQueue,
                                   properties = pika.BasicProperties(reply_to = self.callback_queue,
                                                                     correlation_id = self.corr_id,
                                                                    ),
                                   body = json.dumps(msg))

        while self.response is None:
            self.connection.process_data_events()
        return self.response


def help():
    sys.exit("""
NAME
       data_archiver_cli

SYNOPSYS
       python3.x data_archiver_cli.py COMMAND USERNAME

DESCRIPTION
       The client accepts only one command at a time. This command is mandatory.
       A list of supported commands is shown here below:

       cstore
              performs a 'cold storage' request, data will be saved on the tape library

       hstore
              performs a 'hot storage' request, data will be saved on a standard server

       status
              returns the current status of the 'store' folder on the transfer node.

       The client needs also to know the username associated to a store request process.
       The username must be the same used for accessing the transfer node.
    """)

def store(cmd, username):
    request_type = cmd.upper()
    dataArchiverCli = AMQPClient()
    storeRequest = { "requestType": request_type, "userName": username }
    print(f"Sending {request_type} request:")
    print(json.dumps(storeRequest, indent = 3))
    storeResponse = dataArchiverCli.call(storeRequest)
    if "responseType" not in storeResponse:
        sys.exit("FATAL: Malformed response, store acknowledge expected.")
    elif storeResponse["responseType"] == "STORE_ACK":
        print("WARNING: if you answer 'yes', your data on the transfer node will be read-only!!!")
        confirm = None
        while not confirm in [ "yes", "no"]:
            confirm = input("Are you sure to proceed? [yes/no]: ")
        if confirm == "yes":
            confirmRequest = { "requestType": "STORE_CON", "userName": username }
            confirmResponse = dataArchiverCli.call(confirmRequest)
            if "responseType" not in storeResponse:
                sys.exit("FATAL: Malformed response, store confirmation expected.")
            elif confirmResponse["responseType"] == "STORE_RUN":
                print("Store process started successfully")
            else:
                sys.exit("FATAL: Unknown")
        else:
            sys.exit("Store process aborted gracefully.")
    elif storeResponse["responseType"] == "ERROR":
        errorCode = storeResponse["errorCode"]
        errorMsg = storeResponse["errorMsg"]
        sys.exit(f"Error code: {errorCode}, Error message: {errorMsg}")
    else:
        sys.exit("FATAL: Unknown response type.")
    #print("Response:")
    #print(json.dumps(response, indent = 3))


# Check the number of input args
if len(sys.argv) == 3:
    script, cmd, username = sys.argv
else:
    help()

# Check the command type passed by the user
if cmd == "cstore":
    store(cmd, username)
elif cmd == "hstore":
    store(cmd, username)
elif cmd == "status":
    dataArchiverCli = AMQPClient()
    statusRequest = { "requestType": "STATUS", "userName": username }
    print("Sending status request:")
    print(json.dumps(statusRequest, indent = 3))
    statusResponse = dataArchiverCli.call(statusRequest)
    if statusResponse["status"] == "READY":
        print("Ready to go!")
    elif statusResponse["status"] == "BUSY":
        print("A process is active...")
else:
    help()