Commit 01fd0e58 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Merge branch 'testing'

parents cd6311c1 b4b842a3
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -44,7 +44,8 @@ transferRequest = { "transfer":
                     }
                  }

print("Sending transfer request...")
print("Sending transfer request:")
print(json.dumps(transferRequest, indent = 3))
response = testClient.call(transferRequest)
print("Response:")
print(json.dumps(response, indent = 3))
+0 −27
Original line number Diff line number Diff line
FROM python:3

# Create a new user called transfer_service, create the home directory and set the default shell
RUN useradd -m -s /bin/bash transfer_service

# Set up a workdir for the container
WORKDIR /home/transfer_service/

# Copy the source code of the server app
COPY wait-for-it.sh *.py ./
RUN chown transfer_service wait-for-it.sh *.py && \ 
    chmod 755 wait-for-it.sh *.py        
    
# Download, build and install xrootd with python bindings
RUN apt-get update && apt-get install -y apt-utils build-essential sudo git cmake python3-dev python3-setuptools

# Run commands as transfer_service user
USER transfer_service 

RUN git clone https://github.com/xrootd/xrootd.git && \
    cd xrootd && mkdir build && cd build && \
    cmake .. -DCMAKE_INSTALL_PREFIX=/opt/xrootd \
             -DENABLE_PERL=FALSE && \
    make && sudo make install   
    
# Install python dependencies
RUN pip3 install --no-cache-dir pika redis
 No newline at end of file
+15 −0
Original line number Diff line number Diff line
from amqp_server import AMQPServer


class AbortJobAMQPServer(AMQPServer):
  
    def __init__(self, host, queue):
        self.type = "abort"
        super(AbortJobAMQPServer, self).__init__(host, queue)      

    def execute_callback(self, requestBody):
        return 42
      
    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        super(AbortJobAMQPServer, self).run() 
+4 −27
Original line number Diff line number Diff line
import pika
import threading
import json
import sys

from job import Job
from job_cache import JobCache
@@ -9,9 +8,8 @@ from job_cache import JobCache

class AMQPServer(threading.Thread):

    def __init__(self, type, host, queue):
    def __init__(self, host, queue):
        threading.Thread.__init__(self)
        self.type = type
        self.host = host
        self.queue = queue
        self.jobCache = JobCache('redis')
@@ -24,15 +22,7 @@ class AMQPServer(threading.Thread):
    def on_request(self, ch, method, props, body):
        requestBody = json.loads(body)
        print(f"Request body: {json.dumps(requestBody)}")

        if self.type == 'start':
            response = self.start_callback(requestBody)
        elif self.type == 'poll':
            response = self.poll_callback(requestBody)
        elif self.type == 'abort':
            response = self.abort_callback(requestBody)
        else:
            sys.exit("Unknown type.")
        response = self.execute_callback(requestBody)

        ch.basic_publish(exchange = '',
                         routing_key = props.reply_to,
@@ -40,21 +30,8 @@ class AMQPServer(threading.Thread):
                                                           body = json.dumps(response))
        ch.basic_ack(delivery_tag = method.delivery_tag)

    def start_callback(self, requestBody):
        job = Job()
        job.setInfo(requestBody)
        job.setPhase("RUN")
        self.jobCache.set(job)
        redis_res = self.jobCache.get(job.jobID)
        print(f"Redis response: {redis_res}")
        return redis_res

    def poll_callback(self):
        return 42

    def abort_callback(self):
        return 42
    def execute_callback(self, requestBody):
        pass

    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        self.channel.start_consuming()
+15 −0
Original line number Diff line number Diff line
from amqp_server import AMQPServer


class GetJobAMQPServer(AMQPServer):
  
    def __init__(self, host, queue):
        self.type = "poll"
        super(GetJobAMQPServer, self).__init__(host, queue)      

    def execute_callback(self, requestBody):
        return 42
      
    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        super(GetJobAMQPServer, self).run() 
Loading