Loading client/vos_cli.conf +1 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ host = rabbitmq port = 5672 [vos_data] rpc_queue = store_job_queue rpc_queue = data_queue [vos_import] rpc_queue = import_queue Loading transfer_service/cli_handler.py 0 → 100644 +32 −0 Original line number Diff line number Diff line #!/usr/bin/env python import sys from data_amqp_server import DataAMQPServer from import_amqp_server import ImportAMQPServer from job_amqp_server import JobAMQPServer from storage_amqp_server import StorageAMQPServer class CliHandler(object): def __init__(self, host, port): self.host = host self.port = port self.amqpServerList = [] def addAMQPServer(self, srvType, rpcQueue): if srvType == 'data': self.amqpServerList.append(DataAMQPServer(self.host, self.port, rpcQueue)) elif srvType == 'import': self.amqpServerList.append(ImportAMQPServer(self.host, self.port, rpcQueue)) elif srvType == 'storage': self.amqpServerList.append(StorageAMQPServer(self.host, self.port, rpcQueue)) elif srvType == 'job': self.amqpServerList.append(JobAMQPServer(self.host, self.port, rpcQueue)) else: sys.exit(f"FATAL: unknown server type {srvType}.") def start(self): for srv in self.amqpServerList: srv.start() transfer_service/store_amqp_server.py→transfer_service/data_amqp_server.py +4 −4 Original line number Diff line number Diff line Loading @@ -18,10 +18,10 @@ from job_queue import JobQueue from system_utils import SystemUtils class StoreAMQPServer(AMQPServer): class DataAMQPServer(AMQPServer): def __init__(self, host, port, queue): self.type = "store" self.type = "data" self.storeAck = False config = Config("vos_ts.conf") self.params = config.loadSection("file_catalog") Loading @@ -36,7 +36,7 @@ class StoreAMQPServer(AMQPServer): self.storageStorePath = self.params["store_path"] self.pendingQueueWrite = JobQueue("write_pending") self.systemUtils = SystemUtils() super(StoreAMQPServer, self).__init__(host, port, queue) super(DataAMQPServer, self).__init__(host, port, queue) def execute_callback(self, requestBody): # 'requestType' and 'userName' attributes are mandatory Loading Loading @@ -125,4 +125,4 @@ class StoreAMQPServer(AMQPServer): def run(self): print(f"Starting AMQP server of type {self.type}...") super(StoreAMQPServer, self).run() super(DataAMQPServer, self).run() transfer_service/job_cache.pydeleted 100644 → 0 +0 −37 Original line number Diff line number Diff line import redis import json from job import Job class JobCache(object): def __init__(self, host, port, db): self.host = host self.port = port self.db = db self.redisCli = redis.StrictRedis(host = self.host, port = self.port, db = self.db) def set(self, jobObj): data = { "jobId": jobObj.jobId, "jobType": jobObj.type, "ownerId": jobObj.ownerId, "phase": jobObj.phase, "quote": jobObj.quote, "startTime": jobObj.startTime, "endTime": jobObj.endTime, "executionDuration": jobObj.executionDuration, "destruction": jobObj.destruction, "parameters": jobObj.parameters, "results": jobObj.results, "jobInfo": jobObj.jobInfo } self.redisCli.set(jobObj.jobId, json.dumps(data)) def get(self, jobId): if self.redisCli.exists(jobId): jobObj = self.redisCli.get(jobId).decode("utf-8") return json.loads(jobObj) else: return json.loads('{ "error": "JOB_NOT_FOUND" }') No newline at end of file transfer_service/transfer_service.py +18 −15 Original line number Diff line number Diff line #!/usr/bin/env python import time import os from config import Config from job_handler import JobHandler from cli_handler import CliHandler from job_scheduler import JobScheduler from vospace_rest_handler import VOSpaceRestHandler class TransferService(object): Loading @@ -13,30 +11,35 @@ class TransferService(object): def __init__(self): config = Config("vos_ts.conf") self.params = config.loadSection("amqp") self.jobHandler = JobHandler(self.params["host"], self.params.getint("port")) self.cliHandler = CliHandler(self.params["host"], self.params.getint("port")) self.vosRestHandler = VOSpaceRestHandler(self.params["host"], self.params.getint("port")) self.jobScheduler = JobScheduler() # PullToVOSpace (via REST API) self.jobHandler.addAMQPServer('start', 'start_job_queue') self.jobHandler.addAMQPServer('poll', 'poll_job_queue') self.jobHandler.addAMQPServer('abort', 'abort_job_queue') self.vosRestHandler.addAMQPServer('start', 'start_job_queue') self.vosRestHandler.addAMQPServer('poll', 'poll_job_queue') self.vosRestHandler.addAMQPServer('abort', 'abort_job_queue') # PushToVOSpace (via vos_data, the 'unofficial' command line client) self.jobHandler.addAMQPServer('store', 'store_job_queue') self.cliHandler.addAMQPServer('data', 'data_queue') # Import self.jobHandler.addAMQPServer('import', 'import_queue') self.cliHandler.addAMQPServer('import', 'import_queue') # Job self.jobHandler.addAMQPServer('job', 'job_queue') self.cliHandler.addAMQPServer('job', 'job_queue') # Storage self.jobHandler.addAMQPServer('storage', 'storage_queue') self.cliHandler.addAMQPServer('storage', 'storage_queue') def start(self): # Startup self.jobScheduler.start() self.jobHandler.start() self.vosRestHandler.start() self.cliHandler.start() def start(self): print("\nTransfer service is RUNNING...\n") ts = TransferService() ts.start() print("Transfer service is RUNNING...") Loading
client/vos_cli.conf +1 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ host = rabbitmq port = 5672 [vos_data] rpc_queue = store_job_queue rpc_queue = data_queue [vos_import] rpc_queue = import_queue Loading
transfer_service/cli_handler.py 0 → 100644 +32 −0 Original line number Diff line number Diff line #!/usr/bin/env python import sys from data_amqp_server import DataAMQPServer from import_amqp_server import ImportAMQPServer from job_amqp_server import JobAMQPServer from storage_amqp_server import StorageAMQPServer class CliHandler(object): def __init__(self, host, port): self.host = host self.port = port self.amqpServerList = [] def addAMQPServer(self, srvType, rpcQueue): if srvType == 'data': self.amqpServerList.append(DataAMQPServer(self.host, self.port, rpcQueue)) elif srvType == 'import': self.amqpServerList.append(ImportAMQPServer(self.host, self.port, rpcQueue)) elif srvType == 'storage': self.amqpServerList.append(StorageAMQPServer(self.host, self.port, rpcQueue)) elif srvType == 'job': self.amqpServerList.append(JobAMQPServer(self.host, self.port, rpcQueue)) else: sys.exit(f"FATAL: unknown server type {srvType}.") def start(self): for srv in self.amqpServerList: srv.start()
transfer_service/store_amqp_server.py→transfer_service/data_amqp_server.py +4 −4 Original line number Diff line number Diff line Loading @@ -18,10 +18,10 @@ from job_queue import JobQueue from system_utils import SystemUtils class StoreAMQPServer(AMQPServer): class DataAMQPServer(AMQPServer): def __init__(self, host, port, queue): self.type = "store" self.type = "data" self.storeAck = False config = Config("vos_ts.conf") self.params = config.loadSection("file_catalog") Loading @@ -36,7 +36,7 @@ class StoreAMQPServer(AMQPServer): self.storageStorePath = self.params["store_path"] self.pendingQueueWrite = JobQueue("write_pending") self.systemUtils = SystemUtils() super(StoreAMQPServer, self).__init__(host, port, queue) super(DataAMQPServer, self).__init__(host, port, queue) def execute_callback(self, requestBody): # 'requestType' and 'userName' attributes are mandatory Loading Loading @@ -125,4 +125,4 @@ class StoreAMQPServer(AMQPServer): def run(self): print(f"Starting AMQP server of type {self.type}...") super(StoreAMQPServer, self).run() super(DataAMQPServer, self).run()
transfer_service/job_cache.pydeleted 100644 → 0 +0 −37 Original line number Diff line number Diff line import redis import json from job import Job class JobCache(object): def __init__(self, host, port, db): self.host = host self.port = port self.db = db self.redisCli = redis.StrictRedis(host = self.host, port = self.port, db = self.db) def set(self, jobObj): data = { "jobId": jobObj.jobId, "jobType": jobObj.type, "ownerId": jobObj.ownerId, "phase": jobObj.phase, "quote": jobObj.quote, "startTime": jobObj.startTime, "endTime": jobObj.endTime, "executionDuration": jobObj.executionDuration, "destruction": jobObj.destruction, "parameters": jobObj.parameters, "results": jobObj.results, "jobInfo": jobObj.jobInfo } self.redisCli.set(jobObj.jobId, json.dumps(data)) def get(self, jobId): if self.redisCli.exists(jobId): jobObj = self.redisCli.get(jobId).decode("utf-8") return json.loads(jobObj) else: return json.loads('{ "error": "JOB_NOT_FOUND" }') No newline at end of file
transfer_service/transfer_service.py +18 −15 Original line number Diff line number Diff line #!/usr/bin/env python import time import os from config import Config from job_handler import JobHandler from cli_handler import CliHandler from job_scheduler import JobScheduler from vospace_rest_handler import VOSpaceRestHandler class TransferService(object): Loading @@ -13,30 +11,35 @@ class TransferService(object): def __init__(self): config = Config("vos_ts.conf") self.params = config.loadSection("amqp") self.jobHandler = JobHandler(self.params["host"], self.params.getint("port")) self.cliHandler = CliHandler(self.params["host"], self.params.getint("port")) self.vosRestHandler = VOSpaceRestHandler(self.params["host"], self.params.getint("port")) self.jobScheduler = JobScheduler() # PullToVOSpace (via REST API) self.jobHandler.addAMQPServer('start', 'start_job_queue') self.jobHandler.addAMQPServer('poll', 'poll_job_queue') self.jobHandler.addAMQPServer('abort', 'abort_job_queue') self.vosRestHandler.addAMQPServer('start', 'start_job_queue') self.vosRestHandler.addAMQPServer('poll', 'poll_job_queue') self.vosRestHandler.addAMQPServer('abort', 'abort_job_queue') # PushToVOSpace (via vos_data, the 'unofficial' command line client) self.jobHandler.addAMQPServer('store', 'store_job_queue') self.cliHandler.addAMQPServer('data', 'data_queue') # Import self.jobHandler.addAMQPServer('import', 'import_queue') self.cliHandler.addAMQPServer('import', 'import_queue') # Job self.jobHandler.addAMQPServer('job', 'job_queue') self.cliHandler.addAMQPServer('job', 'job_queue') # Storage self.jobHandler.addAMQPServer('storage', 'storage_queue') self.cliHandler.addAMQPServer('storage', 'storage_queue') def start(self): # Startup self.jobScheduler.start() self.jobHandler.start() self.vosRestHandler.start() self.cliHandler.start() def start(self): print("\nTransfer service is RUNNING...\n") ts = TransferService() ts.start() print("Transfer service is RUNNING...")