Loading transfer_service/data_amqp_server.py +1 −1 Original line number Diff line number Diff line Loading @@ -93,7 +93,7 @@ class DataAMQPServer(RedisRpcServer): username = requestBody["userName"] self.prepare(username) job = Job() job.setType("other") job.setType("vos_data") job.setInfo(requestBody) job.setPhase("PENDING") job.setOwnerId(self.dbConn.getUserId(username)) Loading transfer_service/import_amqp_server.py +23 −9 Original line number Diff line number Diff line Loading @@ -4,7 +4,8 @@ import os from config import Config from db_connector import DbConnector from import_job_queue import ImportJobQueue from job_queue import JobQueue from job import Job from redis_rpc_server import RedisRpcServer from system_utils import SystemUtils Loading @@ -24,7 +25,7 @@ class ImportAMQPServer(RedisRpcServer): 1) self.params = config.loadSection("scheduling") self.maxReadyJobs = self.params.getint("max_ready_jobs") self.importReadyQueue = ImportJobQueue("import_ready") self.importReadyQueue = JobQueue("import_ready") self.systemUtils = SystemUtils() super(ImportAMQPServer, self).__init__(host, port, db, rpcQueue) Loading Loading @@ -80,13 +81,26 @@ class ImportAMQPServer(RedisRpcServer): "errorCode": 8, "errorMsg": "Import queue is full, please, retry later." } else: job = dict() job["userId"] = userId job["path"] = path job["pathPrefix"] = pathPrefix job["storageId"] = storageId job["storageType"] = storageType self.importReadyQueue.insertJob(job) jobObj = Job() jobObj.setType("vos_import") jobInfo = requestBody.copy() #jobInfo["userId"] = userId jobInfo["pathPrefix"] = pathPrefix jobInfo["storageId"] = storageId jobInfo["storageType"] = storageType jobObj.setInfo(jobInfo) jobObj.setPhase("QUEUED") jobObj.setOwnerId(userId) self.dbConn.insertJob(jobObj) #job = dict() #job["userId"] = userId #job["path"] = path #job["pathPrefix"] = pathPrefix #job["storageId"] = storageId #job["storageType"] = storageType self.importReadyQueue.insertJob(jobObj) #p = Process(target = self.load, # args = (self.tapeClient, Loading transfer_service/import_executor.py +22 −9 Original line number Diff line number Diff line Loading @@ -7,15 +7,15 @@ from config import Config from checksum import Checksum from datetime import datetime as dt from db_connector import DbConnector from import_task_executor import ImportTaskExecutor from mailer import Mailer from node import Node from system_utils import SystemUtils from tabulate import tabulate from tape_client import TapeClient from task_executor import TaskExecutor class ImportExecutor(ImportTaskExecutor): class ImportExecutor(TaskExecutor): def __init__(self): self.md5calc = Checksum() Loading @@ -34,7 +34,8 @@ class ImportExecutor(ImportTaskExecutor): self.params["user"], self.params["pkey_file_path"]) self.systemUtils = SystemUtils() self.job = None self.jobObj = None self.jobId = None self.userId = None self.path = None self.pathPrefix = None Loading @@ -45,6 +46,9 @@ class ImportExecutor(ImportTaskExecutor): def importVOSpaceNodes(self): """This method performs the VOSpace import operation.""" self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) start = dt.now() nodeList = [] timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") Loading Loading @@ -140,6 +144,13 @@ class ImportExecutor(ImportTaskExecutor): nlfp.close() end = dt.now() # Update job status (to be moved) results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) m = Mailer() m.addRecipient("cristiano.urban@inaf.it") msg = f""" Loading @@ -160,6 +171,7 @@ class ImportExecutor(ImportTaskExecutor): os.remove(nodeListFile) def run(self): print("Starting import executor...") self.setSourceQueueName("import_ready") Loading @@ -167,12 +179,13 @@ class ImportExecutor(ImportTaskExecutor): while True: self.wait() if self.srcQueue.len() > 0: self.job = self.srcQueue.getJob() self.userId = self.job["userId"] self.path = self.job["path"] self.pathPrefix = self.job["pathPrefix"] self.storageId = self.job["storageId"] self.storageType = self.job["storageType"] self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.userId = self.jobObj.ownerId self.path = self.jobObj.jobInfo["path"] self.pathPrefix = self.jobObj.jobInfo["pathPrefix"] self.storageId = self.jobObj.jobInfo["storageId"] self.storageType = self.jobObj.jobInfo["storageType"] self.importVOSpaceNodes() if self.destQueue.len() == self.maxTerminatedJobs: self.destQueue.extractJob() Loading Loading
transfer_service/data_amqp_server.py +1 −1 Original line number Diff line number Diff line Loading @@ -93,7 +93,7 @@ class DataAMQPServer(RedisRpcServer): username = requestBody["userName"] self.prepare(username) job = Job() job.setType("other") job.setType("vos_data") job.setInfo(requestBody) job.setPhase("PENDING") job.setOwnerId(self.dbConn.getUserId(username)) Loading
transfer_service/import_amqp_server.py +23 −9 Original line number Diff line number Diff line Loading @@ -4,7 +4,8 @@ import os from config import Config from db_connector import DbConnector from import_job_queue import ImportJobQueue from job_queue import JobQueue from job import Job from redis_rpc_server import RedisRpcServer from system_utils import SystemUtils Loading @@ -24,7 +25,7 @@ class ImportAMQPServer(RedisRpcServer): 1) self.params = config.loadSection("scheduling") self.maxReadyJobs = self.params.getint("max_ready_jobs") self.importReadyQueue = ImportJobQueue("import_ready") self.importReadyQueue = JobQueue("import_ready") self.systemUtils = SystemUtils() super(ImportAMQPServer, self).__init__(host, port, db, rpcQueue) Loading Loading @@ -80,13 +81,26 @@ class ImportAMQPServer(RedisRpcServer): "errorCode": 8, "errorMsg": "Import queue is full, please, retry later." } else: job = dict() job["userId"] = userId job["path"] = path job["pathPrefix"] = pathPrefix job["storageId"] = storageId job["storageType"] = storageType self.importReadyQueue.insertJob(job) jobObj = Job() jobObj.setType("vos_import") jobInfo = requestBody.copy() #jobInfo["userId"] = userId jobInfo["pathPrefix"] = pathPrefix jobInfo["storageId"] = storageId jobInfo["storageType"] = storageType jobObj.setInfo(jobInfo) jobObj.setPhase("QUEUED") jobObj.setOwnerId(userId) self.dbConn.insertJob(jobObj) #job = dict() #job["userId"] = userId #job["path"] = path #job["pathPrefix"] = pathPrefix #job["storageId"] = storageId #job["storageType"] = storageType self.importReadyQueue.insertJob(jobObj) #p = Process(target = self.load, # args = (self.tapeClient, Loading
transfer_service/import_executor.py +22 −9 Original line number Diff line number Diff line Loading @@ -7,15 +7,15 @@ from config import Config from checksum import Checksum from datetime import datetime as dt from db_connector import DbConnector from import_task_executor import ImportTaskExecutor from mailer import Mailer from node import Node from system_utils import SystemUtils from tabulate import tabulate from tape_client import TapeClient from task_executor import TaskExecutor class ImportExecutor(ImportTaskExecutor): class ImportExecutor(TaskExecutor): def __init__(self): self.md5calc = Checksum() Loading @@ -34,7 +34,8 @@ class ImportExecutor(ImportTaskExecutor): self.params["user"], self.params["pkey_file_path"]) self.systemUtils = SystemUtils() self.job = None self.jobObj = None self.jobId = None self.userId = None self.path = None self.pathPrefix = None Loading @@ -45,6 +46,9 @@ class ImportExecutor(ImportTaskExecutor): def importVOSpaceNodes(self): """This method performs the VOSpace import operation.""" self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) start = dt.now() nodeList = [] timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") Loading Loading @@ -140,6 +144,13 @@ class ImportExecutor(ImportTaskExecutor): nlfp.close() end = dt.now() # Update job status (to be moved) results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) m = Mailer() m.addRecipient("cristiano.urban@inaf.it") msg = f""" Loading @@ -160,6 +171,7 @@ class ImportExecutor(ImportTaskExecutor): os.remove(nodeListFile) def run(self): print("Starting import executor...") self.setSourceQueueName("import_ready") Loading @@ -167,12 +179,13 @@ class ImportExecutor(ImportTaskExecutor): while True: self.wait() if self.srcQueue.len() > 0: self.job = self.srcQueue.getJob() self.userId = self.job["userId"] self.path = self.job["path"] self.pathPrefix = self.job["pathPrefix"] self.storageId = self.job["storageId"] self.storageType = self.job["storageType"] self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.userId = self.jobObj.ownerId self.path = self.jobObj.jobInfo["path"] self.pathPrefix = self.jobObj.jobInfo["pathPrefix"] self.storageId = self.jobObj.jobInfo["storageId"] self.storageType = self.jobObj.jobInfo["storageType"] self.importVOSpaceNodes() if self.destQueue.len() == self.maxTerminatedJobs: self.destQueue.extractJob() Loading