Loading test_client/data_archiver.py→test_client/dataArchiverCli.py +109 −0 Original line number Diff line number Diff line Loading @@ -33,15 +33,18 @@ class AMQPClient(object): return self.response def help(): def help(self): sys.exit(""" NAME data_archiver_cli dataArchiverCli.py SYNOPSYS python3.x data_archiver_cli.py COMMAND USERNAME python3.x dataArchiverCli.py COMMAND USERNAME DESCRIPTION The purpose of this client application is to notify to the VOSpace backend that data is ready to be saved somewhere. The client accepts only one command at a time. This command is mandatory. A list of supported commands is shown here below: Loading @@ -51,36 +54,35 @@ DESCRIPTION hstore performs a 'hot storage' request, data will be saved on a standard server status returns the current status of the 'store' folder on the transfer node. The client needs also to know the username associated to a store request process. The client also needs to know the username associated to a storage request process. The username must be the same used for accessing the transfer node. """) def store(cmd, username): def store(self, cmd, username): request_type = cmd.upper() dataArchiverCli = AMQPClient() storeRequest = { "requestType": request_type, "userName": username } print(f"Sending {request_type} request:") print(json.dumps(storeRequest, indent = 3)) storeResponse = dataArchiverCli.call(storeRequest) print(f"Sending {request_type} request...") storeResponse = self.call(storeRequest) if "responseType" not in storeResponse: sys.exit("FATAL: Malformed response, store acknowledge expected.") elif storeResponse["responseType"] == "STORE_ACK": print("WARNING: if you answer 'yes', your data on the transfer node will be read-only!!!") print("\nWARNING!!! WARNING!!! WARNING!!! WARNING!!! WARNING!!!") print("If you confirm, all your data on the transfer node will be") print("available in read-only mode for all the time the archiving") print("process is running.") print("WARNING!!! WARNING!!! WARNING!!! WARNING!!! WARNING!!!\n") confirm = None while not confirm in [ "yes", "no"]: confirm = input("Are you sure to proceed? [yes/no]: ") if confirm == "yes": confirmRequest = { "requestType": "STORE_CON", "userName": username } confirmResponse = dataArchiverCli.call(confirmRequest) confirmResponse = self.call(confirmRequest) if "responseType" not in storeResponse: sys.exit("FATAL: Malformed response, store confirmation expected.") elif confirmResponse["responseType"] == "STORE_RUN": print("Store process started successfully") print("Store process started successfully!") else: sys.exit("FATAL: Unknown") sys.exit("FATAL: Unknown response type.") else: sys.exit("Store process aborted gracefully.") elif storeResponse["responseType"] == "ERROR": Loading @@ -89,35 +91,19 @@ def store(cmd, username): sys.exit(f"Error code: {errorCode}, Error message: {errorMsg}") else: sys.exit("FATAL: Unknown response type.") #print("Response:") #print(json.dumps(response, indent = 3)) # Create new AMQPClient object dataArchiverCli = AMQPClient() # Check the number of input args if len(sys.argv) == 3: script, cmd, username = sys.argv else: help() # Check the command type passed by the user if cmd == "cstore": store(cmd, username) elif cmd == "hstore": store(cmd, username) elif cmd == "status": dataArchiverCli = AMQPClient() statusRequest = { "requestType": "STATUS", "userName": username } print("Sending status request:") print(json.dumps(statusRequest, indent = 3)) statusResponse = dataArchiverCli.call(statusRequest) if statusResponse["responseType"] == "ERROR": errorCode = statusResponse["errorCode"] errorMsg = statusResponse["errorMsg"] sys.exit(f"Error code: {errorCode}, Error message: {errorMsg}") else: if statusResponse["status"] == "READY": print("Ready to go!") elif statusResponse["status"] == "BUSY": print("A process is active...") dataArchiverCli.help() # Check the command passed by the user if cmd == "cstore" or cmd == "hstore": dataArchiverCli.store(cmd, username) else: help() dataArchiverCli.help() No newline at end of file transfer_service/store_amqp_server.py +21 −25 Original line number Diff line number Diff line Loading @@ -18,16 +18,20 @@ class StoreAMQPServer(AMQPServer): def __init__(self, host, queue): self.type = "store" self.storeAck = False self.job = None self.username = None self.path = None super(StoreAMQPServer, self).__init__(host, queue) def execute_callback(self, requestBody): # requestType and userName attributes are mandatory # 'requestType' and 'userName' attributes are mandatory if "requestType" not in requestBody or "userName" not in requestBody: response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "STATUS": elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": user = requestBody["userName"] folderPath = "/home/" + user + "/store" userInfo = self.userInfo(user) # Check if the user exists on the transfer node if not userInfo: response = { "responseType": "ERROR", "errorCode": 2, Loading @@ -35,31 +39,22 @@ class StoreAMQPServer(AMQPServer): else: uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid # Check if uid and gid match and avoid privilege escalation if uid == userInfo[1] and gid == userInfo[2] and uid != 0 and gid != 0: if os.access(folderPath, os.W_OK) and not os.listdir(folderPath): response = { "responseType": "STATUS", "status": "READY" } else: response = { "responseType": "STATUS", "status": "BUSY" } else: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Permission denied." } elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": # check if the user dir is not empty and write permissions are enabled # if so, generate a Job object, store it in the cache and push it # also to the pending queue user = requestBody["userName"] folderPath = "/home/" + user + "/store" uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid if os.access(folderPath, os.W_OK) and os.listdir(folderPath) and uid and gid: # If write permissions are set and the 'store' folder is not empty, # it means that data is ready to be copied, otherwise, nothing can # be done until the write permissions are restored. if os.access(folderPath, os.W_OK) and os.listdir(folderPath): response = { "responseType": "STORE_ACK" } self.storeAck = True else: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "FATAL: permission denied." } "errorMsg": "Service busy." } else: response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "Permission denied." } elif requestBody["requestType"] == "STORE_CON": if self.storeAck: self.storeAck = False Loading @@ -71,11 +66,11 @@ class StoreAMQPServer(AMQPServer): # push data on redis... else: response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "FATAL: store request not acknowledged." } "errorCode": 5, "errorMsg": "Store request not acknowledged." } else: response = { "responseType": "ERROR", "errorCode": 2, "errorCode": 6, "errorMsg": "Unkown request type." } return response Loading @@ -87,6 +82,7 @@ class StoreAMQPServer(AMQPServer): # to be removed from store_preprocessor.py # or simply add a chmod -x here, to more fast? def prepare(self, username): self.username = username self.path = "/home/" + username + "/store" Loading Loading
test_client/data_archiver.py→test_client/dataArchiverCli.py +109 −0 Original line number Diff line number Diff line Loading @@ -33,15 +33,18 @@ class AMQPClient(object): return self.response def help(): def help(self): sys.exit(""" NAME data_archiver_cli dataArchiverCli.py SYNOPSYS python3.x data_archiver_cli.py COMMAND USERNAME python3.x dataArchiverCli.py COMMAND USERNAME DESCRIPTION The purpose of this client application is to notify to the VOSpace backend that data is ready to be saved somewhere. The client accepts only one command at a time. This command is mandatory. A list of supported commands is shown here below: Loading @@ -51,36 +54,35 @@ DESCRIPTION hstore performs a 'hot storage' request, data will be saved on a standard server status returns the current status of the 'store' folder on the transfer node. The client needs also to know the username associated to a store request process. The client also needs to know the username associated to a storage request process. The username must be the same used for accessing the transfer node. """) def store(cmd, username): def store(self, cmd, username): request_type = cmd.upper() dataArchiverCli = AMQPClient() storeRequest = { "requestType": request_type, "userName": username } print(f"Sending {request_type} request:") print(json.dumps(storeRequest, indent = 3)) storeResponse = dataArchiverCli.call(storeRequest) print(f"Sending {request_type} request...") storeResponse = self.call(storeRequest) if "responseType" not in storeResponse: sys.exit("FATAL: Malformed response, store acknowledge expected.") elif storeResponse["responseType"] == "STORE_ACK": print("WARNING: if you answer 'yes', your data on the transfer node will be read-only!!!") print("\nWARNING!!! WARNING!!! WARNING!!! WARNING!!! WARNING!!!") print("If you confirm, all your data on the transfer node will be") print("available in read-only mode for all the time the archiving") print("process is running.") print("WARNING!!! WARNING!!! WARNING!!! WARNING!!! WARNING!!!\n") confirm = None while not confirm in [ "yes", "no"]: confirm = input("Are you sure to proceed? [yes/no]: ") if confirm == "yes": confirmRequest = { "requestType": "STORE_CON", "userName": username } confirmResponse = dataArchiverCli.call(confirmRequest) confirmResponse = self.call(confirmRequest) if "responseType" not in storeResponse: sys.exit("FATAL: Malformed response, store confirmation expected.") elif confirmResponse["responseType"] == "STORE_RUN": print("Store process started successfully") print("Store process started successfully!") else: sys.exit("FATAL: Unknown") sys.exit("FATAL: Unknown response type.") else: sys.exit("Store process aborted gracefully.") elif storeResponse["responseType"] == "ERROR": Loading @@ -89,35 +91,19 @@ def store(cmd, username): sys.exit(f"Error code: {errorCode}, Error message: {errorMsg}") else: sys.exit("FATAL: Unknown response type.") #print("Response:") #print(json.dumps(response, indent = 3)) # Create new AMQPClient object dataArchiverCli = AMQPClient() # Check the number of input args if len(sys.argv) == 3: script, cmd, username = sys.argv else: help() # Check the command type passed by the user if cmd == "cstore": store(cmd, username) elif cmd == "hstore": store(cmd, username) elif cmd == "status": dataArchiverCli = AMQPClient() statusRequest = { "requestType": "STATUS", "userName": username } print("Sending status request:") print(json.dumps(statusRequest, indent = 3)) statusResponse = dataArchiverCli.call(statusRequest) if statusResponse["responseType"] == "ERROR": errorCode = statusResponse["errorCode"] errorMsg = statusResponse["errorMsg"] sys.exit(f"Error code: {errorCode}, Error message: {errorMsg}") else: if statusResponse["status"] == "READY": print("Ready to go!") elif statusResponse["status"] == "BUSY": print("A process is active...") dataArchiverCli.help() # Check the command passed by the user if cmd == "cstore" or cmd == "hstore": dataArchiverCli.store(cmd, username) else: help() dataArchiverCli.help() No newline at end of file
transfer_service/store_amqp_server.py +21 −25 Original line number Diff line number Diff line Loading @@ -18,16 +18,20 @@ class StoreAMQPServer(AMQPServer): def __init__(self, host, queue): self.type = "store" self.storeAck = False self.job = None self.username = None self.path = None super(StoreAMQPServer, self).__init__(host, queue) def execute_callback(self, requestBody): # requestType and userName attributes are mandatory # 'requestType' and 'userName' attributes are mandatory if "requestType" not in requestBody or "userName" not in requestBody: response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "STATUS": elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": user = requestBody["userName"] folderPath = "/home/" + user + "/store" userInfo = self.userInfo(user) # Check if the user exists on the transfer node if not userInfo: response = { "responseType": "ERROR", "errorCode": 2, Loading @@ -35,31 +39,22 @@ class StoreAMQPServer(AMQPServer): else: uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid # Check if uid and gid match and avoid privilege escalation if uid == userInfo[1] and gid == userInfo[2] and uid != 0 and gid != 0: if os.access(folderPath, os.W_OK) and not os.listdir(folderPath): response = { "responseType": "STATUS", "status": "READY" } else: response = { "responseType": "STATUS", "status": "BUSY" } else: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Permission denied." } elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": # check if the user dir is not empty and write permissions are enabled # if so, generate a Job object, store it in the cache and push it # also to the pending queue user = requestBody["userName"] folderPath = "/home/" + user + "/store" uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid if os.access(folderPath, os.W_OK) and os.listdir(folderPath) and uid and gid: # If write permissions are set and the 'store' folder is not empty, # it means that data is ready to be copied, otherwise, nothing can # be done until the write permissions are restored. if os.access(folderPath, os.W_OK) and os.listdir(folderPath): response = { "responseType": "STORE_ACK" } self.storeAck = True else: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "FATAL: permission denied." } "errorMsg": "Service busy." } else: response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "Permission denied." } elif requestBody["requestType"] == "STORE_CON": if self.storeAck: self.storeAck = False Loading @@ -71,11 +66,11 @@ class StoreAMQPServer(AMQPServer): # push data on redis... else: response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "FATAL: store request not acknowledged." } "errorCode": 5, "errorMsg": "Store request not acknowledged." } else: response = { "responseType": "ERROR", "errorCode": 2, "errorCode": 6, "errorMsg": "Unkown request type." } return response Loading @@ -87,6 +82,7 @@ class StoreAMQPServer(AMQPServer): # to be removed from store_preprocessor.py # or simply add a chmod -x here, to more fast? def prepare(self, username): self.username = username self.path = "/home/" + username + "/store" Loading