Loading transfer_service/data_rpc_server.py +30 −6 Original line number Diff line number Diff line Loading @@ -11,6 +11,7 @@ import logging import os import sys from rap_client import RapClient, MultipleUsersException from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config Loading Loading @@ -66,6 +67,20 @@ class DataRPCServer(RedisRPCServer): elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": user = requestBody["userName"] userInDb = self.dbConn.userExists(user) rapCli = RapClient() try: rapInfo = rapCli.getUserInfo(user) except MultipleUsersException: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Multiple users with the same email address in RAP." } return response except Exception as e: response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": f"{e}" } return response if requestBody["requestType"] == "CSTORE": storageList = self.dbConn.getStorageListByType("cold") else: Loading @@ -74,11 +89,20 @@ class DataRPCServer(RedisRPCServer): folderPath = self.storageStorePath.replace("{username}", user) userInfo = self.systemUtils.userInfo(user) # Check if the user exists on the transfer node and is registered in the database if not userInfo or not userInDb: if not userInfo: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "The user does not exist on the transfer node or is not registered in the database." } "errorCode": 5, "errorMsg": "The user does not exist on the transfer node." } elif not (userInDb or rapInfo): # the user cannot be found neither in RAP nor in db response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": "The user is not registered neither in RAP nor in the database." } else: if not userInDb and rapInfo: # retrieve data from RAP and insert them in db self.dbConn.insertUser(rapInfo["id"], user, rapInfo["email"]) uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid # Check if uid and gid match and avoid privilege escalation Loading @@ -92,15 +116,15 @@ class DataRPCServer(RedisRPCServer): "storageList": storageList } elif os.access(folderPath, os.W_OK) and not os.listdir(folderPath): response = { "responseType": "ERROR", "errorCode": 4, "errorCode": 7, "errorMsg": "The 'store' directory on the transfer node is empty." } else: response = { "responseType": "ERROR", "errorCode": 5, "errorCode": 8, "errorMsg": "Service busy. Please, retry later." } else: response = { "responseType": "ERROR", "errorCode": 6, "errorCode": 9, "errorMsg": "Permission denied." } elif requestBody["requestType"] == "STORE_CON": username = requestBody["userName"] Loading transfer_service/exceptions.py 0 → 100644 +13 −0 Original line number Diff line number Diff line #!/usr/bin/env python class Error(Exception): """Base class for exceptions.""" pass # RAP exceptions class MultipleUsersException(Error): def __init__(self): self.message = "Multiple users found with the same email address." super(MultipleUsersException, self).__init__(self.message) transfer_service/import_rpc_server.py +35 −10 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ from config import Config from db_connector import DbConnector from job_queue import JobQueue from job import Job from rap_client import RapClient, MultipleUsersException from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from system_utils import SystemUtils Loading Loading @@ -53,16 +54,40 @@ class ImportRPCServer(RedisRPCServer): path = os.path.abspath(requestBody["path"]) username = requestBody["userName"] userInDb = self.dbConn.userExists(username) rapCli = RapClient() try: rapInfo = rapCli.getUserInfo(username) except MultipleUsersException: response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "Multiple users with the same email address in RAP." } return response except Exception as e: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": f"{e}" } return response userInfo = self.systemUtils.userInfo(username) #out = open("import_amqp_server_log.txt", "a") if not userInfo or not userInDb: if not userInfo: response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "The user does not exist or is not registered in the database." } "errorCode": 4, "errorMsg": "The user does not exist on the transfer node." } return response elif not (userInDb or rapInfo): # the user cannot be found neither in RAP nor in db response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "The user is not registered neither in RAP nor in the database." } return response else: if not userInDb and rapInfo: # retrieve data from RAP and insert them in db self.dbConn.insertUser(rapInfo["id"], username, rapInfo["email"]) userId = self.dbConn.getUserId(username) userId = rapInfo["id"] pathPrefix = self.dbConn.storageBasePathIsValid(path) if pathPrefix: Loading @@ -70,29 +95,29 @@ class ImportRPCServer(RedisRPCServer): storageType = self.dbConn.getStorageType(pathPrefix) else: response = { "responseType": "ERROR", "errorCode": 3, "errorCode": 6, "errorMsg": "Invalid storage mount point." } return response if not os.path.exists(path): response = { "responseType": "ERROR", "errorCode": 4, "errorCode": 7, "errorMsg": "Path not found." } elif not os.path.isdir(path): response = { "responseType": "ERROR", "errorCode": 5, "errorCode": 8, "errorMsg": "Directory path expected." } elif username not in path: response = { "responseType": "ERROR", "errorCode": 6, "errorCode": 9, "errorMsg": "Directory path does not contain the username." } elif os.path.dirname(path) != pathPrefix + '/' + username: response = { "responseType": "ERROR", "errorCode": 7, "errorCode": 10, "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username } elif self.importReadyQueue.len() >= self.maxReadyJobs: response = { "responseType": "ERROR", "errorCode": 8, "errorCode": 11, "errorMsg": "Import queue is full, please, retry later." } else: jobObj = Job() Loading transfer_service/rap_client.py 0 → 100644 +70 −0 Original line number Diff line number Diff line #!/usr/bin/env python import json import urllib from base64 import b64encode from http.client import HTTPSConnection from config import Config from exceptions import MultipleUsersException class RapClient(object): def __init__(self): config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("authentication") self.client = params["rap_client"] self.secret = params["rap_secret"] self.ssoServer = params["sso_server"] def getUserInfo(self, username): # Obtain OAuth2 token from client credentials conn = HTTPSConnection(self.ssoServer) params = urllib.parse.urlencode({"grant_type": "client_credentials"}) headers = { "Content-type": "application/x-www-form-urlencoded", "Authorization": "Basic " + b64encode(str.encode(self.client + ":" + self.secret)).decode() } conn.request("POST", "/rap-ia2/auth/oauth2/token", params, headers) response = conn.getresponse() response_body = response.read() conn.close() tokenInfo = json.loads(response_body.decode()) accessToken = tokenInfo["access_token"] # Obtain user info from RAP service email = username + "@inaf.it" headers = { "Authorization": "Bearer " + accessToken } conn.request("GET", "/rap-ia2/ws/user?search=" + urllib.parse.quote(email), None, headers) response = conn.getresponse() responseBody = response.read() conn.close() userInfo = json.loads(responseBody.decode()) if len(userInfo) == 0: return None if len(userInfo) > 1: raise MultipleUsersException for idn in userInfo[0]["identities"]: if idn["institution"] and "INAF" in idn["institution"]: eppn = idn["eppn"] if eppn != email: return None userInfo[0]["email"] = email return userInfo[0] #rapClient = RapClient() #userInfo = rapClient.getUserInfo("") #print(userInfo["id"]) #print(userInfo) #print(userInfo is None) Loading
transfer_service/data_rpc_server.py +30 −6 Original line number Diff line number Diff line Loading @@ -11,6 +11,7 @@ import logging import os import sys from rap_client import RapClient, MultipleUsersException from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config Loading Loading @@ -66,6 +67,20 @@ class DataRPCServer(RedisRPCServer): elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE": user = requestBody["userName"] userInDb = self.dbConn.userExists(user) rapCli = RapClient() try: rapInfo = rapCli.getUserInfo(user) except MultipleUsersException: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Multiple users with the same email address in RAP." } return response except Exception as e: response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": f"{e}" } return response if requestBody["requestType"] == "CSTORE": storageList = self.dbConn.getStorageListByType("cold") else: Loading @@ -74,11 +89,20 @@ class DataRPCServer(RedisRPCServer): folderPath = self.storageStorePath.replace("{username}", user) userInfo = self.systemUtils.userInfo(user) # Check if the user exists on the transfer node and is registered in the database if not userInfo or not userInDb: if not userInfo: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "The user does not exist on the transfer node or is not registered in the database." } "errorCode": 5, "errorMsg": "The user does not exist on the transfer node." } elif not (userInDb or rapInfo): # the user cannot be found neither in RAP nor in db response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": "The user is not registered neither in RAP nor in the database." } else: if not userInDb and rapInfo: # retrieve data from RAP and insert them in db self.dbConn.insertUser(rapInfo["id"], user, rapInfo["email"]) uid = os.stat(folderPath).st_uid gid = os.stat(folderPath).st_gid # Check if uid and gid match and avoid privilege escalation Loading @@ -92,15 +116,15 @@ class DataRPCServer(RedisRPCServer): "storageList": storageList } elif os.access(folderPath, os.W_OK) and not os.listdir(folderPath): response = { "responseType": "ERROR", "errorCode": 4, "errorCode": 7, "errorMsg": "The 'store' directory on the transfer node is empty." } else: response = { "responseType": "ERROR", "errorCode": 5, "errorCode": 8, "errorMsg": "Service busy. Please, retry later." } else: response = { "responseType": "ERROR", "errorCode": 6, "errorCode": 9, "errorMsg": "Permission denied." } elif requestBody["requestType"] == "STORE_CON": username = requestBody["userName"] Loading
transfer_service/exceptions.py 0 → 100644 +13 −0 Original line number Diff line number Diff line #!/usr/bin/env python class Error(Exception): """Base class for exceptions.""" pass # RAP exceptions class MultipleUsersException(Error): def __init__(self): self.message = "Multiple users found with the same email address." super(MultipleUsersException, self).__init__(self.message)
transfer_service/import_rpc_server.py +35 −10 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ from config import Config from db_connector import DbConnector from job_queue import JobQueue from job import Job from rap_client import RapClient, MultipleUsersException from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from system_utils import SystemUtils Loading Loading @@ -53,16 +54,40 @@ class ImportRPCServer(RedisRPCServer): path = os.path.abspath(requestBody["path"]) username = requestBody["userName"] userInDb = self.dbConn.userExists(username) rapCli = RapClient() try: rapInfo = rapCli.getUserInfo(username) except MultipleUsersException: response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "Multiple users with the same email address in RAP." } return response except Exception as e: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": f"{e}" } return response userInfo = self.systemUtils.userInfo(username) #out = open("import_amqp_server_log.txt", "a") if not userInfo or not userInDb: if not userInfo: response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "The user does not exist or is not registered in the database." } "errorCode": 4, "errorMsg": "The user does not exist on the transfer node." } return response elif not (userInDb or rapInfo): # the user cannot be found neither in RAP nor in db response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "The user is not registered neither in RAP nor in the database." } return response else: if not userInDb and rapInfo: # retrieve data from RAP and insert them in db self.dbConn.insertUser(rapInfo["id"], username, rapInfo["email"]) userId = self.dbConn.getUserId(username) userId = rapInfo["id"] pathPrefix = self.dbConn.storageBasePathIsValid(path) if pathPrefix: Loading @@ -70,29 +95,29 @@ class ImportRPCServer(RedisRPCServer): storageType = self.dbConn.getStorageType(pathPrefix) else: response = { "responseType": "ERROR", "errorCode": 3, "errorCode": 6, "errorMsg": "Invalid storage mount point." } return response if not os.path.exists(path): response = { "responseType": "ERROR", "errorCode": 4, "errorCode": 7, "errorMsg": "Path not found." } elif not os.path.isdir(path): response = { "responseType": "ERROR", "errorCode": 5, "errorCode": 8, "errorMsg": "Directory path expected." } elif username not in path: response = { "responseType": "ERROR", "errorCode": 6, "errorCode": 9, "errorMsg": "Directory path does not contain the username." } elif os.path.dirname(path) != pathPrefix + '/' + username: response = { "responseType": "ERROR", "errorCode": 7, "errorCode": 10, "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username } elif self.importReadyQueue.len() >= self.maxReadyJobs: response = { "responseType": "ERROR", "errorCode": 8, "errorCode": 11, "errorMsg": "Import queue is full, please, retry later." } else: jobObj = Job() Loading
transfer_service/rap_client.py 0 → 100644 +70 −0 Original line number Diff line number Diff line #!/usr/bin/env python import json import urllib from base64 import b64encode from http.client import HTTPSConnection from config import Config from exceptions import MultipleUsersException class RapClient(object): def __init__(self): config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("authentication") self.client = params["rap_client"] self.secret = params["rap_secret"] self.ssoServer = params["sso_server"] def getUserInfo(self, username): # Obtain OAuth2 token from client credentials conn = HTTPSConnection(self.ssoServer) params = urllib.parse.urlencode({"grant_type": "client_credentials"}) headers = { "Content-type": "application/x-www-form-urlencoded", "Authorization": "Basic " + b64encode(str.encode(self.client + ":" + self.secret)).decode() } conn.request("POST", "/rap-ia2/auth/oauth2/token", params, headers) response = conn.getresponse() response_body = response.read() conn.close() tokenInfo = json.loads(response_body.decode()) accessToken = tokenInfo["access_token"] # Obtain user info from RAP service email = username + "@inaf.it" headers = { "Authorization": "Bearer " + accessToken } conn.request("GET", "/rap-ia2/ws/user?search=" + urllib.parse.quote(email), None, headers) response = conn.getresponse() responseBody = response.read() conn.close() userInfo = json.loads(responseBody.decode()) if len(userInfo) == 0: return None if len(userInfo) > 1: raise MultipleUsersException for idn in userInfo[0]["identities"]: if idn["institution"] and "INAF" in idn["institution"]: eppn = idn["eppn"] if eppn != email: return None userInfo[0]["email"] = email return userInfo[0] #rapClient = RapClient() #userInfo = rapClient.getUserInfo("") #print(userInfo["id"]) #print(userInfo) #print(userInfo is None)