Loading client/vos_storage +33 −10 Original line number Diff line number Diff line Loading @@ -28,6 +28,8 @@ class VOSStorage(RedisRPCClient): storageType = None storageBasePath = None storageHostname = None tapePool = None tapePoolList = [] while storageType not in ("cold", "hot"): try: storageType = input("\nStorage type ['cold' or 'hot']: ") Loading @@ -37,6 +39,26 @@ class VOSStorage(RedisRPCClient): print("\nPlease, use CTRL+C to quit.") except KeyboardInterrupt: sys.exit("\nCTRL+C detected. Exiting...") if storageType == "cold": storageRequest = { "requestType": "TAPE_POOL_LST" } storageResponse = self.call(storageRequest) if "responseType" not in storageResponse: sys.exit("FATAL: Malformed response, storage acknowledge expected.\n") elif storageResponse["responseType"] == "TAPE_POOL_LST_DONE": tapePoolList = storageResponse["tapePoolList"] while tapePool not in tapePoolList: print("\nSelect one of the available tape pools:") print("\n" + tabulate(tapePoolList, headers = ["tape_pool"], tablefmt = "pretty") + "\n") try: tapePool = input("Please, insert a tape pool: ") except ValueError: print("Input type is not valid!") except EOFError: print("\nPlease, use CTRL+C to quit.") except KeyboardInterrupt: sys.exit("\nCTRL+C detected. Exiting...") else: sys.exit("\nFATAL: Unknown response type.\n") while not storageBasePath: try: storageBasePath = input("\nStorage base path: ") Loading @@ -58,7 +80,8 @@ class VOSStorage(RedisRPCClient): storageRequest = { "requestType": "STORAGE_ADD", "storageType": storageType, "basePath": storageBasePath, "hostname": storageHostname } "hostname": storageHostname, "tapePool": tapePool } storageResponse = self.call(storageRequest) if "responseType" not in storageResponse: Loading transfer_service/config/vos_ts.conf.sample +0 −2 Original line number Diff line number Diff line Loading @@ -40,8 +40,6 @@ port = 22 user = root ; SSH private key file absolute path pkey_file_path = /root/.ssh/tape_rsa ; tape pool name tape_pool = pl_generic_rw_01 ############################ Loading transfer_service/db_connector.py +35 −32 Original line number Diff line number Diff line Loading @@ -171,7 +171,7 @@ class DbConnector(object): conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, '/' || fs_path AS os_path, content_length SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, '/' || fs_path AS os_path, content_length, tape_pool FROM node n JOIN location l ON n.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id Loading @@ -192,6 +192,7 @@ class DbConnector(object): tstampWrappedDir = result[0]["tstamp_wrapper_dir"] osPath = result[0]["os_path"] contentLength = result[0]["content_length"] tapePool = result[0]["tape_pool"] if tstampWrappedDir is None: baseSrcPath = basePath + "/" + userName else: Loading @@ -203,7 +204,8 @@ class DbConnector(object): "storageType": storageType, "username": userName, "osPath": osPath, "contentLength": contentLength "contentLength": contentLength, "tapePool": tapePool } return fileInfo finally: Loading Loading @@ -1197,7 +1199,7 @@ class DbConnector(object): ##### Storage ##### def insertStorage(self, storageType, storageBasePath, storageHostname, vospaceUserBasePath): def insertStorage(self, storageType, storageBasePath, storageHostname, vospaceUserBasePath, tapePool = None): """Inserts a storage point.""" if not self.getStorageId(storageBasePath): try: Loading @@ -1206,8 +1208,9 @@ class DbConnector(object): cursor.execute(""" INSERT INTO storage(storage_type, base_path, hostname) VALUES (%s, %s, %s) hostname, tape_pool) VALUES (%s, %s, %s, %s) RETURNING storage_id; """, (storageType, Loading transfer_service/retrieve_executor.py +46 −43 Original line number Diff line number Diff line Loading @@ -72,12 +72,12 @@ class RetrieveExecutor(TaskExecutor): 1, self.logger) params = config.loadSection("spectrum_archive") self.tapePool = params["tape_pool"] self.tapeClient = TapeClient(params["host"], params.getint("port"), params["user"], params["pkey_file_path"], self.logger) self.tapePool = None self.storageType = None self.jobObj = None self.jobId = None Loading Loading @@ -116,7 +116,10 @@ class RetrieveExecutor(TaskExecutor): # Obtain the storage type try: self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] #self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] fileInfo = self.dbConn.getOSPath(self.nodeList[0]) self.storageType = fileInfo["storageType"] self.tapePool = fileInfo["tapePool"] except Exception: self.logger.exception("FATAL: unable to obtain the storage type.") return False Loading transfer_service/storage_rpc_server.py +23 −2 Original line number Diff line number Diff line Loading @@ -12,6 +12,7 @@ from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector from tape_client import TapeClient class StorageRPCServer(RedisRPCServer): Loading Loading @@ -39,6 +40,12 @@ class StorageRPCServer(RedisRPCServer): 1, 2, self.logger) params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(params["host"], params.getint("port"), params["user"], params["pkey_file_path"], self.logger) super(StorageRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): Loading Loading @@ -126,12 +133,26 @@ class StorageRPCServer(RedisRPCServer): else: response = { "responseType": "STORAGE_LST_DONE", "storageList": result } elif requestBody["requestType"] == "TAPE_POOL_LST": try: self.tapeClient.connect() tapePools = self.tapeClient.getPoolList() self.tapeClient.disconnect() tapePoolList = [ p.getName() for p in tapePools ] except Exception: errorMsg = "Unable to get tape pool list." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": errorMsg } else: response = { "responseType": "TAPE_POOL_LST_DONE", "tapePoolList": tapePoolList } else: errorMsg = "Unkown request type." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 6, "errorCode": 7, "errorMsg": errorMsg } return response Loading Loading
client/vos_storage +33 −10 Original line number Diff line number Diff line Loading @@ -28,6 +28,8 @@ class VOSStorage(RedisRPCClient): storageType = None storageBasePath = None storageHostname = None tapePool = None tapePoolList = [] while storageType not in ("cold", "hot"): try: storageType = input("\nStorage type ['cold' or 'hot']: ") Loading @@ -37,6 +39,26 @@ class VOSStorage(RedisRPCClient): print("\nPlease, use CTRL+C to quit.") except KeyboardInterrupt: sys.exit("\nCTRL+C detected. Exiting...") if storageType == "cold": storageRequest = { "requestType": "TAPE_POOL_LST" } storageResponse = self.call(storageRequest) if "responseType" not in storageResponse: sys.exit("FATAL: Malformed response, storage acknowledge expected.\n") elif storageResponse["responseType"] == "TAPE_POOL_LST_DONE": tapePoolList = storageResponse["tapePoolList"] while tapePool not in tapePoolList: print("\nSelect one of the available tape pools:") print("\n" + tabulate(tapePoolList, headers = ["tape_pool"], tablefmt = "pretty") + "\n") try: tapePool = input("Please, insert a tape pool: ") except ValueError: print("Input type is not valid!") except EOFError: print("\nPlease, use CTRL+C to quit.") except KeyboardInterrupt: sys.exit("\nCTRL+C detected. Exiting...") else: sys.exit("\nFATAL: Unknown response type.\n") while not storageBasePath: try: storageBasePath = input("\nStorage base path: ") Loading @@ -58,7 +80,8 @@ class VOSStorage(RedisRPCClient): storageRequest = { "requestType": "STORAGE_ADD", "storageType": storageType, "basePath": storageBasePath, "hostname": storageHostname } "hostname": storageHostname, "tapePool": tapePool } storageResponse = self.call(storageRequest) if "responseType" not in storageResponse: Loading
transfer_service/config/vos_ts.conf.sample +0 −2 Original line number Diff line number Diff line Loading @@ -40,8 +40,6 @@ port = 22 user = root ; SSH private key file absolute path pkey_file_path = /root/.ssh/tape_rsa ; tape pool name tape_pool = pl_generic_rw_01 ############################ Loading
transfer_service/db_connector.py +35 −32 Original line number Diff line number Diff line Loading @@ -171,7 +171,7 @@ class DbConnector(object): conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, '/' || fs_path AS os_path, content_length SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, '/' || fs_path AS os_path, content_length, tape_pool FROM node n JOIN location l ON n.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id Loading @@ -192,6 +192,7 @@ class DbConnector(object): tstampWrappedDir = result[0]["tstamp_wrapper_dir"] osPath = result[0]["os_path"] contentLength = result[0]["content_length"] tapePool = result[0]["tape_pool"] if tstampWrappedDir is None: baseSrcPath = basePath + "/" + userName else: Loading @@ -203,7 +204,8 @@ class DbConnector(object): "storageType": storageType, "username": userName, "osPath": osPath, "contentLength": contentLength "contentLength": contentLength, "tapePool": tapePool } return fileInfo finally: Loading Loading @@ -1197,7 +1199,7 @@ class DbConnector(object): ##### Storage ##### def insertStorage(self, storageType, storageBasePath, storageHostname, vospaceUserBasePath): def insertStorage(self, storageType, storageBasePath, storageHostname, vospaceUserBasePath, tapePool = None): """Inserts a storage point.""" if not self.getStorageId(storageBasePath): try: Loading @@ -1206,8 +1208,9 @@ class DbConnector(object): cursor.execute(""" INSERT INTO storage(storage_type, base_path, hostname) VALUES (%s, %s, %s) hostname, tape_pool) VALUES (%s, %s, %s, %s) RETURNING storage_id; """, (storageType, Loading
transfer_service/retrieve_executor.py +46 −43 Original line number Diff line number Diff line Loading @@ -72,12 +72,12 @@ class RetrieveExecutor(TaskExecutor): 1, self.logger) params = config.loadSection("spectrum_archive") self.tapePool = params["tape_pool"] self.tapeClient = TapeClient(params["host"], params.getint("port"), params["user"], params["pkey_file_path"], self.logger) self.tapePool = None self.storageType = None self.jobObj = None self.jobId = None Loading Loading @@ -116,7 +116,10 @@ class RetrieveExecutor(TaskExecutor): # Obtain the storage type try: self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] #self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] fileInfo = self.dbConn.getOSPath(self.nodeList[0]) self.storageType = fileInfo["storageType"] self.tapePool = fileInfo["tapePool"] except Exception: self.logger.exception("FATAL: unable to obtain the storage type.") return False Loading
transfer_service/storage_rpc_server.py +23 −2 Original line number Diff line number Diff line Loading @@ -12,6 +12,7 @@ from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector from tape_client import TapeClient class StorageRPCServer(RedisRPCServer): Loading Loading @@ -39,6 +40,12 @@ class StorageRPCServer(RedisRPCServer): 1, 2, self.logger) params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(params["host"], params.getint("port"), params["user"], params["pkey_file_path"], self.logger) super(StorageRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): Loading Loading @@ -126,12 +133,26 @@ class StorageRPCServer(RedisRPCServer): else: response = { "responseType": "STORAGE_LST_DONE", "storageList": result } elif requestBody["requestType"] == "TAPE_POOL_LST": try: self.tapeClient.connect() tapePools = self.tapeClient.getPoolList() self.tapeClient.disconnect() tapePoolList = [ p.getName() for p in tapePools ] except Exception: errorMsg = "Unable to get tape pool list." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": errorMsg } else: response = { "responseType": "TAPE_POOL_LST_DONE", "tapePoolList": tapePoolList } else: errorMsg = "Unkown request type." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 6, "errorCode": 7, "errorMsg": errorMsg } return response Loading