Loading transfer_service/store_amqp_server.py +22 −16 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ class StoreAMQPServer(AMQPServer): def __init__(self, host, queue): self.type = "store" self.storeAck = False self.jobCache = JobCache('redis', 6379, 1) self.job = None self.username = None self.path = None Loading @@ -29,6 +30,9 @@ class StoreAMQPServer(AMQPServer): if "requestType" not in requestBody or "userName" not in requestBody: response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": self.job = Job() self.job.setInfo(requestBody) self.job.setPhase("PENDING") user = requestBody["userName"] folderPath = "/home/" + user + "/store" userInfo = self.userInfo(user) Loading @@ -44,7 +48,8 @@ class StoreAMQPServer(AMQPServer): if uid == userInfo[1] and gid == userInfo[2] and uid != 0 and gid != 0: # If write permissions are set and the 'store' folder is not empty, # it means that data is ready to be copied, otherwise, nothing can # be done until the write permissions are restored. # be done until the write permissions are restored or new data is # copied on the transfer node by the user. if os.access(folderPath, os.W_OK) and os.listdir(folderPath): response = { "responseType": "STORE_ACK" } self.storeAck = True Loading @@ -59,35 +64,36 @@ class StoreAMQPServer(AMQPServer): elif requestBody["requestType"] == "STORE_CON": if self.storeAck: self.storeAck = False self.job = Job() self.job.setInfo(requestBody) self.job.setPhase("PENDING") print("Job generated.") response = { "responseType": "STORE_RUN" } # push data on redis... else: user = requestBody["userName"] self.prepare(user) self.jobCache.set(self.job) redisResponse = self.jobCache.get(self.job.jobID) if "error" in redisResponse: response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Store request not acknowledged." } "errorMsg": "Job creation failed." } else: response = { "responseType": "STORE_RUN", "jobID": self.job.jobID } else: response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": "Store request not acknowledged." } else: response = { "responseType": "ERROR", "errorCode": 7, "errorMsg": "Unkown request type." } return response #self.jobCache.set(self.job) #redis_res = self.jobCache.get(self.job.jobID) #print(f"Redis response: {redis_res}") #return redis_res # to be removed from store_preprocessor.py # or simply add a chmod -x here, to more fast? # or simply add a chmod -x here, to be faster? def prepare(self, username): self.username = username self.path = "/home/" + username + "/store" for folder, subfolders, files in os.walk(self.path): os.chown(folder, 0, 0) os.chmod(folder, 0o555) for s in subfolders: os.chown(os.path.join(folder, s), 0, 0) os.chmod(os.path.join(folder, s), 0o555) Loading Loading
transfer_service/store_amqp_server.py +22 −16 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ class StoreAMQPServer(AMQPServer): def __init__(self, host, queue): self.type = "store" self.storeAck = False self.jobCache = JobCache('redis', 6379, 1) self.job = None self.username = None self.path = None Loading @@ -29,6 +30,9 @@ class StoreAMQPServer(AMQPServer): if "requestType" not in requestBody or "userName" not in requestBody: response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": self.job = Job() self.job.setInfo(requestBody) self.job.setPhase("PENDING") user = requestBody["userName"] folderPath = "/home/" + user + "/store" userInfo = self.userInfo(user) Loading @@ -44,7 +48,8 @@ class StoreAMQPServer(AMQPServer): if uid == userInfo[1] and gid == userInfo[2] and uid != 0 and gid != 0: # If write permissions are set and the 'store' folder is not empty, # it means that data is ready to be copied, otherwise, nothing can # be done until the write permissions are restored. # be done until the write permissions are restored or new data is # copied on the transfer node by the user. if os.access(folderPath, os.W_OK) and os.listdir(folderPath): response = { "responseType": "STORE_ACK" } self.storeAck = True Loading @@ -59,35 +64,36 @@ class StoreAMQPServer(AMQPServer): elif requestBody["requestType"] == "STORE_CON": if self.storeAck: self.storeAck = False self.job = Job() self.job.setInfo(requestBody) self.job.setPhase("PENDING") print("Job generated.") response = { "responseType": "STORE_RUN" } # push data on redis... else: user = requestBody["userName"] self.prepare(user) self.jobCache.set(self.job) redisResponse = self.jobCache.get(self.job.jobID) if "error" in redisResponse: response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Store request not acknowledged." } "errorMsg": "Job creation failed." } else: response = { "responseType": "STORE_RUN", "jobID": self.job.jobID } else: response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": "Store request not acknowledged." } else: response = { "responseType": "ERROR", "errorCode": 7, "errorMsg": "Unkown request type." } return response #self.jobCache.set(self.job) #redis_res = self.jobCache.get(self.job.jobID) #print(f"Redis response: {redis_res}") #return redis_res # to be removed from store_preprocessor.py # or simply add a chmod -x here, to more fast? # or simply add a chmod -x here, to be faster? def prepare(self, username): self.username = username self.path = "/home/" + username + "/store" for folder, subfolders, files in os.walk(self.path): os.chown(folder, 0, 0) os.chmod(folder, 0o555) for s in subfolders: os.chown(os.path.join(folder, s), 0, 0) os.chmod(os.path.join(folder, s), 0o555) Loading