Loading client/vos_storage +70 −2 Original line number Diff line number Diff line Loading @@ -83,6 +83,69 @@ class AMQPClient(object): else: sys.exit("\nFATAL: Unknown response type.\n") def delete(self): storageRequest = { "requestType": "STORAGE_DEL_REQ" } storageResponse = self.call(storageRequest) if "responseType" not in storageResponse: sys.exit("FATAL: Malformed response, storage acknowledge expected.\n") elif storageResponse["responseType"] == "STORAGE_DEL_ACK": storageList = storageResponse["storageList"] if not storageList: sys.exit("No storage point found. Please add at least one storage point.\n") print("\nSelect the storage location to remove:\n") print(tabulate(storageList, headers = "keys", tablefmt = "pretty")) print() storageIdList = [] for st in storageList: storageIdList.append(st["storage_id"]) storageId = None while not storageId in storageIdList: try: storageId = input("Please, insert a storage id: ") storageId = int(storageId) except ValueError: print("Input type is not valid!") except EOFError: print("\nPlease, use CTRL+C to quit.") except KeyboardInterrupt: sys.exit("\nCTRL+C detected. Exiting...") print() print("!!!!!!!!!!!!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") print("! This operation will remove the selected storage location only !") print("! from the database. !") print("! The mount point on the transfer node will not be removed, you !") print("! must do it manually. !") print("! Anyway, you MUST BE AWARE that all the VOSpace nodes affected !") print("! by this operation will not be accessible anymore from now on. !") print("! We strongly recommend to move all the data to another storage !") print("! location and update the database accordingly. !") print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") print() confirm = None while not confirm in ( "yes", "no" ): try: confirm = input("Are you sure to proceed? [yes/no]: ") except KeyboardInterrupt: sys.exit("\nCTRL+C detected. Exiting...") except EOFError: print("\nPlease, use CTRL+C to quit.") if confirm == "yes": confirmRequest = { "requestType": "STORAGE_DEL_CON", "storageId": storageId } confirmResponse = self.call(confirmRequest) if "responseType" not in confirmResponse: sys.exit("\nFATAL: Malformed response, storage confirmation expected.\n") elif confirmResponse["responseType"] == "STORAGE_DEL_DONE": print("\nStorage location deleted successfully!\n") else: sys.exit("FATAL: Unknown response type.\n") elif storeResponse["responseType"] == "ERROR": errorCode = storeResponse["errorCode"] errorMsg = storeResponse["errorMsg"] sys.exit(f"Error code: {errorCode}\nError message: {errorMsg}\n") else: sys.exit("\nFATAL: Unknown response type.\n") def list(self): storageRequest = { "requestType": "STORAGE_LST" } storageResponse = self.call(storageRequest) Loading Loading @@ -117,8 +180,11 @@ DESCRIPTION add adds a storage point to the database. del deletes a storage point from the database. list prints the storage points list. prints the full storage point list. """) # Create new AMQPClient object Loading @@ -134,5 +200,7 @@ if cmd == "add": vosStorageCli.add() elif cmd == "list": vosStorageCli.list() elif cmd == "del": vosStorageCli.delete() else: vosStorageCli.help() transfer_service/storage_amqp_server.py +37 −6 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ class StorageAMQPServer(AMQPServer): self.storageType = None self.storageBasePath = None self.storageHostname = None self.storageId = None self.storageAck = False super(StorageAMQPServer, self).__init__(host, port, queue) def execute_callback(self, requestBody): Loading @@ -27,6 +29,7 @@ class StorageAMQPServer(AMQPServer): response = { "responseType": "ERROR", "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "STORAGE_ADD": self.storageType = requestBody["storageType"] self.storageBasePath = requestBody["basePath"] Loading @@ -37,6 +40,7 @@ class StorageAMQPServer(AMQPServer): "errorCode": 2, "errorMsg": "Base path doesn't exist."} return response self.dbConn.connect() result = self.dbConn.insertStorage(self.storageType, self.storageBasePath, Loading @@ -45,14 +49,36 @@ class StorageAMQPServer(AMQPServer): if result: response = { "responseType": "STORAGE_ADD_DONE" } return response else: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Storage point already exists." } return response elif requestBody["requestType"] == "STORAGE_RMV": pass elif requestBody["requestType"] == "STORAGE_DEL_REQ": self.dbConn.connect() result = self.dbConn.getStorageList() self.dbConn.disconnect() response = { "responseType": "STORAGE_DEL_ACK", "storageList": result } self.storageAck = True elif requestBody["requestType"] == "STORAGE_DEL_CON": self.storageId = requestBody["storageId"] if self.storageAck: self.storageAck = False self.dbConn.connect() self.dbConn.deleteStorage(self.storageId) self.dbConn.disconnect() response = { "responseType": "STORAGE_DEL_DONE" } else: response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "Store request not acknowledged." } elif requestBody["requestType"] == "STORAGE_LST": self.dbConn.connect() result = self.dbConn.getStorageList() Loading @@ -61,6 +87,11 @@ class StorageAMQPServer(AMQPServer): response = { "responseType": "STORAGE_LST_DONE", "storageList": result } else: response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Unkown request type." } return response def run(self): Loading Loading
client/vos_storage +70 −2 Original line number Diff line number Diff line Loading @@ -83,6 +83,69 @@ class AMQPClient(object): else: sys.exit("\nFATAL: Unknown response type.\n") def delete(self): storageRequest = { "requestType": "STORAGE_DEL_REQ" } storageResponse = self.call(storageRequest) if "responseType" not in storageResponse: sys.exit("FATAL: Malformed response, storage acknowledge expected.\n") elif storageResponse["responseType"] == "STORAGE_DEL_ACK": storageList = storageResponse["storageList"] if not storageList: sys.exit("No storage point found. Please add at least one storage point.\n") print("\nSelect the storage location to remove:\n") print(tabulate(storageList, headers = "keys", tablefmt = "pretty")) print() storageIdList = [] for st in storageList: storageIdList.append(st["storage_id"]) storageId = None while not storageId in storageIdList: try: storageId = input("Please, insert a storage id: ") storageId = int(storageId) except ValueError: print("Input type is not valid!") except EOFError: print("\nPlease, use CTRL+C to quit.") except KeyboardInterrupt: sys.exit("\nCTRL+C detected. Exiting...") print() print("!!!!!!!!!!!!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") print("! This operation will remove the selected storage location only !") print("! from the database. !") print("! The mount point on the transfer node will not be removed, you !") print("! must do it manually. !") print("! Anyway, you MUST BE AWARE that all the VOSpace nodes affected !") print("! by this operation will not be accessible anymore from now on. !") print("! We strongly recommend to move all the data to another storage !") print("! location and update the database accordingly. !") print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") print() confirm = None while not confirm in ( "yes", "no" ): try: confirm = input("Are you sure to proceed? [yes/no]: ") except KeyboardInterrupt: sys.exit("\nCTRL+C detected. Exiting...") except EOFError: print("\nPlease, use CTRL+C to quit.") if confirm == "yes": confirmRequest = { "requestType": "STORAGE_DEL_CON", "storageId": storageId } confirmResponse = self.call(confirmRequest) if "responseType" not in confirmResponse: sys.exit("\nFATAL: Malformed response, storage confirmation expected.\n") elif confirmResponse["responseType"] == "STORAGE_DEL_DONE": print("\nStorage location deleted successfully!\n") else: sys.exit("FATAL: Unknown response type.\n") elif storeResponse["responseType"] == "ERROR": errorCode = storeResponse["errorCode"] errorMsg = storeResponse["errorMsg"] sys.exit(f"Error code: {errorCode}\nError message: {errorMsg}\n") else: sys.exit("\nFATAL: Unknown response type.\n") def list(self): storageRequest = { "requestType": "STORAGE_LST" } storageResponse = self.call(storageRequest) Loading Loading @@ -117,8 +180,11 @@ DESCRIPTION add adds a storage point to the database. del deletes a storage point from the database. list prints the storage points list. prints the full storage point list. """) # Create new AMQPClient object Loading @@ -134,5 +200,7 @@ if cmd == "add": vosStorageCli.add() elif cmd == "list": vosStorageCli.list() elif cmd == "del": vosStorageCli.delete() else: vosStorageCli.help()
transfer_service/storage_amqp_server.py +37 −6 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ class StorageAMQPServer(AMQPServer): self.storageType = None self.storageBasePath = None self.storageHostname = None self.storageId = None self.storageAck = False super(StorageAMQPServer, self).__init__(host, port, queue) def execute_callback(self, requestBody): Loading @@ -27,6 +29,7 @@ class StorageAMQPServer(AMQPServer): response = { "responseType": "ERROR", "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "STORAGE_ADD": self.storageType = requestBody["storageType"] self.storageBasePath = requestBody["basePath"] Loading @@ -37,6 +40,7 @@ class StorageAMQPServer(AMQPServer): "errorCode": 2, "errorMsg": "Base path doesn't exist."} return response self.dbConn.connect() result = self.dbConn.insertStorage(self.storageType, self.storageBasePath, Loading @@ -45,14 +49,36 @@ class StorageAMQPServer(AMQPServer): if result: response = { "responseType": "STORAGE_ADD_DONE" } return response else: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Storage point already exists." } return response elif requestBody["requestType"] == "STORAGE_RMV": pass elif requestBody["requestType"] == "STORAGE_DEL_REQ": self.dbConn.connect() result = self.dbConn.getStorageList() self.dbConn.disconnect() response = { "responseType": "STORAGE_DEL_ACK", "storageList": result } self.storageAck = True elif requestBody["requestType"] == "STORAGE_DEL_CON": self.storageId = requestBody["storageId"] if self.storageAck: self.storageAck = False self.dbConn.connect() self.dbConn.deleteStorage(self.storageId) self.dbConn.disconnect() response = { "responseType": "STORAGE_DEL_DONE" } else: response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "Store request not acknowledged." } elif requestBody["requestType"] == "STORAGE_LST": self.dbConn.connect() result = self.dbConn.getStorageList() Loading @@ -61,6 +87,11 @@ class StorageAMQPServer(AMQPServer): response = { "responseType": "STORAGE_LST_DONE", "storageList": result } else: response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Unkown request type." } return response def run(self): Loading