Loading transfer_service/tape_client.py +39 −94 Original line number Diff line number Diff line Loading @@ -16,13 +16,18 @@ from config import Config from exceptions import ScpInvalidFileException from exceptions import TapeClientException from redis_log_handler import RedisLogHandler from tape_pool import TapePool from tape_task import TapeTask class TapeClient(object): # 'eeadm' command location on the tape library frontend EEADM = "/opt/ibm/ltfsee/bin/eeadm" # 'dsmdf' command location on the tape library frontend DSMDF = "/usr/bin/dsmdf" # 'dsmrecall' command location on the tape library frontend DSMRECALL = "/usr/bin/dsmrecall" # 'dsmmigrate' command location on the tape library frontend DSMMIGRATE = "/usr/bin/dsmmigrate" # destination for the files containing the lists of files to recall or migrate VOSPACE_WD = "/tmp/vospace" Loading @@ -35,8 +40,7 @@ class TapeClient(object): self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.keyFile = keyFile self.scp = None self.taskList = [] self.poolList = [] self.HSMFilesystemList = [] def connect(self): """Connects to the tape library frontend.""" Loading @@ -51,81 +55,6 @@ class TapeClient(object): self.logger.exception("Unable to establish SSH connection with tape library frontend.") raise def getPoolList(self): """Returns a list of 'TapePool' objects.""" cmd = f"{self.EEADM} pool list --json" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception(f"Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = json.loads(stdout.readlines()[0].rstrip('\n')) for el in result["payload"]: pool = TapePool() pool.id = el["id"] pool.name = el["name"] pool.mediaRestriction = el["media_restriction"] pool.capacity = el["capacity"] pool.usedSpace = el["used_space"] pool.freeSpace = el["free_space"] pool.reclaimableSpace = el["reclaimable_space"] pool.activeSpace = el["active_space"] pool.nonAppendableSpace = el["non_appendable_space"] pool.numOfTapes = el["num_of_tapes"] pool.formatClass = el["format_class"] pool.libraryName = el["library_name"] pool.libraryId = el["library_id"] pool.nodeGroupName = el["nodegroup_name"] pool.deviceType = el["device_type"] pool.worm = el["worm"] pool.fillPolicy = el["fill_policy"] pool.owner = el["owner"] pool.mountLimit = el["mount_limit"] pool.lowSpaceWarningEnable = el["low_space_warning_enable"] pool.lowSpaceWarningThreshold = el["low_space_warning_threshold"] pool.noSpaceWarningEnable = el["no_space_warning_enable"] pool.mode = el["mode"] self.poolList.append(pool) return self.poolList.copy() else: raise TapeClientException(cmd, exitCode, stderr) def getTaskList(self): """Returns the whole task list.""" cmd = f"{self.EEADM} task list --json" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception(f"Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = json.loads(stdout.readlines()[0].rstrip('\n')) for el in result["payload"]: task = TapeTask() task.inUseTapes = el["inuse_tapes"] task.inUsePools = el["inuse_pools"] task.inUseNodeGroups = el["inuse_node_groups"] task.inUseDrives = el["inuse_drives"] task.cmdParam = el["cmd_param"] task.result = el["result"] task.status = el["status"] task.completedTime = el["completed_time"] task.startedTime = el["started_time"] task.createdTime = el["created_time"] task.setInUseLibs = el["inuse_libs"] task.type = el["type"] task.taskId = el["task_id"] task.id = el["id"] self.taskList.append(task) return self.taskList.copy() else: raise TapeClientException(cmd, exitCode, stderr) def copy(self, srcPath, destPath): """Copies files/dirs recursively by passing their absolute paths.""" try: Loading @@ -145,14 +74,31 @@ class TapeClient(object): finally: self.scp.close() def migrate(self, fileList, tapePool, jobId): def getHSMFilesystemList(self): """Returns a list containing all the available HSM filesystems.""" cmd = f"{self.DSMDF} -detail | grep \"HSM Filesystem\" | awk '{ print $3 }'" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception(f"Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = stdout.readlines()[0].rstrip('\n') self.HSMFilesystemList = result.splitlines() return self.HSMFilesystemList.copy() else: raise TapeClientException(cmd, exitCode, stderr) def migrate(self, fileList, tapeHSMFilesystem, jobId): """ Migrates to tape all files whose absolute path is contained in 'fileList'. A tape pool and a VOSpace jobId are also required as parameters. A HSM filesystem and a VOSpace jobId are also required as parameters. """ self.logger.info(f"Starting MIGRATE operation (tape pool = '{tapePool}')...") self.logger.info(f"Starting MIGRATE operation (tape HSM filesystem = '{tapeHSMFilesystem}')...") migrateFileList = f"vos_migrate-{jobId}.lst" try: fp = open(migrateFileList, "a") Loading @@ -164,7 +110,7 @@ class TapeClient(object): fp.close() self.copy(f"./{migrateFileList}", f"{self.VOSPACE_WD}/{migrateFileList}") os.remove(f"./{migrateFileList}") cmd = f"{self.EEADM} migrate {self.VOSPACE_WD}/{migrateFileList} -p {tapePool} > /dev/null 2>&1" cmd = f"{self.DSMMIGRATE} -filelist={self.VOSPACE_WD}/{migrateFileList} > /dev/null 2>&1" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: Loading @@ -172,7 +118,7 @@ class TapeClient(object): raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: if exitCode == 0 or exitCode == 4: self.logger.info("MIGRATE operation COMPLETED.") else: self.logger.error("MIGRATE operation FAILED.") Loading @@ -197,7 +143,7 @@ class TapeClient(object): fp.close() self.copy(f"./{recallFileList}", f"{self.VOSPACE_WD}/{recallFileList}") os.remove(f"./{recallFileList}") cmd = f"{self.EEADM} recall {self.VOSPACE_WD}/{recallFileList} > /dev/null 2>&1" cmd = f"{self.DSMRECALL} -filelist={self.VOSPACE_WD}/{recallFileList} > /dev/null 2>&1" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: Loading @@ -205,7 +151,7 @@ class TapeClient(object): raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: if exitCode == 0 or exitCode == 4: self.logger.info("RECALL operation COMPLETED.") else: self.logger.error("RECALL operation FAILED.") Loading @@ -218,7 +164,8 @@ class TapeClient(object): the 'dirName' directory. """ self.logger.info("Starting RECALL_CHECKSUM operation...") cmd = f"find $(dirname {dirName}) -type f \( -iname \"*-md5sum.txt\" \) | {self.EEADM} recall > /dev/null 2>&1" checksumFileList = "vos_recall_checksum_files-{jobId}.lst" cmd = f"find $(dirname {dirName}) -type f \( -iname \"*-md5sum.txt\" \) > {self.VOSPACE_WD}/{checksumFileList} && {self.DSMRECALL} -filelist={self.VOSPACE_WD}/{checksumFileList} > /dev/null 2>&1" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: Loading @@ -226,7 +173,7 @@ class TapeClient(object): raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: if exitCode == 0 or exitCode == 4: self.logger.info("RECALL_CHECKSUM operation COMPLETED.") else: self.logger.error("RECALL_CHECKSUM operation FAILED.") Loading @@ -235,8 +182,7 @@ class TapeClient(object): def disconnect(self): """Performs a cleanup and closes the connection.""" self.taskList.clear() self.poolList.clear() self.HSMFilesystemList.clear() self.client.close() def getSize(self, fsMountPoint): Loading @@ -261,7 +207,6 @@ class TapeClient(object): #tc.connect() #tc.copy("/home/curban/store/mydir", "/home/mydir") #tc.copy("/home/curban/store/foo2.txt", "/home/mydir/foo2.txt") #tl = tc.getTaskList() #fsSize = tc.getSize("/ia2_tape_stb_01") #print(fsSize) #tc.disconnect() Loading Loading
transfer_service/tape_client.py +39 −94 Original line number Diff line number Diff line Loading @@ -16,13 +16,18 @@ from config import Config from exceptions import ScpInvalidFileException from exceptions import TapeClientException from redis_log_handler import RedisLogHandler from tape_pool import TapePool from tape_task import TapeTask class TapeClient(object): # 'eeadm' command location on the tape library frontend EEADM = "/opt/ibm/ltfsee/bin/eeadm" # 'dsmdf' command location on the tape library frontend DSMDF = "/usr/bin/dsmdf" # 'dsmrecall' command location on the tape library frontend DSMRECALL = "/usr/bin/dsmrecall" # 'dsmmigrate' command location on the tape library frontend DSMMIGRATE = "/usr/bin/dsmmigrate" # destination for the files containing the lists of files to recall or migrate VOSPACE_WD = "/tmp/vospace" Loading @@ -35,8 +40,7 @@ class TapeClient(object): self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.keyFile = keyFile self.scp = None self.taskList = [] self.poolList = [] self.HSMFilesystemList = [] def connect(self): """Connects to the tape library frontend.""" Loading @@ -51,81 +55,6 @@ class TapeClient(object): self.logger.exception("Unable to establish SSH connection with tape library frontend.") raise def getPoolList(self): """Returns a list of 'TapePool' objects.""" cmd = f"{self.EEADM} pool list --json" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception(f"Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = json.loads(stdout.readlines()[0].rstrip('\n')) for el in result["payload"]: pool = TapePool() pool.id = el["id"] pool.name = el["name"] pool.mediaRestriction = el["media_restriction"] pool.capacity = el["capacity"] pool.usedSpace = el["used_space"] pool.freeSpace = el["free_space"] pool.reclaimableSpace = el["reclaimable_space"] pool.activeSpace = el["active_space"] pool.nonAppendableSpace = el["non_appendable_space"] pool.numOfTapes = el["num_of_tapes"] pool.formatClass = el["format_class"] pool.libraryName = el["library_name"] pool.libraryId = el["library_id"] pool.nodeGroupName = el["nodegroup_name"] pool.deviceType = el["device_type"] pool.worm = el["worm"] pool.fillPolicy = el["fill_policy"] pool.owner = el["owner"] pool.mountLimit = el["mount_limit"] pool.lowSpaceWarningEnable = el["low_space_warning_enable"] pool.lowSpaceWarningThreshold = el["low_space_warning_threshold"] pool.noSpaceWarningEnable = el["no_space_warning_enable"] pool.mode = el["mode"] self.poolList.append(pool) return self.poolList.copy() else: raise TapeClientException(cmd, exitCode, stderr) def getTaskList(self): """Returns the whole task list.""" cmd = f"{self.EEADM} task list --json" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception(f"Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = json.loads(stdout.readlines()[0].rstrip('\n')) for el in result["payload"]: task = TapeTask() task.inUseTapes = el["inuse_tapes"] task.inUsePools = el["inuse_pools"] task.inUseNodeGroups = el["inuse_node_groups"] task.inUseDrives = el["inuse_drives"] task.cmdParam = el["cmd_param"] task.result = el["result"] task.status = el["status"] task.completedTime = el["completed_time"] task.startedTime = el["started_time"] task.createdTime = el["created_time"] task.setInUseLibs = el["inuse_libs"] task.type = el["type"] task.taskId = el["task_id"] task.id = el["id"] self.taskList.append(task) return self.taskList.copy() else: raise TapeClientException(cmd, exitCode, stderr) def copy(self, srcPath, destPath): """Copies files/dirs recursively by passing their absolute paths.""" try: Loading @@ -145,14 +74,31 @@ class TapeClient(object): finally: self.scp.close() def migrate(self, fileList, tapePool, jobId): def getHSMFilesystemList(self): """Returns a list containing all the available HSM filesystems.""" cmd = f"{self.DSMDF} -detail | grep \"HSM Filesystem\" | awk '{ print $3 }'" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception(f"Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = stdout.readlines()[0].rstrip('\n') self.HSMFilesystemList = result.splitlines() return self.HSMFilesystemList.copy() else: raise TapeClientException(cmd, exitCode, stderr) def migrate(self, fileList, tapeHSMFilesystem, jobId): """ Migrates to tape all files whose absolute path is contained in 'fileList'. A tape pool and a VOSpace jobId are also required as parameters. A HSM filesystem and a VOSpace jobId are also required as parameters. """ self.logger.info(f"Starting MIGRATE operation (tape pool = '{tapePool}')...") self.logger.info(f"Starting MIGRATE operation (tape HSM filesystem = '{tapeHSMFilesystem}')...") migrateFileList = f"vos_migrate-{jobId}.lst" try: fp = open(migrateFileList, "a") Loading @@ -164,7 +110,7 @@ class TapeClient(object): fp.close() self.copy(f"./{migrateFileList}", f"{self.VOSPACE_WD}/{migrateFileList}") os.remove(f"./{migrateFileList}") cmd = f"{self.EEADM} migrate {self.VOSPACE_WD}/{migrateFileList} -p {tapePool} > /dev/null 2>&1" cmd = f"{self.DSMMIGRATE} -filelist={self.VOSPACE_WD}/{migrateFileList} > /dev/null 2>&1" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: Loading @@ -172,7 +118,7 @@ class TapeClient(object): raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: if exitCode == 0 or exitCode == 4: self.logger.info("MIGRATE operation COMPLETED.") else: self.logger.error("MIGRATE operation FAILED.") Loading @@ -197,7 +143,7 @@ class TapeClient(object): fp.close() self.copy(f"./{recallFileList}", f"{self.VOSPACE_WD}/{recallFileList}") os.remove(f"./{recallFileList}") cmd = f"{self.EEADM} recall {self.VOSPACE_WD}/{recallFileList} > /dev/null 2>&1" cmd = f"{self.DSMRECALL} -filelist={self.VOSPACE_WD}/{recallFileList} > /dev/null 2>&1" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: Loading @@ -205,7 +151,7 @@ class TapeClient(object): raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: if exitCode == 0 or exitCode == 4: self.logger.info("RECALL operation COMPLETED.") else: self.logger.error("RECALL operation FAILED.") Loading @@ -218,7 +164,8 @@ class TapeClient(object): the 'dirName' directory. """ self.logger.info("Starting RECALL_CHECKSUM operation...") cmd = f"find $(dirname {dirName}) -type f \( -iname \"*-md5sum.txt\" \) | {self.EEADM} recall > /dev/null 2>&1" checksumFileList = "vos_recall_checksum_files-{jobId}.lst" cmd = f"find $(dirname {dirName}) -type f \( -iname \"*-md5sum.txt\" \) > {self.VOSPACE_WD}/{checksumFileList} && {self.DSMRECALL} -filelist={self.VOSPACE_WD}/{checksumFileList} > /dev/null 2>&1" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: Loading @@ -226,7 +173,7 @@ class TapeClient(object): raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: if exitCode == 0 or exitCode == 4: self.logger.info("RECALL_CHECKSUM operation COMPLETED.") else: self.logger.error("RECALL_CHECKSUM operation FAILED.") Loading @@ -235,8 +182,7 @@ class TapeClient(object): def disconnect(self): """Performs a cleanup and closes the connection.""" self.taskList.clear() self.poolList.clear() self.HSMFilesystemList.clear() self.client.close() def getSize(self, fsMountPoint): Loading @@ -261,7 +207,6 @@ class TapeClient(object): #tc.connect() #tc.copy("/home/curban/store/mydir", "/home/mydir") #tc.copy("/home/curban/store/foo2.txt", "/home/mydir/foo2.txt") #tl = tc.getTaskList() #fsSize = tc.getSize("/ia2_tape_stb_01") #print(fsSize) #tc.disconnect() Loading