Commit 0729557d authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added cli config file + added external class AMQPClient + modified Dockerfile.

parent 0b8c848a
Loading
Loading
Loading
Loading
+14 −4
Original line number Diff line number Diff line
@@ -9,10 +9,20 @@ RUN apt-get -y update && apt-get -y install postgresql-client && apt-get -y inst
RUN useradd -m -s /bin/bash client

# Copy the source code of all client apps
COPY vos_data vos_import vos_storage /usr/bin/
RUN chmod 755 /usr/bin/vos_data && \
    chmod 755 /usr/bin/vos_import && \
    chmod 755 /usr/bin/vos_storage
RUN mkdir -p /usr/bin/vos_cli
COPY *.py \
     vos_data \
     vos_import \
     vos_storage /usr/bin/vos_cli/
RUN chmod -R 755 /usr/bin/vos_cli

# Copy configuration file
RUN mkdir -p /etc/vos_cli
COPY vos_cli.conf /etc/vos_cli/
RUN chmod -R 755 /etc/vos_cli

# Set the PAH environment variable
ENV PATH "$PATH:/usr/bin/vos_cli"    
    
# Run commands as 'client' user
USER client

client/amqp_client.py

0 → 100644
+36 −0
Original line number Diff line number Diff line
#!/usr/bin/env python

import pika
import uuid
import json


class AMQPClient(object):
  
    def __init__(self, host, port, queue):
        self.host = host
        self.port = port
        self.rpcQueue = queue
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.host, port = self.port))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue = '', exclusive = True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(queue = self.callback_queue, on_message_callback = self.on_response, auto_ack = True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)

    def call(self, msg):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',
                                   routing_key = self.rpcQueue,
                                   properties = pika.BasicProperties(reply_to = self.callback_queue,
                                                                     correlation_id = self.corr_id,
                                                                    ),
                                   body = json.dumps(msg))

        while self.response is None:
            self.connection.process_data_events()
        return self.response 

client/config.py

0 → 100644
+19 −0
Original line number Diff line number Diff line
from configparser import ConfigParser, ExtendedInterpolation


class Config(object):

    def __init__(self, fileName):
        self.fileName = fileName
        self.config = ConfigParser(interpolation = ExtendedInterpolation())
        self.config.read(fileName)        

    def loadSection(self, sectionName):
        section = self.config[sectionName]
        return section

# Test
#c = Config("vos_ts.conf")
#params = c.loadSection("file_grouper")
#print(params.getint("min_num_files"))
#print(eval(params["max_dir_size"]))

client/vos_cli.conf

0 → 100644
+20 −0
Original line number Diff line number Diff line
[server]
; hostname or IP address of the machine that hosts the RabbitMQ message broker
host = rabbitmq
; port at which the broker is available, default is 5672 TCP
port = 5672

[vos_data]
rpc_queue = store_job_queue

[vos_import]
rpc_queue = import_queue

[vos_job]
rpc_queue = job_queue

[vos_storage]
rpc_queue = storage_queue

[vos_user]
rpc_queue = user_queue
 No newline at end of file
+15 −31
Original line number Diff line number Diff line
#!/usr/bin/env python

import pika
import uuid
import json
#import pika
#import uuid
#import json
import sys

from amqp_client import AMQPClient
from config import Config
from tabulate import tabulate


class AMQPClient(object):
class VOSData(AMQPClient):

    def __init__(self):
        self.rpcQueue = "store_job_queue"
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = "rabbitmq"))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue = '', exclusive = True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(queue = self.callback_queue, on_message_callback = self.on_response, auto_ack = True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)

    def call(self, msg):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',
                                   routing_key = self.rpcQueue,
                                   properties = pika.BasicProperties(reply_to = self.callback_queue,
                                                                     correlation_id = self.corr_id,
                                                                    ),
                                   body = json.dumps(msg))

        while self.response is None:
            self.connection.process_data_events()
        return self.response

        config = Config("/etc/vos_cli/vos_cli.conf")
        self.params = config.loadSection("server")
        self.host = self.params["host"]
        self.port = self.params.getint("port")
        self.params = config.loadSection("vos_data")
        self.rpcQueue = self.params["rpc_queue"]
        super(VOSData, self).__init__(self.host, self.port, self.rpcQueue)

    def help(self):
        sys.exit("""
@@ -131,8 +115,8 @@ DESCRIPTION
            sys.exit("\nFATAL: Unknown response type.\n")


# Create new AMQPClient object
vosDataCli = AMQPClient()
# Create new VOSData object
vosDataCli = VOSData()

# Check the number of input args
if len(sys.argv) == 3:
Loading