Loading transfer_service/exceptions.py +14 −1 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ class Error(Exception): class TarFileCreationException(Error): def __init__(self, folder): self.message = "Error: cannot create a .tar for " + folder super(MultipleUsersException, self).__init__(self.message) super(TarFileCreationException, self).__init__(self.message) # RapClient exceptions Loading @@ -19,3 +19,16 @@ class MultipleUsersException(Error): def __init__(self): self.message = "Multiple users found with the same email address." super(MultipleUsersException, self).__init__(self.message) # TapeClient exceptions class TapeClientException(Error): def __init__(self, cmd, exitCode, errorMsg): self.message = f"{cmd}\nexitCode: {exitCode}\nerrorMsg: {errorMsg}\n" super(TapeClientException, self).__init__(self.message) class ScpInvalidFileException(Error): def __init__(self): self.message = "Error: invalid file or directory." super(ScpInvalidFileException, self).__init__(self.message) transfer_service/tape_client.py +163 −108 Original line number Diff line number Diff line Loading @@ -9,15 +9,17 @@ import sys import uuid from config import Config from exceptions import ScpInvalidFileException from exceptions import TapeClientException from redis_log_handler import RedisLogHandler from tape_pool import TapePool from tape_task import TapeTask class TapeClient(object): # 'eeadm' command location on the tape library frontend EEADM = "/opt/ibm/ltfsee/bin/eeadm" # Constructor def __init__(self, host, port, user, keyFile, logger): self.host = host self.port = port Loading @@ -25,22 +27,37 @@ class TapeClient(object): self.logger = logger self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.key = paramiko.RSAKey.from_private_key_file(keyFile) self.client.load_system_host_keys() self.keyFile = keyFile #self.key = paramiko.RSAKey.from_private_key_file(keyFile) #self.client.load_system_host_keys() self.scp = None self.taskList = [] self.poolList = [] def connect(self): """Connects to the tape library frontend.""" try: self.key = paramiko.RSAKey.from_private_key_file(self.keyFile) self.client.load_system_host_keys() self.client.connect(hostname = self.host, port = self.port, username = self.user, pkey = self.key) except Exception: self.logger.exception("Unable to establish SSH connection with tape library frontend.") raise finally: self.client.disconnect() def getPoolList(self): """Returns a list of 'TapePool' objects.""" stdin, stdout, stderr = self.client.exec_command(f"{self.EEADM} pool list --json") cmd = f"{self.EEADM} pool list --json" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = json.loads(stdout.readlines()[0].rstrip('\n')) Loading Loading @@ -72,17 +89,20 @@ class TapeClient(object): self.poolList.append(pool) return self.poolList.copy() else: sys.exit("cmd_exit_code = FAILURE") raise TapeClientException(cmd, exitCode, stderr) def getTaskList(self): """Returns the whole task list.""" stdin, stdout, stderr = self.client.exec_command(f"{self.EEADM} task list --json") cmd = f"{self.EEADM} task list --json" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = json.loads(stdout.readlines()[0].rstrip('\n')) #print(result) #print(len(result["payload"])) #print(result["payload"][0]) for el in result["payload"]: task = TapeTask() task.inUseTapes = el["inuse_tapes"] Loading @@ -102,18 +122,25 @@ class TapeClient(object): self.taskList.append(task) return self.taskList.copy() else: sys.exit("cmd_exit_code = FAILURE") raise TapeClientException(cmd, exitCode, stderr) def copy(self, srcPath, destPath): """Copies files/dirs recursively by passing their absolute paths.""" try: self.scp = scp.SCPClient(self.client.get_transport()) print(f"Copying {srcPath} in {destPath}") except Exception: self.logger.error("Unable to get transport from SSH client.") raise else: self.logger.debug(f"Copying {srcPath} in {destPath}") if os.path.isdir(srcPath): self.scp.put(srcPath, recursive = True, remote_path = destPath) elif os.path.isfile(srcPath): self.scp.put(srcPath, destPath) else: sys.exit("FATAL: invalid file/dir.") self.logger.error("FATAL: invalid file/dir.") raise ScpInvalidFileException finally: self.scp.close() def migrate(self, fileList, tapePool): Loading @@ -121,58 +148,86 @@ class TapeClient(object): Migrates to tape all files whose absolute path is contained in 'fileList'. """ self.logger.debug("Starting MIGRATE operation...") self.logger.info("Starting MIGRATE operation...") tmp = str(uuid.uuid1().hex) + "-vos_migrate.tmp" try: fp = open(tmp, "a") except IOError: raise else: for f in fileList: fp.write(f"{f}\n") fp.close() self.copy(f"./{tmp}", f"/tmp/{tmp}") os.remove(f"./{tmp}") cmd = f"{self.EEADM} migrate /tmp/{tmp} -p {tapePool}" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: self.logger.debug("MIGRATE operation COMPLETED.") self.logger.info("MIGRATE operation COMPLETED.") else: self.logger.debug("MIGRATE operation FAILED.") self.logger.error("MIGRATE operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode finally: fp.close() def recall(self, fileList): """ Recalls from tape all files whose absolute path is contained in 'fileList'. """ self.logger.debug("Starting RECALL operation...") self.logger.info("Starting RECALL operation...") tmp = str(uuid.uuid1().hex) + "-vos_recall.tmp" try: fp = open(tmp, "a") except IOError: raise else: for f in fileList: fp.write(f"{f}\n") fp.close() self.copy(f"./{tmp}", f"/tmp/{tmp}") os.remove(f"./{tmp}") cmd = f"{self.EEADM} recall /tmp/{tmp}" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: self.logger.debug("RECALL operation COMPLETED.") self.logger.info("RECALL operation COMPLETED.") else: self.logger.debug("RECALL operation FAILED.") self.logger.error("RECALL operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode finally: fp.close() def recallChecksumFiles(self, dirName): """ Recursively recalls from tape all the checksum files related to the 'dirName' directory. """ self.logger.debug("Starting RECALL_CHECKSUM operation...") self.logger.info("Starting RECALL_CHECKSUM operation...") cmd = f"find $(dirname {dirName}) -type f \( -iname \"*-md5sum.txt\" \) | {self.EEADM} recall" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: self.logger.debug("RECALL_CHECKSUM operation COMPLETED.") self.logger.info("RECALL_CHECKSUM operation COMPLETED.") else: self.logger.debug("RECALL_CHECKSUM operation FAILED.") self.logger.error("RECALL_CHECKSUM operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode def disconnect(self): Loading Loading
transfer_service/exceptions.py +14 −1 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ class Error(Exception): class TarFileCreationException(Error): def __init__(self, folder): self.message = "Error: cannot create a .tar for " + folder super(MultipleUsersException, self).__init__(self.message) super(TarFileCreationException, self).__init__(self.message) # RapClient exceptions Loading @@ -19,3 +19,16 @@ class MultipleUsersException(Error): def __init__(self): self.message = "Multiple users found with the same email address." super(MultipleUsersException, self).__init__(self.message) # TapeClient exceptions class TapeClientException(Error): def __init__(self, cmd, exitCode, errorMsg): self.message = f"{cmd}\nexitCode: {exitCode}\nerrorMsg: {errorMsg}\n" super(TapeClientException, self).__init__(self.message) class ScpInvalidFileException(Error): def __init__(self): self.message = "Error: invalid file or directory." super(ScpInvalidFileException, self).__init__(self.message)
transfer_service/tape_client.py +163 −108 Original line number Diff line number Diff line Loading @@ -9,15 +9,17 @@ import sys import uuid from config import Config from exceptions import ScpInvalidFileException from exceptions import TapeClientException from redis_log_handler import RedisLogHandler from tape_pool import TapePool from tape_task import TapeTask class TapeClient(object): # 'eeadm' command location on the tape library frontend EEADM = "/opt/ibm/ltfsee/bin/eeadm" # Constructor def __init__(self, host, port, user, keyFile, logger): self.host = host self.port = port Loading @@ -25,22 +27,37 @@ class TapeClient(object): self.logger = logger self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.key = paramiko.RSAKey.from_private_key_file(keyFile) self.client.load_system_host_keys() self.keyFile = keyFile #self.key = paramiko.RSAKey.from_private_key_file(keyFile) #self.client.load_system_host_keys() self.scp = None self.taskList = [] self.poolList = [] def connect(self): """Connects to the tape library frontend.""" try: self.key = paramiko.RSAKey.from_private_key_file(self.keyFile) self.client.load_system_host_keys() self.client.connect(hostname = self.host, port = self.port, username = self.user, pkey = self.key) except Exception: self.logger.exception("Unable to establish SSH connection with tape library frontend.") raise finally: self.client.disconnect() def getPoolList(self): """Returns a list of 'TapePool' objects.""" stdin, stdout, stderr = self.client.exec_command(f"{self.EEADM} pool list --json") cmd = f"{self.EEADM} pool list --json" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = json.loads(stdout.readlines()[0].rstrip('\n')) Loading Loading @@ -72,17 +89,20 @@ class TapeClient(object): self.poolList.append(pool) return self.poolList.copy() else: sys.exit("cmd_exit_code = FAILURE") raise TapeClientException(cmd, exitCode, stderr) def getTaskList(self): """Returns the whole task list.""" stdin, stdout, stderr = self.client.exec_command(f"{self.EEADM} task list --json") cmd = f"{self.EEADM} task list --json" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = json.loads(stdout.readlines()[0].rstrip('\n')) #print(result) #print(len(result["payload"])) #print(result["payload"][0]) for el in result["payload"]: task = TapeTask() task.inUseTapes = el["inuse_tapes"] Loading @@ -102,18 +122,25 @@ class TapeClient(object): self.taskList.append(task) return self.taskList.copy() else: sys.exit("cmd_exit_code = FAILURE") raise TapeClientException(cmd, exitCode, stderr) def copy(self, srcPath, destPath): """Copies files/dirs recursively by passing their absolute paths.""" try: self.scp = scp.SCPClient(self.client.get_transport()) print(f"Copying {srcPath} in {destPath}") except Exception: self.logger.error("Unable to get transport from SSH client.") raise else: self.logger.debug(f"Copying {srcPath} in {destPath}") if os.path.isdir(srcPath): self.scp.put(srcPath, recursive = True, remote_path = destPath) elif os.path.isfile(srcPath): self.scp.put(srcPath, destPath) else: sys.exit("FATAL: invalid file/dir.") self.logger.error("FATAL: invalid file/dir.") raise ScpInvalidFileException finally: self.scp.close() def migrate(self, fileList, tapePool): Loading @@ -121,58 +148,86 @@ class TapeClient(object): Migrates to tape all files whose absolute path is contained in 'fileList'. """ self.logger.debug("Starting MIGRATE operation...") self.logger.info("Starting MIGRATE operation...") tmp = str(uuid.uuid1().hex) + "-vos_migrate.tmp" try: fp = open(tmp, "a") except IOError: raise else: for f in fileList: fp.write(f"{f}\n") fp.close() self.copy(f"./{tmp}", f"/tmp/{tmp}") os.remove(f"./{tmp}") cmd = f"{self.EEADM} migrate /tmp/{tmp} -p {tapePool}" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: self.logger.debug("MIGRATE operation COMPLETED.") self.logger.info("MIGRATE operation COMPLETED.") else: self.logger.debug("MIGRATE operation FAILED.") self.logger.error("MIGRATE operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode finally: fp.close() def recall(self, fileList): """ Recalls from tape all files whose absolute path is contained in 'fileList'. """ self.logger.debug("Starting RECALL operation...") self.logger.info("Starting RECALL operation...") tmp = str(uuid.uuid1().hex) + "-vos_recall.tmp" try: fp = open(tmp, "a") except IOError: raise else: for f in fileList: fp.write(f"{f}\n") fp.close() self.copy(f"./{tmp}", f"/tmp/{tmp}") os.remove(f"./{tmp}") cmd = f"{self.EEADM} recall /tmp/{tmp}" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: self.logger.debug("RECALL operation COMPLETED.") self.logger.info("RECALL operation COMPLETED.") else: self.logger.debug("RECALL operation FAILED.") self.logger.error("RECALL operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode finally: fp.close() def recallChecksumFiles(self, dirName): """ Recursively recalls from tape all the checksum files related to the 'dirName' directory. """ self.logger.debug("Starting RECALL_CHECKSUM operation...") self.logger.info("Starting RECALL_CHECKSUM operation...") cmd = f"find $(dirname {dirName}) -type f \( -iname \"*-md5sum.txt\" \) | {self.EEADM} recall" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: self.logger.debug("RECALL_CHECKSUM operation COMPLETED.") self.logger.info("RECALL_CHECKSUM operation COMPLETED.") else: self.logger.debug("RECALL_CHECKSUM operation FAILED.") self.logger.error("RECALL_CHECKSUM operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode def disconnect(self): Loading