Commit 692df5d1 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

dataArchiverCli renamed in vos_data + added input exception handling + minor changes.

parent efc70873
Loading
Loading
Loading
Loading
+4 −3
Original line number Diff line number Diff line
@@ -8,14 +8,15 @@ RUN apt-get -y update && apt-get -y install postgresql-client && apt-get -y inst
# Create a new user called 'client', create the home directory and set the default shell
RUN useradd -m -s /bin/bash client

# Copy the source code of all client apps
COPY vos_data /usr/bin/
RUN chmod 755 /usr/bin/vos_data

# Run commands as 'client' user
USER client

# Set up a workdir for the container
WORKDIR /home/client/

# Copy the source code of all client apps
COPY *.py ./

# Install python dependencies
RUN pip3.9 install --no-cache-dir pika
+38 −17
Original line number Diff line number Diff line
#!/usr/bin/env python

import pika
import uuid
import json
@@ -36,23 +38,23 @@ class AMQPClient(object):
    def help(self):
        sys.exit("""
NAME
       dataArchiverCli.py
       vos_data

SYNOPSYS
       python3.x dataArchiverCli.py COMMAND USERNAME
       vos_data COMMAND USERNAME

DESCRIPTION
       The purpose of this client application is to notify to the VOSpace backend that
       data is ready to be saved somewhere.
       
       The client accepts only one command at a time. This command is mandatory.
       The client accepts only one (mandatory) command at a time.
       A list of supported commands is shown here below:

       cstore
              performs a 'cold storage' request, data will be saved on the tape library
              performs a 'cold storage' request, data will be saved on tape

       hstore
              performs a 'hot storage' request, data will be saved on a standard server
              performs a 'hot storage' request, data will be saved to disk

       The client also needs to know the username associated to a storage request process.
       The username must be the same used for accessing the transfer node.
@@ -63,39 +65,58 @@ DESCRIPTION
        storeRequest = { "requestType": request_type, "userName": username }
        print(f"\nSending {request_type} request...\n")
        storeResponse = self.call(storeRequest)
        if "responseType" not in storeResponse or "storageList" not in storeResponse:
            sys.exit("FATAL: Malformed response, store acknowledge expected.\n")
        if "responseType" not in storeResponse:
            sys.exit("FATAL: Malformed response, storage acknowledge expected.\n")
        elif storeResponse["responseType"] == "STORE_ACK":
            storageList = storeResponse["storageList"]
            print("Choose one of the following storage locations:\n")
            print("----------------------------------------------------------------------")
            storageIdList = []
            for st in storageList:
                storageIdList.append(st["storage_id"])
                print("storage_id: {:<2d}   =>   hostname: {}".format(st['storage_id'], st['hostname']))
                print("[*] storage_id: {:<2d}   =>   hostname: {}".format(st['storage_id'], st['hostname']))
            storageId = None
            print("----------------------------------------------------------------------\n")
            while not storageId in storageIdList:
                storageId = int(input("\nInsert the storage id: "))
            print("\nWARNING!!! WARNING!!! WARNING!!! WARNING!!! WARNING!!!")
                try:
                    storageId = input("Please, insert a storage id: ")
                    storageId = int(storageId)
                except ValueError:
                    print("Input type is not valid!")
                except EOFError:
                    print("\nPlease, use CTRL+C to quit.")
                except KeyboardInterrupt:
                    sys.exit("\nCTRL+C detected. Exiting...")
            print()
            print("!!!!!!!!!!!!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            print("If you confirm, all your data on the transfer node will be")
            print("available in read-only mode for all the time the archiving")
            print("available in read-only mode for all the time the storage")
            print("process is running.")
            print("WARNING!!! WARNING!!! WARNING!!! WARNING!!! WARNING!!!\n")
            print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            print()
            confirm = None
            while not confirm in [ "yes", "no"]:
                try:
                    confirm = input("Are you sure to proceed? [yes/no]: ")
                except KeyboardInterrupt:
                    sys.exit("\nCTRL+C detected. Exiting...")
                except EOFError:
                    print("\nPlease, use CTRL+C to quit.")
            if confirm == "yes":
                confirmRequest = { "requestType": "STORE_CON", "userName": username, "storageId": storageId }
                confirmResponse = self.call(confirmRequest)
                if "responseType" not in confirmResponse:
                    sys.exit("\nFATAL: Malformed response, store confirmation expected.\n")
                    sys.exit("\nFATAL: Malformed response, storage confirmation expected.\n")
                elif confirmResponse["responseType"] == "STORE_RUN":
                    jobId = confirmResponse["jobId"]
                    print(f"\nJobID: {jobId}")
                    print("Store process started successfully!\n")
                    print()
                    print(f"JobID: {jobId}")
                    print("Storage process started successfully!")
                    print()
                else:
                    sys.exit("FATAL: Unknown response type.\n")
            else:
                sys.exit("\nStore process aborted gracefully.\n")
                sys.exit("\nStorage process aborted gracefully.\n")
        elif storeResponse["responseType"] == "ERROR":
            errorCode = storeResponse["errorCode"]
            errorMsg = storeResponse["errorMsg"]

client/vos_rest_client.py

deleted100644 → 0
+0 −96
Original line number Diff line number Diff line
import pika
import uuid
import json
import sys


class AMQPClient(object):

    def __init__(self, rpcQueue):
        self.rpcQueue = rpcQueue
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = "rabbitmq"))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue = '', exclusive = True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(queue = self.callback_queue, on_message_callback = self.on_response, auto_ack = True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)

    def call(self, msg):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',
                                   routing_key = self.rpcQueue,
                                   properties = pika.BasicProperties(reply_to = self.callback_queue,
                                                                     correlation_id = self.corr_id,
                                                                    ),
                                   body = json.dumps(msg))

        while self.response is None:
            self.connection.process_data_events()
        return self.response
      

if len(sys.argv) == 2:
    script, queue = sys.argv
    if queue != "start_job_queue":
        sys.exit("FATAL: wrong number of input args.")  
elif len(sys.argv) == 3:
    script, queue, jobid = sys.argv
    if queue != "poll_job_queue":
        sys.exit("FATAL: wrong number of input args.")
else:
    sys.exit("FATAL: wrong number of input args.")

if queue == "start_job_queue":
    testClient = AMQPClient(queue)
    transferRequest = {
        "jobId": str(uuid.uuid1().hex),
        "runId": None,
        "ownerId": "anonymous",
        "phase": "PENDING",
        "quote": None,
        "creationTime": None,
        "startTime": None,
        "endTime": None,
        "executionDuration": 0,
        "destruction": None,
        "parameters": None,
        "results": [],
        "errorSummary": None,
        "jobInfo": {
            "transfer": {
                "target": "vos://example.com!vospace/mydata1",
                "direction": "pullToVoSpace",
                "view": None,
                "protocol": [
                    {
                        "endpoint": None,
                        "param": [],
                        "uri": "ia2:tape-recall"
                    }
                ],
                "keepBytes": None,
                "version": None
            }
        },
        "version": None
    }

    print("Sending transfer request:")
    print(json.dumps(transferRequest, indent = 3))
    response = testClient.call(transferRequest)
    print("Response:")
    print(json.dumps(response, indent = 3))
elif queue == "poll_job_queue":
    testClient = AMQPClient(queue)
    pollRequest = { "jobId": jobid }
    print("Sending poll request:")
    print(json.dumps(pollRequest, indent = 3))
    response = testClient.call(pollRequest)
    print("Response:")
    print(json.dumps(response, indent = 3))
else:
    sys.exit("FATAL: invalid queue name.")
 No newline at end of file