Commit 1a8959ba authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Removed 'pika' from dependencies.

parent d5ff3932
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -16,4 +16,4 @@ RUN yum -y install gcc openssl-devel bzip2-devel libffi-devel zlib-devel && \
    make altinstall && cd .. && rm Python-3.9.5.tgz
    
# Install pika, redis, psycopg2, paramiko, scp, tabulate Python packages
RUN pip3.9 install --no-cache-dir pika redis psycopg2-binary paramiko scp tabulate
RUN pip3.9 install --no-cache-dir redis psycopg2-binary paramiko scp tabulate

client/amqp_client.py

deleted100644 → 0
+0 −36
Original line number Diff line number Diff line
#!/usr/bin/env python

import pika
import uuid
import json


class AMQPClient(object):
  
    def __init__(self, host, port, queue):
        self.host = host
        self.port = port
        self.rpcQueue = queue
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.host, port = self.port))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue = '', exclusive = True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(queue = self.callback_queue, on_message_callback = self.on_response, auto_ack = True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)

    def call(self, msg):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',
                                   routing_key = self.rpcQueue,
                                   properties = pika.BasicProperties(reply_to = self.callback_queue,
                                                                     correlation_id = self.corr_id,
                                                                    ),
                                   body = json.dumps(msg))

        while self.response is None:
            self.connection.process_data_events()
        return self.response 

transfer_service/amqp_server.py

deleted100644 → 0
+0 −40
Original line number Diff line number Diff line
#!/usr/bin/env python

import pika
import threading
import json


class AMQPServer(threading.Thread):

    def __init__(self, host, port, queue):
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.queue = queue
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.host, port = self.port))
        self.channel = self.connection.channel();
        self.channel.queue_declare(queue = self.queue)
        self.channel.basic_qos(prefetch_count = 16)
        threading.Thread(target = self.channel.basic_consume(queue = self.queue, on_message_callback = self.on_request))

    def on_request(self, ch, method, props, body):
        requestBody = json.loads(body)
        print(f"Request body: {json.dumps(requestBody)}")
        response = self.execute_callback(requestBody)

        ch.basic_publish(exchange = '',
                         routing_key = props.reply_to,
                         properties = pika.BasicProperties(correlation_id = props.correlation_id),
                                                           body = json.dumps(response))
        ch.basic_ack(delivery_tag = method.delivery_tag)

    def execute_callback(self, requestBody):
        """
        This method must be implemented by
        inherited classes
        """
        pass

    def run(self):
        self.channel.start_consuming()
+0 −25
Original line number Diff line number Diff line
#!/usr/bin/env python

import json

from job_queue import JobQueue


class ImportJobQueue(JobQueue):
    
    def __init__(self, queueName):
        super(ImportJobQueue, self).__init__(queueName)
        
    def getJob(self):
        """Gets a copy of the first job without moving it out from the current queue."""
        job = json.loads(self.redisCli.lrange(self.queueName, self.len() - 1, self.len() - 1)[0].decode("utf-8"))
        return job

    def insertJob(self, job):
        """Pushes a new job into the queue."""
        self.redisCli.lpush(self.queueName, json.dumps(job))

    def extractJob(self):
        """Moves out a job from the end of the current queue."""
        job = json.loads(self.redisCli.brpop(self.queueName)[1].decode("utf-8"))
        return job
+0 −15
Original line number Diff line number Diff line
#!/usr/bin/env python

from import_job_queue import ImportJobQueue
from task_executor import TaskExecutor

class ImportTaskExecutor(TaskExecutor):
    
    def __init__(self):
        super(ImportTaskExecutor, self).__init__()

    def setSourceQueueName(self, srcQueueName):
        self.srcQueue = ImportJobQueue(srcQueueName)

    def setDestinationQueueName(self, destQueueName):
        self.destQueue = ImportJobQueue(destQueueName)