Loading transfer_service/store_executor.py +98 −36 Original line number Diff line number Diff line Loading @@ -19,8 +19,13 @@ from task_executor import TaskExecutor class StoreExecutor(TaskExecutor): # We use 10 GB of tolerance when we calculate # the free space in our storage point TOL = 10 * (2**30) def __init__(self): self.type = "store_executor" self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("transfer_node") self.storageStorePath = params["store_path"] Loading @@ -32,6 +37,8 @@ class StoreExecutor(TaskExecutor): params["db"], 1, 1) params = config.loadSection("transfer") self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"]) params = config.loadSection("scheduling") self.maxTerminatedJobs = params.getint("max_terminated_jobs") params = config.loadSection("mail") Loading @@ -50,6 +57,7 @@ class StoreExecutor(TaskExecutor): self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] params = config.loadSection("spectrum_archive") self.tapePool = params["tape_pool"] self.tapeClient = TapeClient(params["host"], params.getint("port"), params["user"], Loading @@ -64,7 +72,6 @@ class StoreExecutor(TaskExecutor): self.storageId = None self.storageType = None self.nodeList = [] self.systemUtils = SystemUtils() super(StoreExecutor, self).__init__() def copyData(self): Loading @@ -72,7 +79,30 @@ class StoreExecutor(TaskExecutor): self.dbConn.setStartTime(self.jobId) srcPathPrefix = self.storageStorePath.replace("{username}", self.username) srcData = os.listdir(srcPathPrefix) destPathPrefix = self.dbConn.getStorageBasePath(self.storageId) + '/' + self.username self.logger.debug("Checking storage available space...") storageBasePath = self.dbConn.getStorageBasePath(self.storageId) storageFreeSpace = 0 tapeFrontendFreeSpace = 0 if self.storageType == "hot": [ total, used, free ] = self.systemUtils.getFileSystemSize(storageBasePath) storageFreeSpace = free - self.TOL self.logger.debug(f"storageFreeSpace (hot): {storageFreeSpace}") if storageFreeSpace < self.dataSize: return False else: [ total, used, free ] = self.systemUtils.getFileSystemSize(storageBasePath) tapeFrontendFreeSpace = free - self.maxBlockSize - self.TOL self.tapeClient.connect() tapePoolList = self.tapeClient.getPoolList() for el in tapePoolList: if el.getName() == self.tapePool: storageFreeSpace = el.getFreeSpace() - self.TOL self.tapeClient.disconnect() self.logger.debug(f"tapeFrontendFreeSpace (cold): {tapeFrontendFreeSpace}") self.logger.debug(f"storageFreeSpace (cold): {storageFreeSpace}") if tapeFrontendFreeSpace < self.dataSize or storageFreeSpace < self.dataSize: return False destPathPrefix = storageBasePath + '/' + self.username sp = subprocess.run(["rsync", "-av", srcPathPrefix + '/', destPathPrefix + '/'], capture_output = True) if(sp.returncode or sp.stderr): return False Loading @@ -96,9 +126,11 @@ class StoreExecutor(TaskExecutor): os.chown(srcPathPrefix, uid, gid) os.chmod(srcPathPrefix, 0o755) def update(self): def update(self, status): results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) if status == "OK": for el in self.nodeList: nodeVOSPath = el[2] self.dbConn.setAsyncTrans(nodeVOSPath, True) Loading Loading @@ -147,7 +179,33 @@ class StoreExecutor(TaskExecutor): msg += info m.setMessage("VOSpace data storage notification", msg) m.send() else: self.jobObj.setPhase("ERROR") self.dbConn.setPhase(self.jobId, "ERROR") self.dbConn.setEndTime(self.jobId) # Send e-mail notification m = Mailer(self.logger) m.addRecipient(self.adminEmail) self.userEmail = self.dbConn.getUserEmail(self.userId) if self.userEmail != self.adminEmail: m.addRecipient(self.userEmail) msg = f""" [VOSpace data storage procedure summary] Storage type: {self.storageType} Storage ID: {self.storageId} Creator ID: {self.userId} """ info = f""" INFO: the job was terminated due to an error that occurred while copying the data on the storage point. """ msg += info m.setMessage("VOSpace data storage notification: Job ERROR", msg) self.nodeList.clear() def run(self): Loading @@ -165,6 +223,8 @@ class StoreExecutor(TaskExecutor): self.storageId = self.jobObj.jobInfo["storageId"] self.storageType = self.jobObj.jobInfo["storageType"] self.nodeList = self.jobObj.jobInfo["nodeList"].copy() self.dataSize = self.jobObj.jobInfo["dataSize"] self.logger.debug(f"dataSize: {self.dataSize} B") # TODO # 1) Controlla il tipo di destinazione: hot (server) o cold (tape) # *) HOT Loading @@ -176,10 +236,12 @@ class StoreExecutor(TaskExecutor): # 2) Ottieni la cartella o la lista delle cartelle sulla 'store' sul transf. node # 3) Avvia la copia delle cartelle con rsync da transf. node a frontend tape # 4) A copia finita, se tutto ok, rimuovi i dati sul tn e setta async_trans a true sul db self.copyData() if self.copyData(): self.cleanup() self.update() if self.destQueue.len() == self.maxTerminatedJobs: self.update("OK") else: self.update("ERROR") if self.destQueue.len() >= self.maxTerminatedJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() Loading Loading
transfer_service/store_executor.py +98 −36 Original line number Diff line number Diff line Loading @@ -19,8 +19,13 @@ from task_executor import TaskExecutor class StoreExecutor(TaskExecutor): # We use 10 GB of tolerance when we calculate # the free space in our storage point TOL = 10 * (2**30) def __init__(self): self.type = "store_executor" self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("transfer_node") self.storageStorePath = params["store_path"] Loading @@ -32,6 +37,8 @@ class StoreExecutor(TaskExecutor): params["db"], 1, 1) params = config.loadSection("transfer") self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"]) params = config.loadSection("scheduling") self.maxTerminatedJobs = params.getint("max_terminated_jobs") params = config.loadSection("mail") Loading @@ -50,6 +57,7 @@ class StoreExecutor(TaskExecutor): self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] params = config.loadSection("spectrum_archive") self.tapePool = params["tape_pool"] self.tapeClient = TapeClient(params["host"], params.getint("port"), params["user"], Loading @@ -64,7 +72,6 @@ class StoreExecutor(TaskExecutor): self.storageId = None self.storageType = None self.nodeList = [] self.systemUtils = SystemUtils() super(StoreExecutor, self).__init__() def copyData(self): Loading @@ -72,7 +79,30 @@ class StoreExecutor(TaskExecutor): self.dbConn.setStartTime(self.jobId) srcPathPrefix = self.storageStorePath.replace("{username}", self.username) srcData = os.listdir(srcPathPrefix) destPathPrefix = self.dbConn.getStorageBasePath(self.storageId) + '/' + self.username self.logger.debug("Checking storage available space...") storageBasePath = self.dbConn.getStorageBasePath(self.storageId) storageFreeSpace = 0 tapeFrontendFreeSpace = 0 if self.storageType == "hot": [ total, used, free ] = self.systemUtils.getFileSystemSize(storageBasePath) storageFreeSpace = free - self.TOL self.logger.debug(f"storageFreeSpace (hot): {storageFreeSpace}") if storageFreeSpace < self.dataSize: return False else: [ total, used, free ] = self.systemUtils.getFileSystemSize(storageBasePath) tapeFrontendFreeSpace = free - self.maxBlockSize - self.TOL self.tapeClient.connect() tapePoolList = self.tapeClient.getPoolList() for el in tapePoolList: if el.getName() == self.tapePool: storageFreeSpace = el.getFreeSpace() - self.TOL self.tapeClient.disconnect() self.logger.debug(f"tapeFrontendFreeSpace (cold): {tapeFrontendFreeSpace}") self.logger.debug(f"storageFreeSpace (cold): {storageFreeSpace}") if tapeFrontendFreeSpace < self.dataSize or storageFreeSpace < self.dataSize: return False destPathPrefix = storageBasePath + '/' + self.username sp = subprocess.run(["rsync", "-av", srcPathPrefix + '/', destPathPrefix + '/'], capture_output = True) if(sp.returncode or sp.stderr): return False Loading @@ -96,9 +126,11 @@ class StoreExecutor(TaskExecutor): os.chown(srcPathPrefix, uid, gid) os.chmod(srcPathPrefix, 0o755) def update(self): def update(self, status): results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) if status == "OK": for el in self.nodeList: nodeVOSPath = el[2] self.dbConn.setAsyncTrans(nodeVOSPath, True) Loading Loading @@ -147,7 +179,33 @@ class StoreExecutor(TaskExecutor): msg += info m.setMessage("VOSpace data storage notification", msg) m.send() else: self.jobObj.setPhase("ERROR") self.dbConn.setPhase(self.jobId, "ERROR") self.dbConn.setEndTime(self.jobId) # Send e-mail notification m = Mailer(self.logger) m.addRecipient(self.adminEmail) self.userEmail = self.dbConn.getUserEmail(self.userId) if self.userEmail != self.adminEmail: m.addRecipient(self.userEmail) msg = f""" [VOSpace data storage procedure summary] Storage type: {self.storageType} Storage ID: {self.storageId} Creator ID: {self.userId} """ info = f""" INFO: the job was terminated due to an error that occurred while copying the data on the storage point. """ msg += info m.setMessage("VOSpace data storage notification: Job ERROR", msg) self.nodeList.clear() def run(self): Loading @@ -165,6 +223,8 @@ class StoreExecutor(TaskExecutor): self.storageId = self.jobObj.jobInfo["storageId"] self.storageType = self.jobObj.jobInfo["storageType"] self.nodeList = self.jobObj.jobInfo["nodeList"].copy() self.dataSize = self.jobObj.jobInfo["dataSize"] self.logger.debug(f"dataSize: {self.dataSize} B") # TODO # 1) Controlla il tipo di destinazione: hot (server) o cold (tape) # *) HOT Loading @@ -176,10 +236,12 @@ class StoreExecutor(TaskExecutor): # 2) Ottieni la cartella o la lista delle cartelle sulla 'store' sul transf. node # 3) Avvia la copia delle cartelle con rsync da transf. node a frontend tape # 4) A copia finita, se tutto ok, rimuovi i dati sul tn e setta async_trans a true sul db self.copyData() if self.copyData(): self.cleanup() self.update() if self.destQueue.len() == self.maxTerminatedJobs: self.update("OK") else: self.update("ERROR") if self.destQueue.len() >= self.maxTerminatedJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() Loading