Loading docker/transfer_service/amqp_server.py +4 −0 Original line number Diff line number Diff line Loading @@ -31,6 +31,10 @@ class AMQPServer(threading.Thread): ch.basic_ack(delivery_tag = method.delivery_tag) def execute_callback(self, requestBody): """ This method must be implemented by inherited classes """ pass def run(self): Loading docker/transfer_service/get_job_amqp_server.py +6 −1 Original line number Diff line number Diff line Loading @@ -8,6 +8,11 @@ class GetJobAMQPServer(AMQPServer): super(GetJobAMQPServer, self).__init__(host, queue) def execute_callback(self, requestBody): if "jobID" in requestBody: redis_res = self.jobCache.get(requestBody["jobID"]) print(f"Redis response: {redis_res}") return redis_res else: return 42 def run(self): Loading docker/transfer_service/job.py +3 −0 Original line number Diff line number Diff line Loading @@ -15,6 +15,9 @@ class Job(object): self.results = None self.jobInfo = None def setID(self, jobID): self.jobID = jobID def setPhase(self, phase): self.phase = phase Loading docker/transfer_service/job_cache.py +5 −3 Original line number Diff line number Diff line Loading @@ -20,9 +20,11 @@ class JobCache(object): "destruction": jobObj.destruction, "parameters": jobObj.parameters, "jobInfo": jobObj.jobInfo } self.redisCli.set(jobObj.jobID, json.dumps(data)) def get(self, jobID): if self.redisCli.exists(jobID): jobObj = self.redisCli.get(jobID).decode("utf-8") return json.loads(jobObj) else: return json.loads('{ "error": "JOB_NOT_FOUND" }') No newline at end of file docker/transfer_service/start_job_amqp_server.py +20 −7 Original line number Diff line number Diff line import threading # only for testing purposes import time # only for testing purposes from amqp_server import AMQPServer from job import Job Loading @@ -8,14 +11,24 @@ class StartJobAMQPServer(AMQPServer): super(StartJobAMQPServer, self).__init__(host, queue) def execute_callback(self, requestBody): job = Job() job.setInfo(requestBody) job.setPhase("RUN") self.jobCache.set(job) redis_res = self.jobCache.get(job.jobID) self.job = Job() self.job.setInfo(requestBody) self.job.setPhase("PENDING") self.jobCache.set(self.job) redis_res = self.jobCache.get(self.job.jobID) print(f"Redis response: {redis_res}") t = threading.Thread(target = self.fake_job) # only for testing purposes t.start() # only for testing purposes return redis_res def run(self): print(f"Starting AMQP server of type {self.type}...") super(StartJobAMQPServer, self).run() # only for testing purposes def fake_job(self): time.sleep(10) print("fake_job: changing job state...") self.job.setPhase("EXECUTING") self.jobCache.set(self.job) print("fake_job: state changed!") No newline at end of file Loading
docker/transfer_service/amqp_server.py +4 −0 Original line number Diff line number Diff line Loading @@ -31,6 +31,10 @@ class AMQPServer(threading.Thread): ch.basic_ack(delivery_tag = method.delivery_tag) def execute_callback(self, requestBody): """ This method must be implemented by inherited classes """ pass def run(self): Loading
docker/transfer_service/get_job_amqp_server.py +6 −1 Original line number Diff line number Diff line Loading @@ -8,6 +8,11 @@ class GetJobAMQPServer(AMQPServer): super(GetJobAMQPServer, self).__init__(host, queue) def execute_callback(self, requestBody): if "jobID" in requestBody: redis_res = self.jobCache.get(requestBody["jobID"]) print(f"Redis response: {redis_res}") return redis_res else: return 42 def run(self): Loading
docker/transfer_service/job.py +3 −0 Original line number Diff line number Diff line Loading @@ -15,6 +15,9 @@ class Job(object): self.results = None self.jobInfo = None def setID(self, jobID): self.jobID = jobID def setPhase(self, phase): self.phase = phase Loading
docker/transfer_service/job_cache.py +5 −3 Original line number Diff line number Diff line Loading @@ -20,9 +20,11 @@ class JobCache(object): "destruction": jobObj.destruction, "parameters": jobObj.parameters, "jobInfo": jobObj.jobInfo } self.redisCli.set(jobObj.jobID, json.dumps(data)) def get(self, jobID): if self.redisCli.exists(jobID): jobObj = self.redisCli.get(jobID).decode("utf-8") return json.loads(jobObj) else: return json.loads('{ "error": "JOB_NOT_FOUND" }') No newline at end of file
docker/transfer_service/start_job_amqp_server.py +20 −7 Original line number Diff line number Diff line import threading # only for testing purposes import time # only for testing purposes from amqp_server import AMQPServer from job import Job Loading @@ -8,14 +11,24 @@ class StartJobAMQPServer(AMQPServer): super(StartJobAMQPServer, self).__init__(host, queue) def execute_callback(self, requestBody): job = Job() job.setInfo(requestBody) job.setPhase("RUN") self.jobCache.set(job) redis_res = self.jobCache.get(job.jobID) self.job = Job() self.job.setInfo(requestBody) self.job.setPhase("PENDING") self.jobCache.set(self.job) redis_res = self.jobCache.get(self.job.jobID) print(f"Redis response: {redis_res}") t = threading.Thread(target = self.fake_job) # only for testing purposes t.start() # only for testing purposes return redis_res def run(self): print(f"Starting AMQP server of type {self.type}...") super(StartJobAMQPServer, self).run() # only for testing purposes def fake_job(self): time.sleep(10) print("fake_job: changing job state...") self.job.setPhase("EXECUTING") self.jobCache.set(self.job) print("fake_job: state changed!") No newline at end of file