Loading transfer_service/data_rpc_server.py +90 −39 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ import logging import os import sys from rap_client import RapClient, MultipleUsersException from rap_client import RapClient from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config Loading Loading @@ -51,53 +51,87 @@ class DataRPCServer(RedisRPCServer): def callback(self, requestBody): # 'requestType' and 'userName' attributes are mandatory if "requestType" not in requestBody or "userName" not in requestBody: errorMsg = "Malformed request, missing parameters." self.logger.error(errorMsg) response = { "responseType":"ERROR", "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } "errorMsg": errorMsg } elif self.pendingQueueWrite.len() >= self.maxPendingJobs: errorMsg = "Pending queue is full, please, retry later." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "Pending queue is full, please, retry later." } "errorMsg": errorMsg } elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": user = requestBody["userName"] try: userInDb = self.dbConn.userExists(user) rapCli = RapClient() except Exception: errorMsg = "Database error." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg } return response try: rapCli = RapClient() rapInfo = rapCli.getUserInfo(user) except MultipleUsersException: errorMsg = "Multiple users with the same email address in RAP." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Multiple users with the same email address in RAP." } "errorCode": 4, "errorMsg": errorMsg } return response except Exception as e: except HTTPException: errorMsg = "HTTP exception." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": f"{e}" } "errorCode": 5, "errorMsg": errorMsg } return response try: if requestBody["requestType"] == "CSTORE": storageList = self.dbConn.getStorageListByType("cold") else: storageList = self.dbConn.getStorageListByType("hot") except Exception: errorMsg = "Database error." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg } return response #folderPath = "/home/" + user + "/store" folderPath = self.storageStorePath.replace("{username}", user) userInfo = self.systemUtils.userInfo(user) # Check if the user exists on the transfer node and is registered in the database if not userInfo: # the user does not exist on the system errorMsg = "The user does not exist on the transfer node." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "The user does not exist on the transfer node." } "errorCode": 6, "errorMsg": errorMsg } elif not (userInDb or rapInfo): # the user cannot be found neither in RAP nor in db errorMsg = "The user is not registered neither in RAP nor in the database." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": "The user is not registered neither in RAP nor in the database." } "errorCode": 7, "errorMsg": errorMsg } else: if not userInDb and rapInfo: # retrieve data from RAP and insert them in db try: self.dbConn.insertUser(rapInfo["id"], user, rapInfo["email"]) except Exception: errorMsg = "Database error." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg } return response uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid # Check if uid and gid match and avoid privilege escalation Loading @@ -110,17 +144,23 @@ class DataRPCServer(RedisRPCServer): response = { "responseType": "STORE_ACK", "storageList": storageList } elif os.access(folderPath, os.W_OK) and not os.listdir(folderPath): response = { "responseType": "ERROR", "errorCode": 7, "errorMsg": "The 'store' directory on the transfer node is empty." } else: errorMsg = "The 'store' directory on the transfer node is empty." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 8, "errorMsg": "Service busy. Please, retry later." } "errorMsg": errorMsg } else: errorMsg = "Service busy. Please, retry later." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 9, "errorMsg": "Permission denied." } "errorMsg": errorMsg } else: errorMsg = "Permission denied." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 10, "errorMsg": errorMsg } elif requestBody["requestType"] == "STORE_CON": username = requestBody["userName"] self.prepare(username) Loading @@ -128,24 +168,36 @@ class DataRPCServer(RedisRPCServer): job.setType("vos_data") job.setInfo(requestBody) job.setPhase("PENDING") try: job.setOwnerId(self.dbConn.getUserId(username)) self.dbConn.insertJob(job) dbResponse = self.dbConn.getJob(job.jobId) except Exception: errorMsg = "Database error." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg } return response job.jobInfo["storageId"] = requestBody["storageId"] job.jobInfo["storageType"] = requestBody["storageType"] try: self.pendingQueueWrite.insertJob(job) if "error" in dbResponse: except Exception: errorMsg = "Unable to insert the job in the 'write_pending' queue." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 7, "errorMsg": "Job creation failed." } "errorCode": 11, "errorMsg": errorMsg } return response else: response = { "responseType": "STORE_RUN", "jobId": job.jobId } else: errorMsg = "Unkown request type." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 8, "errorMsg": "Unkown request type." } "errorCode": 12, "errorMsg": errorMsg } return response def prepare(self, username): Loading @@ -159,4 +211,3 @@ class DataRPCServer(RedisRPCServer): def run(self): self.logger.info(f"Starting RPC server of type {self.type}...") super(DataRPCServer, self).run() transfer_service/rap_client.py +54 −44 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import json import urllib from base64 import b64encode from http.client import HTTPSConnection from http.client import HTTPException from config import Config from exceptions import MultipleUsersException Loading @@ -19,6 +20,7 @@ class RapClient(object): self.ssoServer = params["sso_server"] def getUserInfo(self, username): try: # Obtain OAuth2 token from client credentials conn = HTTPSConnection(self.ssoServer) params = urllib.parse.urlencode({"grant_type": "client_credentials"}) Loading @@ -29,7 +31,10 @@ class RapClient(object): conn.request("POST", "/rap-ia2/auth/oauth2/token", params, headers) response = conn.getresponse() response_body = response.read() conn.close() except HTTPException: raise else: try: tokenInfo = json.loads(response_body.decode()) accessToken = tokenInfo["access_token"] Loading @@ -41,7 +46,9 @@ class RapClient(object): conn.request("GET", "/rap-ia2/ws/user?search=" + urllib.parse.quote(email), None, headers) response = conn.getresponse() responseBody = response.read() conn.close() except HTTPException: raise else: userInfo = json.loads(responseBody.decode()) if len(userInfo) == 0: Loading @@ -60,6 +67,10 @@ class RapClient(object): userInfo[0]["email"] = email return userInfo[0] finally: conn.close() finally: conn.close() #rapClient = RapClient() Loading @@ -67,4 +78,3 @@ class RapClient(object): #print(userInfo["id"]) #print(userInfo) #print(userInfo is None) No newline at end of file Loading
transfer_service/data_rpc_server.py +90 −39 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ import logging import os import sys from rap_client import RapClient, MultipleUsersException from rap_client import RapClient from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config Loading Loading @@ -51,53 +51,87 @@ class DataRPCServer(RedisRPCServer): def callback(self, requestBody): # 'requestType' and 'userName' attributes are mandatory if "requestType" not in requestBody or "userName" not in requestBody: errorMsg = "Malformed request, missing parameters." self.logger.error(errorMsg) response = { "responseType":"ERROR", "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } "errorMsg": errorMsg } elif self.pendingQueueWrite.len() >= self.maxPendingJobs: errorMsg = "Pending queue is full, please, retry later." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "Pending queue is full, please, retry later." } "errorMsg": errorMsg } elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": user = requestBody["userName"] try: userInDb = self.dbConn.userExists(user) rapCli = RapClient() except Exception: errorMsg = "Database error." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg } return response try: rapCli = RapClient() rapInfo = rapCli.getUserInfo(user) except MultipleUsersException: errorMsg = "Multiple users with the same email address in RAP." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Multiple users with the same email address in RAP." } "errorCode": 4, "errorMsg": errorMsg } return response except Exception as e: except HTTPException: errorMsg = "HTTP exception." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": f"{e}" } "errorCode": 5, "errorMsg": errorMsg } return response try: if requestBody["requestType"] == "CSTORE": storageList = self.dbConn.getStorageListByType("cold") else: storageList = self.dbConn.getStorageListByType("hot") except Exception: errorMsg = "Database error." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg } return response #folderPath = "/home/" + user + "/store" folderPath = self.storageStorePath.replace("{username}", user) userInfo = self.systemUtils.userInfo(user) # Check if the user exists on the transfer node and is registered in the database if not userInfo: # the user does not exist on the system errorMsg = "The user does not exist on the transfer node." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "The user does not exist on the transfer node." } "errorCode": 6, "errorMsg": errorMsg } elif not (userInDb or rapInfo): # the user cannot be found neither in RAP nor in db errorMsg = "The user is not registered neither in RAP nor in the database." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": "The user is not registered neither in RAP nor in the database." } "errorCode": 7, "errorMsg": errorMsg } else: if not userInDb and rapInfo: # retrieve data from RAP and insert them in db try: self.dbConn.insertUser(rapInfo["id"], user, rapInfo["email"]) except Exception: errorMsg = "Database error." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg } return response uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid # Check if uid and gid match and avoid privilege escalation Loading @@ -110,17 +144,23 @@ class DataRPCServer(RedisRPCServer): response = { "responseType": "STORE_ACK", "storageList": storageList } elif os.access(folderPath, os.W_OK) and not os.listdir(folderPath): response = { "responseType": "ERROR", "errorCode": 7, "errorMsg": "The 'store' directory on the transfer node is empty." } else: errorMsg = "The 'store' directory on the transfer node is empty." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 8, "errorMsg": "Service busy. Please, retry later." } "errorMsg": errorMsg } else: errorMsg = "Service busy. Please, retry later." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 9, "errorMsg": "Permission denied." } "errorMsg": errorMsg } else: errorMsg = "Permission denied." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 10, "errorMsg": errorMsg } elif requestBody["requestType"] == "STORE_CON": username = requestBody["userName"] self.prepare(username) Loading @@ -128,24 +168,36 @@ class DataRPCServer(RedisRPCServer): job.setType("vos_data") job.setInfo(requestBody) job.setPhase("PENDING") try: job.setOwnerId(self.dbConn.getUserId(username)) self.dbConn.insertJob(job) dbResponse = self.dbConn.getJob(job.jobId) except Exception: errorMsg = "Database error." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": errorMsg } return response job.jobInfo["storageId"] = requestBody["storageId"] job.jobInfo["storageType"] = requestBody["storageType"] try: self.pendingQueueWrite.insertJob(job) if "error" in dbResponse: except Exception: errorMsg = "Unable to insert the job in the 'write_pending' queue." self.logger.exception(errorMsg) response = { "responseType": "ERROR", "errorCode": 7, "errorMsg": "Job creation failed." } "errorCode": 11, "errorMsg": errorMsg } return response else: response = { "responseType": "STORE_RUN", "jobId": job.jobId } else: errorMsg = "Unkown request type." self.logger.error(errorMsg) response = { "responseType": "ERROR", "errorCode": 8, "errorMsg": "Unkown request type." } "errorCode": 12, "errorMsg": errorMsg } return response def prepare(self, username): Loading @@ -159,4 +211,3 @@ class DataRPCServer(RedisRPCServer): def run(self): self.logger.info(f"Starting RPC server of type {self.type}...") super(DataRPCServer, self).run()
transfer_service/rap_client.py +54 −44 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import json import urllib from base64 import b64encode from http.client import HTTPSConnection from http.client import HTTPException from config import Config from exceptions import MultipleUsersException Loading @@ -19,6 +20,7 @@ class RapClient(object): self.ssoServer = params["sso_server"] def getUserInfo(self, username): try: # Obtain OAuth2 token from client credentials conn = HTTPSConnection(self.ssoServer) params = urllib.parse.urlencode({"grant_type": "client_credentials"}) Loading @@ -29,7 +31,10 @@ class RapClient(object): conn.request("POST", "/rap-ia2/auth/oauth2/token", params, headers) response = conn.getresponse() response_body = response.read() conn.close() except HTTPException: raise else: try: tokenInfo = json.loads(response_body.decode()) accessToken = tokenInfo["access_token"] Loading @@ -41,7 +46,9 @@ class RapClient(object): conn.request("GET", "/rap-ia2/ws/user?search=" + urllib.parse.quote(email), None, headers) response = conn.getresponse() responseBody = response.read() conn.close() except HTTPException: raise else: userInfo = json.loads(responseBody.decode()) if len(userInfo) == 0: Loading @@ -60,6 +67,10 @@ class RapClient(object): userInfo[0]["email"] = email return userInfo[0] finally: conn.close() finally: conn.close() #rapClient = RapClient() Loading @@ -67,4 +78,3 @@ class RapClient(object): #print(userInfo["id"]) #print(userInfo) #print(userInfo is None) No newline at end of file