Loading transfer_service/db_connector.py +5 −1 Original line number Diff line number Diff line Loading @@ -713,10 +713,11 @@ class DbConnector(object): start_time, end_time, job_info, node_list, results, error_message, error_type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (job_id) DO UPDATE SET (owner_id, Loading @@ -725,6 +726,7 @@ class DbConnector(object): start_time, end_time, job_info, node_list, results, error_message, error_type) Loading @@ -734,6 +736,7 @@ class DbConnector(object): EXCLUDED.start_time, EXCLUDED.end_time, EXCLUDED.job_info, EXCLUDED.node_list, EXCLUDED.results, EXCLUDED.error_message, EXCLUDED.error_type); Loading @@ -745,6 +748,7 @@ class DbConnector(object): jobObj.startTime, jobObj.endTime, json.dumps(jobObj.jobInfo), json.dumps(jobObj.nodeList), json.dumps(jobObj.results), jobObj.errorMessage, jobObj.errorType,)) Loading transfer_service/job.py +4 −0 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ class Job(object): self.executionDuration = None self.destruction = None self.parameters = None self.nodeList = None self.results = None self.errorMessage = None self.errorType = None Loading Loading @@ -50,6 +51,9 @@ class Job(object): def setExecutionDuration(self, executionDuration): self.executionDuration = executionDuration def setNodeList(self, nodeList): self.nodeList = nodeList def setResults(self, results): self.results = results Loading transfer_service/job_queue.py +4 −1 Original line number Diff line number Diff line Loading @@ -68,6 +68,7 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) jobObj.setNodeList(job["nodeList"]) except ConnectionError: if retry > 0: retry -= 1 Loading Loading @@ -96,7 +97,8 @@ class JobQueue(object): "destruction": jobObj.destruction, "parameters": jobObj.parameters, "results": jobObj.results, "jobInfo": jobObj.jobInfo } "jobInfo": jobObj.jobInfo, "nodeList": jobObj.nodeList } while True: try: self.redisCli.lpush(self.queueName, json.dumps(data)) Loading Loading @@ -139,6 +141,7 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) jobObj.setNodeList(job["nodeList"]) return jobObj def moveJobTo(self, nextQueueName): Loading transfer_service/retrieve_cleaner.py +1 −1 Original line number Diff line number Diff line Loading @@ -98,7 +98,7 @@ class RetrieveCleaner(TaskExecutor): self.wait() if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.nodeList = self.jobObj.jobInfo["nodeList"].copy() self.nodeList = self.jobObj.nodeList.copy() self.destPathList = self.jobObj.jobInfo["destPathList"].copy() nodeInfo = self.dbConn.getOSPath(self.nodeList[0]) self.username = nodeInfo["username"] Loading transfer_service/retrieve_executor.py +1 −2 Original line number Diff line number Diff line Loading @@ -319,8 +319,7 @@ class RetrieveExecutor(TaskExecutor): if self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.jobInfo["nodeList"].copy() self.jobObj.jobInfo.pop("nodeList") self.nodeList = self.jobObj.nodeList.copy() self.buildFileList() self.buildBlocks() result = self.retrieveData() Loading Loading
transfer_service/db_connector.py +5 −1 Original line number Diff line number Diff line Loading @@ -713,10 +713,11 @@ class DbConnector(object): start_time, end_time, job_info, node_list, results, error_message, error_type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (job_id) DO UPDATE SET (owner_id, Loading @@ -725,6 +726,7 @@ class DbConnector(object): start_time, end_time, job_info, node_list, results, error_message, error_type) Loading @@ -734,6 +736,7 @@ class DbConnector(object): EXCLUDED.start_time, EXCLUDED.end_time, EXCLUDED.job_info, EXCLUDED.node_list, EXCLUDED.results, EXCLUDED.error_message, EXCLUDED.error_type); Loading @@ -745,6 +748,7 @@ class DbConnector(object): jobObj.startTime, jobObj.endTime, json.dumps(jobObj.jobInfo), json.dumps(jobObj.nodeList), json.dumps(jobObj.results), jobObj.errorMessage, jobObj.errorType,)) Loading
transfer_service/job.py +4 −0 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ class Job(object): self.executionDuration = None self.destruction = None self.parameters = None self.nodeList = None self.results = None self.errorMessage = None self.errorType = None Loading Loading @@ -50,6 +51,9 @@ class Job(object): def setExecutionDuration(self, executionDuration): self.executionDuration = executionDuration def setNodeList(self, nodeList): self.nodeList = nodeList def setResults(self, results): self.results = results Loading
transfer_service/job_queue.py +4 −1 Original line number Diff line number Diff line Loading @@ -68,6 +68,7 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) jobObj.setNodeList(job["nodeList"]) except ConnectionError: if retry > 0: retry -= 1 Loading Loading @@ -96,7 +97,8 @@ class JobQueue(object): "destruction": jobObj.destruction, "parameters": jobObj.parameters, "results": jobObj.results, "jobInfo": jobObj.jobInfo } "jobInfo": jobObj.jobInfo, "nodeList": jobObj.nodeList } while True: try: self.redisCli.lpush(self.queueName, json.dumps(data)) Loading Loading @@ -139,6 +141,7 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) jobObj.setNodeList(job["nodeList"]) return jobObj def moveJobTo(self, nextQueueName): Loading
transfer_service/retrieve_cleaner.py +1 −1 Original line number Diff line number Diff line Loading @@ -98,7 +98,7 @@ class RetrieveCleaner(TaskExecutor): self.wait() if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.nodeList = self.jobObj.jobInfo["nodeList"].copy() self.nodeList = self.jobObj.nodeList.copy() self.destPathList = self.jobObj.jobInfo["destPathList"].copy() nodeInfo = self.dbConn.getOSPath(self.nodeList[0]) self.username = nodeInfo["username"] Loading
transfer_service/retrieve_executor.py +1 −2 Original line number Diff line number Diff line Loading @@ -319,8 +319,7 @@ class RetrieveExecutor(TaskExecutor): if self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.jobInfo["nodeList"].copy() self.jobObj.jobInfo.pop("nodeList") self.nodeList = self.jobObj.nodeList.copy() self.buildFileList() self.buildBlocks() result = self.retrieveData() Loading