Loading docker/transfer_service/amqp_server.py +4 −27 Original line number Diff line number Diff line import pika import threading import json import sys from job import Job from job_cache import JobCache Loading @@ -9,9 +8,8 @@ from job_cache import JobCache class AMQPServer(threading.Thread): def __init__(self, type, host, queue): def __init__(self, host, queue): threading.Thread.__init__(self) self.type = type self.host = host self.queue = queue self.jobCache = JobCache('redis') Loading @@ -24,15 +22,7 @@ class AMQPServer(threading.Thread): def on_request(self, ch, method, props, body): requestBody = json.loads(body) print(f"Request body: {json.dumps(requestBody)}") if self.type == 'start': response = self.start_callback(requestBody) elif self.type == 'poll': response = self.poll_callback(requestBody) elif self.type == 'abort': response = self.abort_callback(requestBody) else: sys.exit("Unknown type.") response = self.execute_callback(requestBody) ch.basic_publish(exchange = '', routing_key = props.reply_to, Loading @@ -40,21 +30,8 @@ class AMQPServer(threading.Thread): body = json.dumps(response)) ch.basic_ack(delivery_tag = method.delivery_tag) def start_callback(self, requestBody): job = Job() job.setInfo(requestBody) job.setPhase("RUN") self.jobCache.set(job) redis_res = self.jobCache.get(job.jobID) print(f"Redis response: {redis_res}") return redis_res def poll_callback(self): return 42 def abort_callback(self): return 42 def execute_callback(self, requestBody): pass def run(self): print(f"Starting AMQP server of type {self.type}...") self.channel.start_consuming() docker/transfer_service/job_handler.py +12 −2 Original line number Diff line number Diff line import pika from amqp_server import AMQPServer import sys from start_job_amqp_server import StartJobAMQPServer from get_job_amqp_server import GetJobAMQPServer from abort_job_amqp_server import AbortJobAMQPServer class JobHandler(object): Loading @@ -9,7 +12,14 @@ class JobHandler(object): self.amqpServerList = [] def addAMQPServer(self, srvType, rpcQueue): self.amqpServerList.append(AMQPServer(srvType, self.host, rpcQueue)) if srvType == 'start': self.amqpServerList.append(StartJobAMQPServer(self.host, rpcQueue)) elif srvType == 'poll': self.amqpServerList.append(GetJobAMQPServer(self.host, rpcQueue)) elif srvType == 'abort': self.amqpServerList.append(AbortJobAMQPServer(self.host, rpcQueue)) else: sys.exit(f"FATAL:: unknown server type {srvType}.") def run(self): for srv in self.amqpServerList: Loading docker/transfer_service/transfer_service.py +9 −9 Original line number Diff line number Diff line import time import os from XRootD import client #from XRootD import client from job_handler import JobHandler Loading @@ -20,11 +20,11 @@ ts = TransferService() ts.run() print("Transfer service is RUNNING...") time.sleep(10) print("Starting XRootD transfer test...") process = client.CopyProcess() process.add_job( 'root://tape_frontend//home/centos/data/aaa/1.txt', '/home/transfer_service' ) process.prepare() process.run() time.sleep(3) os.system('du -a /home/transfer_service') No newline at end of file #time.sleep(10) #print("Starting XRootD transfer test...") #process = client.CopyProcess() #process.add_job( 'root://tape_frontend//home/centos/data/aaa/1.txt', '/home/transfer_service' ) #process.prepare() #process.run() #time.sleep(3) #os.system('du -a /home/transfer_service') No newline at end of file Loading
docker/transfer_service/amqp_server.py +4 −27 Original line number Diff line number Diff line import pika import threading import json import sys from job import Job from job_cache import JobCache Loading @@ -9,9 +8,8 @@ from job_cache import JobCache class AMQPServer(threading.Thread): def __init__(self, type, host, queue): def __init__(self, host, queue): threading.Thread.__init__(self) self.type = type self.host = host self.queue = queue self.jobCache = JobCache('redis') Loading @@ -24,15 +22,7 @@ class AMQPServer(threading.Thread): def on_request(self, ch, method, props, body): requestBody = json.loads(body) print(f"Request body: {json.dumps(requestBody)}") if self.type == 'start': response = self.start_callback(requestBody) elif self.type == 'poll': response = self.poll_callback(requestBody) elif self.type == 'abort': response = self.abort_callback(requestBody) else: sys.exit("Unknown type.") response = self.execute_callback(requestBody) ch.basic_publish(exchange = '', routing_key = props.reply_to, Loading @@ -40,21 +30,8 @@ class AMQPServer(threading.Thread): body = json.dumps(response)) ch.basic_ack(delivery_tag = method.delivery_tag) def start_callback(self, requestBody): job = Job() job.setInfo(requestBody) job.setPhase("RUN") self.jobCache.set(job) redis_res = self.jobCache.get(job.jobID) print(f"Redis response: {redis_res}") return redis_res def poll_callback(self): return 42 def abort_callback(self): return 42 def execute_callback(self, requestBody): pass def run(self): print(f"Starting AMQP server of type {self.type}...") self.channel.start_consuming()
docker/transfer_service/job_handler.py +12 −2 Original line number Diff line number Diff line import pika from amqp_server import AMQPServer import sys from start_job_amqp_server import StartJobAMQPServer from get_job_amqp_server import GetJobAMQPServer from abort_job_amqp_server import AbortJobAMQPServer class JobHandler(object): Loading @@ -9,7 +12,14 @@ class JobHandler(object): self.amqpServerList = [] def addAMQPServer(self, srvType, rpcQueue): self.amqpServerList.append(AMQPServer(srvType, self.host, rpcQueue)) if srvType == 'start': self.amqpServerList.append(StartJobAMQPServer(self.host, rpcQueue)) elif srvType == 'poll': self.amqpServerList.append(GetJobAMQPServer(self.host, rpcQueue)) elif srvType == 'abort': self.amqpServerList.append(AbortJobAMQPServer(self.host, rpcQueue)) else: sys.exit(f"FATAL:: unknown server type {srvType}.") def run(self): for srv in self.amqpServerList: Loading
docker/transfer_service/transfer_service.py +9 −9 Original line number Diff line number Diff line import time import os from XRootD import client #from XRootD import client from job_handler import JobHandler Loading @@ -20,11 +20,11 @@ ts = TransferService() ts.run() print("Transfer service is RUNNING...") time.sleep(10) print("Starting XRootD transfer test...") process = client.CopyProcess() process.add_job( 'root://tape_frontend//home/centos/data/aaa/1.txt', '/home/transfer_service' ) process.prepare() process.run() time.sleep(3) os.system('du -a /home/transfer_service') No newline at end of file #time.sleep(10) #print("Starting XRootD transfer test...") #process = client.CopyProcess() #process.add_job( 'root://tape_frontend//home/centos/data/aaa/1.txt', '/home/transfer_service' ) #process.prepare() #process.run() #time.sleep(3) #os.system('du -a /home/transfer_service') No newline at end of file