Loading transfer_service/job_queue.py +96 −46 Original line number Diff line number Diff line Loading @@ -5,7 +5,11 @@ # import json import logging import redis import time from redis.exceptions import ConnectionError from config import Config from job import Job Loading @@ -21,10 +25,21 @@ class JobQueue(object): db = params["db_sched"]) self.queueName = queueName def len(self): def len(self, retry = 10, timeout = 30): """Returns the number of jobs in the current queue.""" if retry < 1: retry = 1 if timeout < 1: timeout = 1 while True: try: numJobs = self.redisCli.llen(self.queueName) except ConnectionError: if retry > 0: retry -= 1 time.sleep(timeout) else: raise except Exception: raise else: Loading @@ -34,10 +49,15 @@ class JobQueue(object): """Returns the name of the current queue.""" return self.queueName def getJob(self): def getJob(self, retry = 10, timeout = 30): """Gets a copy of the first job without moving it out from the current queue.""" if retry < 1: retry = 1 if timeout < 1: timeout = 1 while True: try: job = json.loads(self.redisCli.lrange(self.queueName, self.len() - 1, self.len() - 1)[0].decode("utf-8")) job = json.loads(self.redisCli.lrange(self.queueName, -1, -1)[0].decode("utf-8")) jobObj = Job() jobObj.setId(job["jobId"]) jobObj.setType(job["jobType"]) Loading @@ -48,13 +68,23 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) except ConnectionError: if retry > 0: retry -= 1 time.sleep(timeout) else: raise except Exception: raise else: return jobObj def insertJob(self, jobObj): def insertJob(self, jobObj, retry = 10, timeout = 30): """Pushes a new job into the queue.""" if retry < 1: retry = 1 if timeout < 1: timeout = 1 data = { "jobId": jobObj.jobId, "jobType": jobObj.type, "ownerId": jobObj.ownerId, Loading @@ -67,15 +97,38 @@ class JobQueue(object): "parameters": jobObj.parameters, "results": jobObj.results, "jobInfo": jobObj.jobInfo } while True: try: self.redisCli.lpush(self.queueName, json.dumps(data)) except ConnectionError: if retry > 0: retry -= 1 time.sleep(timeout) else: raise except Exception: raise else: return def extractJob(self): def extractJob(self, retry = 10, timeout = 30): """Moves out a job from the end of the current queue.""" if retry < 1: retry = 1 if timeout < 1: timeout = 1 while True: try: job = json.loads(self.redisCli.brpop(self.queueName)[1].decode("utf-8")) except ConnectionError: if retry > 0: retry -= 1 time.sleep(timeout) else: raise except Exception: raise else: jobObj = Job() jobObj.setId(job["jobId"]) jobObj.setType(job["jobType"]) Loading @@ -86,9 +139,6 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) except Exception: raise else: return jobObj def moveJobTo(self, nextQueueName): Loading transfer_service/log_listener.py +19 −14 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ import redis import time from multiprocessing import Process from redis.exceptions import ConnectionError from config import Config Loading @@ -28,12 +29,14 @@ class LogListener(Process): if os.path.exists(self.logFilePath): os.remove(self.logFilePath) while True: time.sleep(1) time.sleep(2) try: queueLen = self.redisCli.llen(self.logQueue) except ConnectionError: pass except Exception: queueLen = 0 raise else: while queueLen > 0: try: lfp = open(self.logFilePath, 'a') Loading @@ -42,6 +45,8 @@ class LogListener(Process): else: try: logRecord = self.redisCli.brpop(self.logQueue)[1].decode("utf-8") except ConnectionError: break except Exception: raise else: Loading transfer_service/redis_rpc_server.py +10 −3 Original line number Diff line number Diff line import json import redis import threading import time from redis.exceptions import ConnectionError class RedisRPCServer(threading.Thread): Loading @@ -14,14 +17,18 @@ class RedisRPCServer(threading.Thread): def run(self): while True: try: # block the connection if the there is nothing to pop channel, request = self.client.brpop(self.rpcQueue) except ConnectionError: time.sleep(2) except Exception: raise else: channel = channel.decode("utf-8") request = json.loads(request.decode("utf-8")) response = self.callback(request) self.client.rpush(request["req_id"], json.dumps(response)) self.client.expire(request["req_id"], 30) except Exception: raise def callback(self, request): """ Loading Loading
transfer_service/job_queue.py +96 −46 Original line number Diff line number Diff line Loading @@ -5,7 +5,11 @@ # import json import logging import redis import time from redis.exceptions import ConnectionError from config import Config from job import Job Loading @@ -21,10 +25,21 @@ class JobQueue(object): db = params["db_sched"]) self.queueName = queueName def len(self): def len(self, retry = 10, timeout = 30): """Returns the number of jobs in the current queue.""" if retry < 1: retry = 1 if timeout < 1: timeout = 1 while True: try: numJobs = self.redisCli.llen(self.queueName) except ConnectionError: if retry > 0: retry -= 1 time.sleep(timeout) else: raise except Exception: raise else: Loading @@ -34,10 +49,15 @@ class JobQueue(object): """Returns the name of the current queue.""" return self.queueName def getJob(self): def getJob(self, retry = 10, timeout = 30): """Gets a copy of the first job without moving it out from the current queue.""" if retry < 1: retry = 1 if timeout < 1: timeout = 1 while True: try: job = json.loads(self.redisCli.lrange(self.queueName, self.len() - 1, self.len() - 1)[0].decode("utf-8")) job = json.loads(self.redisCli.lrange(self.queueName, -1, -1)[0].decode("utf-8")) jobObj = Job() jobObj.setId(job["jobId"]) jobObj.setType(job["jobType"]) Loading @@ -48,13 +68,23 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) except ConnectionError: if retry > 0: retry -= 1 time.sleep(timeout) else: raise except Exception: raise else: return jobObj def insertJob(self, jobObj): def insertJob(self, jobObj, retry = 10, timeout = 30): """Pushes a new job into the queue.""" if retry < 1: retry = 1 if timeout < 1: timeout = 1 data = { "jobId": jobObj.jobId, "jobType": jobObj.type, "ownerId": jobObj.ownerId, Loading @@ -67,15 +97,38 @@ class JobQueue(object): "parameters": jobObj.parameters, "results": jobObj.results, "jobInfo": jobObj.jobInfo } while True: try: self.redisCli.lpush(self.queueName, json.dumps(data)) except ConnectionError: if retry > 0: retry -= 1 time.sleep(timeout) else: raise except Exception: raise else: return def extractJob(self): def extractJob(self, retry = 10, timeout = 30): """Moves out a job from the end of the current queue.""" if retry < 1: retry = 1 if timeout < 1: timeout = 1 while True: try: job = json.loads(self.redisCli.brpop(self.queueName)[1].decode("utf-8")) except ConnectionError: if retry > 0: retry -= 1 time.sleep(timeout) else: raise except Exception: raise else: jobObj = Job() jobObj.setId(job["jobId"]) jobObj.setType(job["jobType"]) Loading @@ -86,9 +139,6 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) except Exception: raise else: return jobObj def moveJobTo(self, nextQueueName): Loading
transfer_service/log_listener.py +19 −14 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ import redis import time from multiprocessing import Process from redis.exceptions import ConnectionError from config import Config Loading @@ -28,12 +29,14 @@ class LogListener(Process): if os.path.exists(self.logFilePath): os.remove(self.logFilePath) while True: time.sleep(1) time.sleep(2) try: queueLen = self.redisCli.llen(self.logQueue) except ConnectionError: pass except Exception: queueLen = 0 raise else: while queueLen > 0: try: lfp = open(self.logFilePath, 'a') Loading @@ -42,6 +45,8 @@ class LogListener(Process): else: try: logRecord = self.redisCli.brpop(self.logQueue)[1].decode("utf-8") except ConnectionError: break except Exception: raise else: Loading
transfer_service/redis_rpc_server.py +10 −3 Original line number Diff line number Diff line import json import redis import threading import time from redis.exceptions import ConnectionError class RedisRPCServer(threading.Thread): Loading @@ -14,14 +17,18 @@ class RedisRPCServer(threading.Thread): def run(self): while True: try: # block the connection if the there is nothing to pop channel, request = self.client.brpop(self.rpcQueue) except ConnectionError: time.sleep(2) except Exception: raise else: channel = channel.decode("utf-8") request = json.loads(request.decode("utf-8")) response = self.callback(request) self.client.rpush(request["req_id"], json.dumps(response)) self.client.expire(request["req_id"], 30) except Exception: raise def callback(self, request): """ Loading