Commit 55c53e20 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Merge branch 'testing'

parents a2e66fb1 e3223622
Loading
Loading
Loading
Loading
+69 −12
Original line number Diff line number Diff line
Simple communication test that involes 4 docker containers:
- AMQP client (container_name: test_client, file to run: test_client.py)
- AMQP server (container_name: transfer_service)
- client (container_name: test_client, files to run: 'vos_rest_client.py' and 'dataArchiver.py')
- server (container_name: transfer_service)
- RabbitMQ (container_name: rabbitmq)
- Redis (container_name: redis)

@@ -16,18 +16,18 @@ These last two containers are employed to test transfers using xrootd python bin
You can start the whole environment with (launch the following command from the 'docker' dir):
$ docker-compose up

Once all the containers are up and running, open another shell and access the test_client container:
$ docker exec -it test_client /bin/bash
Once all the containers are up and running, open another shell and access the 'client' container:
$ docker exec -it client /bin/bash

At this point you can launch the test_client.py within the test_client container using the following syntax:
$ python test_client.py QUEUE_NAME
At this point you can launch the 'vos_rest_client.py' within the 'client' container using the following syntax:
$ python vos_rest_client.py QUEUE_NAME

For example:
$ python test_client.py start_job_queue
$ python vos_rest_client.py start_job_queue

The output should be something like this:

     test_client@a89c0bb962f7:~$ python test_client.py start_job_queue
     client@a89c0bb962f7:~$ python vos_rest_client.py start_job_queue
     Sending transfer request:
     {
        "transfer": {
@@ -64,14 +64,14 @@ The output should be something like this:
After processing the request, the server launches an internal thread delayed of 10 seconds which changes the 
state of the job from "PENDING" to "EXECUTING".
You can easily verify this change by launching again the client in this other way:
$ python test_client.py QUEUE_NAME JOB_ID
$ python vos_rest_client.py QUEUE_NAME JOB_ID

For example, in our case:
$ python test_client.py poll_job_queue 3ff92acedc9611eabf140242ac1f0007
$ python vos_rest_client.py poll_job_queue 3ff92acedc9611eabf140242ac1f0007

The output should be something like this:

     test_client@a89c0bb962f7:~$ python test_client.py poll_job_queue 3ff92acedc9611eabf140242ac1f0007
     client@a89c0bb962f7:~$ python vos_rest_client.py poll_job_queue 3ff92acedc9611eabf140242ac1f0007
     Sending poll request:
     {
        "jobID": "3ff92acedc9611eabf140242ac1f0007"
@@ -98,6 +98,52 @@ The output should be something like this:
        }
     }

---------------------------------------------------------------------------------------------------------------

Another thing you can do is to launch the 'dataArchiver.py' client.
Launching the client without any argument will show you how to use it:

client@28970a09202d:~$ python3 dataArchiverCli.py

NAME
       dataArchiverCli.py

SYNOPSYS
       python3.x dataArchiverCli.py COMMAND USERNAME

DESCRIPTION
       The purpose of this client application is to notify to the VOSpace backend that
       data is ready to be saved somewhere.
       
       The client accepts only one command at a time. This command is mandatory.
       A list of supported commands is shown here below:

       cstore
              performs a 'cold storage' request, data will be saved on the tape library

       hstore
              performs a 'hot storage' request, data will be saved on a standard server

       The client also needs to know the username associated to a storage request process.
       The username must be the same used for accessing the transfer node.

       
For example, if we want to perform a 'cold storage' request for a the 'transfer_service' user, we do:
client@28970a09202d:~$ python3 dataArchiverCli.py cstore transfer_service

Sending CSTORE request...

WARNING!!! WARNING!!! WARNING!!! WARNING!!! WARNING!!!
If you confirm, all your data on the transfer node will be
available in read-only mode for all the time the archiving
process is running.
WARNING!!! WARNING!!! WARNING!!! WARNING!!! WARNING!!!

Are you sure to proceed? [yes/no]: yes

JobID: c63697eafbf711eaa44d0242ac1c0008
Store process started successfully!

###############################################################################################################
     
You can access the rabbitmq web interface via browser:
@@ -105,7 +151,18 @@ You can access the rabbitmq web interface via browser:
    $ docker network inspect vos-ts_backend_net | grep -i -A 3 rabbitmq
    2) Open your browser and point it to http://IP_ADDRESS:15672 (user: guest, password: guest)

You can access the file catalog from the test_client container:
You can access the redis server from the 'client' container:
    1) Use redis-cli to connect to redis:
    $ redis-cli -h redis -n DB_INDEX
    NOTE: DB_INDEX is a non-negative number representing the db to work on:
    - 0: jobs that retrieve data (pullFromVOSpace) 
    - 1: jobs that store data (push)
    - 2: scheduling queues (not yet implemented)
    2) You can now perform a query based on the job ID, for example show the job object info stored on db = 1:
    get JOB_ID (if we consider the last example: "get  c63697eafbf711eaa44d0242ac1c0008")
    3) This will return all the information regarding the job
            
You can access the file catalog from the 'client' container:
    1) Access the db via psql client (password: postgres):
    $ psql -h file_catalog -U postgres -d vospace_testdb
    2) You can now perform a query, for example show all the tuples of the Node table:
+21 −0
Original line number Diff line number Diff line
@@ -2,19 +2,20 @@
FROM python:3

# Install psql client to be able to connect manually to the file_catalog container
RUN apt-get -y update && apt-get -y install postgresql-client 
# Install also redis-tools to be able to access the redis container via redis-cli
RUN apt-get -y update && apt-get -y install postgresql-client && apt-get -y install redis-tools 

# Create a new user called test_client, create the home directory and set the default shell
RUN useradd -m -s /bin/bash test_client
# Create a new user called 'client', create the home directory and set the default shell
RUN useradd -m -s /bin/bash client

# Run commands as test_client user
USER test_client
# Run commands as 'client' user
USER client

# Set up a workdir for the container
WORKDIR /home/test_client/
WORKDIR /home/client/

# Copy the source code of the test client app
COPY test_client.py ./
# Copy the source code of all client apps
COPY *.py ./

# Install python dependencies
RUN pip install --no-cache-dir pika
+111 −0
Original line number Diff line number Diff line
import pika
import uuid
import json
import sys


class AMQPClient(object):

    def __init__(self):
        self.rpcQueue = "store_job_queue"
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = "rabbitmq"))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue = '', exclusive = True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(queue = self.callback_queue, on_message_callback = self.on_response, auto_ack = True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)

    def call(self, msg):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',
                                   routing_key = self.rpcQueue,
                                   properties = pika.BasicProperties(reply_to = self.callback_queue,
                                                                     correlation_id = self.corr_id,
                                                                    ),
                                   body = json.dumps(msg))

        while self.response is None:
            self.connection.process_data_events()
        return self.response


    def help(self):
        sys.exit("""
NAME
       dataArchiverCli.py

SYNOPSYS
       python3.x dataArchiverCli.py COMMAND USERNAME

DESCRIPTION
       The purpose of this client application is to notify to the VOSpace backend that
       data is ready to be saved somewhere.
       
       The client accepts only one command at a time. This command is mandatory.
       A list of supported commands is shown here below:

       cstore
              performs a 'cold storage' request, data will be saved on the tape library

       hstore
              performs a 'hot storage' request, data will be saved on a standard server

       The client also needs to know the username associated to a storage request process.
       The username must be the same used for accessing the transfer node.
    """)

    def store(self, cmd, username):
        request_type = cmd.upper()
        storeRequest = { "requestType": request_type, "userName": username }
        print(f"\nSending {request_type} request...")
        storeResponse = self.call(storeRequest)
        if "responseType" not in storeResponse:
            sys.exit("FATAL: Malformed response, store acknowledge expected.\n")
        elif storeResponse["responseType"] == "STORE_ACK":
            print("\nWARNING!!! WARNING!!! WARNING!!! WARNING!!! WARNING!!!")
            print("If you confirm, all your data on the transfer node will be")
            print("available in read-only mode for all the time the archiving")
            print("process is running.")
            print("WARNING!!! WARNING!!! WARNING!!! WARNING!!! WARNING!!!\n")
            confirm = None
            while not confirm in [ "yes", "no"]:
                confirm = input("Are you sure to proceed? [yes/no]: ")
            if confirm == "yes":
                confirmRequest = { "requestType": "STORE_CON", "userName": username }
                confirmResponse = self.call(confirmRequest)
                if "responseType" not in confirmResponse:
                    sys.exit("\nFATAL: Malformed response, store confirmation expected.\n")
                elif confirmResponse["responseType"] == "STORE_RUN":
                    jobID = confirmResponse["jobID"]
                    print(f"\nJobID = {jobID}")
                    print("Store process started successfully!\n")
                else:
                    sys.exit("FATAL: Unknown response type.\n")
            else:
                sys.exit("\nStore process aborted gracefully.\n")
        elif storeResponse["responseType"] == "ERROR":
            errorCode = storeResponse["errorCode"]
            errorMsg = storeResponse["errorMsg"]
            sys.exit(f"Error code: {errorCode}, Error message: {errorMsg}\n")
        else:
            sys.exit("\nFATAL: Unknown response type.\n")


# Create new AMQPClient object
dataArchiverCli = AMQPClient()

# Check the number of input args
if len(sys.argv) == 3:
    script, cmd, username = sys.argv
else:
    dataArchiverCli.help()

# Check the command passed by the user
if cmd == "cstore" or cmd == "hstore":
    dataArchiverCli.store(cmd, username)
else:
    dataArchiverCli.help()
 No newline at end of file
+2 −2

File changed and moved.

Contains only whitespace changes.

+3 −3
Original line number Diff line number Diff line
@@ -54,9 +54,9 @@ services:
    stdin_open: true
    tty: true
    command: ["./wait-for-it.sh", "rabbitmq:5672", "--", "python3", "transfer_service.py"]
  test_client:
    build: ./test_client
    container_name: test_client
  client:
    build: ./client
    container_name: client
    networks:
    - backend_net
    stdin_open: true
Loading