transfer_service/start_job_rpc_server.py  (+60 −11)

@@ -38,11 +38,11 @@ class StartJobRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.pendingQueueRead = JobQueue("read_pending")
         self.terminatedQueueRead = JobQueue("read_terminated")
         super(StartJobRPCServer, self).__init__(host, port, db, rpcQueue)

@@ -52,25 +52,74 @@ class StartJobRPCServer(RedisRPCServer):
         out = open("start_job_amqp_server_log.txt", "a")
         out.write(json.dumps(requestBody))
         job = Job()
         job.setId(requestBody["job"]["jobId"])
         job.setType(requestBody["job"]["jobInfo"]["transfer"]["direction"])
         job.setInfo(requestBody["job"]["jobInfo"])
         job.setOwnerId(requestBody["job"]["ownerId"])
-        if self.pendingQueueRead.len() >= self.maxPendingJobs:
+        try:
+            pendingQueueLen = self.pendingQueueRead.len()
+            terminatedQueueLen = self.terminatedQueueRead.len()
+        except Exception:
+            errorMsg = "Cache error."
+            self.logger.exception(errorMsg)
+            response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg }
+            return response
+        if pendingQueueLen >= self.maxPendingJobs:
             job.setPhase("ERROR")
             job.setErrorType("transient")
             job.setErrorMessage("Pending queue is full, please, retry later.")
+            try:
                 self.dbConn.insertJob(job)
-            if self.terminatedQueueRead.len() == self.maxTerminatedJobs:
+            except Exception:
+                errorMsg = "Database error."
+                self.logger.exception(errorMsg)
+                response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": errorMsg }
+                return response
+            try:
+                if terminatedQueueLen >= self.maxTerminatedJobs:
                     self.terminatedQueueRead.extractJob()
                 self.terminatedQueueRead.insertJob(job)
+            except Exception:
+                errorMsg = "Cache error."
+                self.logger.exception(errorMsg)
+                response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg }
+                return response
         else:
             job.setPhase(requestBody["job"]["phase"])
+            try:
                 self.dbConn.insertJob(job)
+            except Exception:
+                errorMsg = "Database error."
+                self.logger.exception(errorMsg)
+                response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": errorMsg }
+                return response
+            try:
                 self.pendingQueueRead.insertJob(job)
+            except Exception:
+                errorMsg = "Cache error."
+                self.logger.exception(errorMsg)
+                response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg }
+                return response
+        try:
             response = self.dbConn.getJob(job.jobId)
+        except Exception:
+            errorMsg = "Database error."
+            self.logger.exception(errorMsg)
+            response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": errorMsg }
         # debug block...
         out.write(f"Db response: {response}")
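Reviewer note: the handler now repeats the same except-block (log the traceback, build a {"responseType", "errorCode", "errorMsg"} payload, return) for every database and cache access. Below is a minimal sketch of how that pattern could be factored into a single helper; the method name _errorResponse is hypothetical and not part of this patch, and the error codes simply follow the convention already used in the diff (2 = database error, 3 = cache error).

    def _errorResponse(self, errorMsg, errorCode):
        # Hypothetical helper, not in the patch. Call it from an except block:
        # logger.exception records the active traceback, then the RPC error payload is returned.
        self.logger.exception(errorMsg)
        return {"responseType": "ERROR", "errorCode": errorCode, "errorMsg": errorMsg}

    # Usage sketch inside the request handler:
    #     try:
    #         pendingQueueLen = self.pendingQueueRead.len()
    #         terminatedQueueLen = self.terminatedQueueRead.len()
    #     except Exception:
    #         return self._errorResponse("Cache error.", 3)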
transfer_service/store_executor.py  (+92 −81)

@@ -50,11 +50,11 @@ class StoreExecutor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.resDir = params["res_dir"]
         params = config.loadSection("spectrum_archive")
         self.tapePool = params["tape_pool"]

@@ -74,15 +74,25 @@ class StoreExecutor(TaskExecutor):
         self.nodeList = []
         super(StoreExecutor, self).__init__()

-    def copyData(self):
+    def execute(self):
+        try:
             self.logger.info("++++++++++ Start of execution phase ++++++++++")
             self.logger.info(f"Data size: {self.dataSize} B")
+            try:
                 self.dbConn.setPhase(self.jobId, "EXECUTING")
                 self.dbConn.setStartTime(self.jobId)
+            except Exception:
+                self.logger.exception("FATAL: unable to update the file catalog.")
+                return False
             srcPathPrefix = self.storageStorePath.replace("{username}", self.username)
             srcData = os.listdir(srcPathPrefix)
             self.logger.debug("Checking storage available space...")
+            try:
                 storageBasePath = self.dbConn.getStorageBasePath(self.storageId)
+            except Exception:
+                self.logger.exception("FATAL: unable to obtain the storage base path.")
+                return False
             storageFreeSpace = 0
             tapeFrontendFreeSpace = 0
             if self.storageType == "hot":
                 [ total, used, free ] = self.systemUtils.getFileSystemSize(storageBasePath)
                 storageFreeSpace = free - self.TOL

@@ -90,24 +100,26 @@ class StoreExecutor(TaskExecutor):
                 if storageFreeSpace < self.dataSize:
                     return False
             else:
                 #[ total, used, free ] = self.systemUtils.getFileSystemSize(storageBasePath)
                 #tapeFrontendFreeSpace = free - self.maxBlockSize - self.TOL
                 self.tapeClient.connect()
                 tapePoolList = self.tapeClient.getPoolList()
                 for el in tapePoolList:
                     if el.getName() == self.tapePool:
                         storageFreeSpace = el.getFreeSpace() - self.TOL
                 self.tapeClient.disconnect()
                 #self.logger.debug(f"tapeFrontendFreeSpace (cold): {tapeFrontendFreeSpace}")
                 self.logger.debug(f"storageFreeSpace (cold): {storageFreeSpace} B")
                 #if tapeFrontendFreeSpace < self.dataSize or storageFreeSpace < self.dataSize:
                 if storageFreeSpace < self.dataSize:
                     return False
             destPathPrefix = storageBasePath + '/' + self.username
             self.logger.info("Starting data copy...")
             sp = subprocess.run(["rsync", "-av", srcPathPrefix + '/', destPathPrefix + '/'], capture_output = True)
             if(sp.returncode or sp.stderr):
                 self.logger.error("FATAL: an error occurred while copying the data.")
                 return False
+        except Exception:
+            self.logger.exception("FATAL: something went wrong during the execution phase.")
+            return False
+        else:
+            self.logger.info("++++++++++ End of execution phase ++++++++++")
+            return True

     def cleanup(self):

@@ -131,6 +143,13 @@ class StoreExecutor(TaskExecutor):
         results = [{"target": ""}]
         self.dbConn.setResults(self.jobId, results)

+        # Send e-mail notification
+        m = Mailer(self.logger)
+        m.addRecipient(self.adminEmail)
+        self.userEmail = self.dbConn.getUserEmail(self.userId)
+        if self.userEmail != self.adminEmail:
+            m.addRecipient(self.userEmail)
+
         if status == "OK":
             for el in self.nodeList:
                 nodeVOSPath = el[2]

@@ -149,15 +168,11 @@ class StoreExecutor(TaskExecutor):
             self.dbConn.setPhase(self.jobId, "COMPLETED")
             self.dbConn.setEndTime(self.jobId)
-            # Send e-mail notification
-            m = Mailer(self.logger)
-            m.addRecipient(self.adminEmail)
-            self.userEmail = self.dbConn.getUserEmail(self.userId)
-            if self.userEmail != self.adminEmail:
-                m.addRecipient(self.userEmail)
             msg = f"""
-            [VOSpace data storage procedure summary]
+            ########## VOSpace data storage procedure summary ##########

             Job ID: {self.jobId}
             Job type {self.jobObj.type}
             Storage type: {self.storageType}
             Storage ID: {self.storageId}
             Creator ID: {self.userId}

@@ -185,25 +200,23 @@ class StoreExecutor(TaskExecutor):
             self.dbConn.setPhase(self.jobId, "ERROR")
             self.dbConn.setEndTime(self.jobId)
-            # Send e-mail notification
-            m = Mailer(self.logger)
-            m.addRecipient(self.adminEmail)
-            self.userEmail = self.dbConn.getUserEmail(self.userId)
-            if self.userEmail != self.adminEmail:
-                m.addRecipient(self.userEmail)
             msg = f"""
-            [VOSpace data storage procedure summary]
+            ########## VOSpace data storage procedure summary ##########

             Job ID: {self.jobId}
             Job type {self.jobObj.type}
             Storage type: {self.storageType}
             Storage ID: {self.storageId}
             Creator ID: {self.userId}
             """
             info = f"""
-            INFO:
+            ERROR:
             the job was terminated due to an error that occurred
             while copying the data on the storage point.
             This issue will be automatically reported to the administrator.
             """
             msg += info
             m.setMessage("VOSpace data storage notification: Job ERROR", msg)

@@ -215,7 +228,13 @@ class StoreExecutor(TaskExecutor):
         self.setDestinationQueueName("write_terminated")
         while True:
             self.wait()
-            if self.srcQueue.len() > 0:
+            try:
+                srcQueueLen = self.srcQueue.len()
+                destQueueLen = self.destQueue.len()
+            except Exception:
+                self.logger.exception("Cache error: failed to retrieve queue length.")
+            else:
+                if srcQueueLen > 0:
                     self.jobObj = self.srcQueue.getJob()
                     self.jobId = self.jobObj.jobId
                     self.username = self.jobObj.jobInfo["userName"]

@@ -225,25 +244,17 @@ class StoreExecutor(TaskExecutor):
                     self.storageType = self.jobObj.jobInfo["storageType"]
                     self.nodeList = self.jobObj.jobInfo["nodeList"].copy()
                     self.dataSize = self.jobObj.jobInfo["dataSize"]
                     self.logger.debug(f"dataSize: {self.dataSize} B")
-                    # TODO
-                    # 1) Check the destination type: hot (server) or cold (tape)
-                    # *) HOT
-                    #    1) Check that there is enough disk space on the server
-                    #    2) Get the folder or the list of folders in the 'store' on the transfer node
-                    #    3) Start copying the folders with rsync from the transfer node to the tape frontend
-                    # *) COLD
-                    #    1) Check that nobody else is migrating at this very moment (with the tape client)
-                    #    2) Get the folder or the list of folders in the 'store' on the transfer node
-                    #    3) Start copying the folders with rsync from the transfer node to the tape frontend
-                    #    4) When the copy is finished, if everything is OK, remove the data on the transfer node and set async_trans to true in the db
-                    if self.copyData():
+                    if self.execute():
                         self.cleanup()
                         self.update("OK")
                     else:
                         self.update("ERROR")
-                    if self.destQueue.len() >= self.maxTerminatedJobs:
+                    try:
+                        if destQueueLen >= self.maxTerminatedJobs:
                             self.destQueue.extractJob()
                         self.destQueue.insertJob(self.jobObj)
                         self.srcQueue.extractJob()
+                    except Exception:
+                        self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
+                    else:
+                        self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
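Side note on the new execute() phase: its core is a free-space check followed by an rsync copy whose outcome is judged from the process return code and stderr. The standalone sketch below illustrates that flow under the assumption that shutil.disk_usage is an acceptable stand-in for systemUtils.getFileSystemSize; the function name and tolerance value are illustrative and not part of the patch.

    import shutil
    import subprocess

    def copy_with_space_check(src_dir, dest_dir, data_size, tolerance=1024**3):
        # Refuse the copy if free space on the destination (minus a safety tolerance) is insufficient.
        free = shutil.disk_usage(dest_dir).free - tolerance
        if free < data_size:
            return False
        # Trailing slashes make rsync copy the directory contents rather than the directory itself.
        sp = subprocess.run(["rsync", "-av", src_dir + "/", dest_dir + "/"], capture_output=True)
        # Mirror the executor's check: any non-zero exit status or any stderr output counts as failure.
        return not (sp.returncode or sp.stderr)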
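One more observation: both files now apply the same bounded "terminated queue" rotation, where the oldest entry is extracted once the queue reaches maxTerminatedJobs and only then is the new job inserted. A minimal sketch of that pattern, assuming only the JobQueue methods already used in the diff (len(), extractJob(), insertJob()):

    def rotate_into_terminated(queue, job, max_terminated_jobs):
        # Evict the oldest terminated job once the queue is at capacity...
        if queue.len() >= max_terminated_jobs:
            queue.extractJob()
        # ...then record the newly terminated job.
        queue.insertJob(job)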