Loading transfer_service/store_amqp_server.py +45 −12 Original line number Diff line number Diff line Loading @@ -2,12 +2,14 @@ # - error codes and status codes list and description # # # import os import sys import json #from enum import Enum from amqp_server import AMQPServer from job import Job Loading @@ -21,20 +23,34 @@ class StoreAMQPServer(AMQPServer): def execute_callback(self, requestBody): # requestType and userName attributes are mandatory if "requestType" not in requestBody or "userName" not in requestBody: response = { "errorCode": 1, "errorMsg": "Malformed request." } response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "STATUS": folderPath = "/home/" + requestBody["userName"] + "/store" user = requestBody["userName"] folderPath = "/home/" + user + "/store" userInfo = self.userInfo(user) if not userInfo: response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "The user does not exist on the transfer node." } else: uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid if os.access(folderPath, os.W_OK) and uid and gid: if uid == userInfo[1] and gid == userInfo[2] and uid != 0 and gid != 0: if os.access(folderPath, os.W_OK) and not os.listdir(folderPath): response = { "responseType": "STATUS", "status": "READY" } else: response = { "responseType": "STATUS", "status": "BUSY" } else: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Permission denied." } elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": # check if the user dir is not empty and write permissions are enabled # if so, generate a Job object, store it in the cache and push it # also to the pending queue folderPath = "/home/" + requestBody["userName"] + "/store" user = requestBody["userName"] folderPath = "/home/" + user + "/store" uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid if os.access(folderPath, os.W_OK) and os.listdir(folderPath) and uid and gid: Loading Loading @@ -69,11 +85,28 @@ class StoreAMQPServer(AMQPServer): #print(f"Redis response: {redis_res}") #return redis_res def userExists(self, username): # to be removed from store_preprocessor.py def prepare(self, username): self.username = username self.path = "/home/" + username + "/store" for folder, subfolders, files in os.walk(self.path): for s in subfolders: os.chown(os.path.join(folder, s), 0, 0) os.chmod(os.path.join(folder, s), 0o555) for f in files: os.chown(os.path.join(folder, f), 0, 0) os.chmod(os.path.join(folder, f), 0o555) def userInfo(self, username): fp = open("/etc/passwd", 'r') for line in fp: if line.split(':')[0] == username: return True info = line.split(':') user = info[0] uid = int(info[2]) gid = int(info[3]) if user == username: return [ user, uid, gid ] return False def run(self): Loading Loading
transfer_service/store_amqp_server.py +45 −12 Original line number Diff line number Diff line Loading @@ -2,12 +2,14 @@ # - error codes and status codes list and description # # # import os import sys import json #from enum import Enum from amqp_server import AMQPServer from job import Job Loading @@ -21,20 +23,34 @@ class StoreAMQPServer(AMQPServer): def execute_callback(self, requestBody): # requestType and userName attributes are mandatory if "requestType" not in requestBody or "userName" not in requestBody: response = { "errorCode": 1, "errorMsg": "Malformed request." } response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "STATUS": folderPath = "/home/" + requestBody["userName"] + "/store" user = requestBody["userName"] folderPath = "/home/" + user + "/store" userInfo = self.userInfo(user) if not userInfo: response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "The user does not exist on the transfer node." } else: uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid if os.access(folderPath, os.W_OK) and uid and gid: if uid == userInfo[1] and gid == userInfo[2] and uid != 0 and gid != 0: if os.access(folderPath, os.W_OK) and not os.listdir(folderPath): response = { "responseType": "STATUS", "status": "READY" } else: response = { "responseType": "STATUS", "status": "BUSY" } else: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Permission denied." } elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": # check if the user dir is not empty and write permissions are enabled # if so, generate a Job object, store it in the cache and push it # also to the pending queue folderPath = "/home/" + requestBody["userName"] + "/store" user = requestBody["userName"] folderPath = "/home/" + user + "/store" uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid if os.access(folderPath, os.W_OK) and os.listdir(folderPath) and uid and gid: Loading Loading @@ -69,11 +85,28 @@ class StoreAMQPServer(AMQPServer): #print(f"Redis response: {redis_res}") #return redis_res def userExists(self, username): # to be removed from store_preprocessor.py def prepare(self, username): self.username = username self.path = "/home/" + username + "/store" for folder, subfolders, files in os.walk(self.path): for s in subfolders: os.chown(os.path.join(folder, s), 0, 0) os.chmod(os.path.join(folder, s), 0o555) for f in files: os.chown(os.path.join(folder, f), 0, 0) os.chmod(os.path.join(folder, f), 0o555) def userInfo(self, username): fp = open("/etc/passwd", 'r') for line in fp: if line.split(':')[0] == username: return True info = line.split(':') user = info[0] uid = int(info[2]) gid = int(info[3]) if user == username: return [ user, uid, gid ] return False def run(self): Loading