Loading transfer_service/group_rw_rpc_server.py 0 → 100644 +100 −0 Original line number Diff line number Diff line #!/usr/bin/env python import logging from config import Config from db_connector import DbConnector from job_queue import JobQueue from job import Job from redis_rpc_server import RedisRPCServer from system_utils import SystemUtils class GroupRwRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "group_rw" config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("file_catalog") self.dbConn = DbConnector(params["user"], params["password"], params["host"], params.getint("port"), params["db"], 1, 2) params = config.loadSection("scheduling") self.maxReadyJobs = params.getint("max_ready_jobs") params = config.loadSection("logging") self.logger = logging.getLogger("GroupRwRPCServer") logLevel = "logging." + params["log_level"] logDir = params["log_dir"] logFile = logDir + '/' + "group_rw_rpc_server.log" self.logger.setLevel(eval(logLevel)) logFileHandler = logging.FileHandler(logFile) logStreamHandler = logging.StreamHandler() logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) self.logger.addHandler(logFileHandler) self.logger.addHandler(logStreamHandler) self.groupRwReadyQueue = JobQueue("group_rw_ready") self.systemUtils = SystemUtils() super(GroupRwRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): # 'requestType' and 'path' attributes are mandatory if "requestType" not in requestBody or "vospacePath" not in requestBody: response = { "responseType": "ERROR", "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif (requestBody["requestType"] == "GRPR_ADD" or requestBody["requestType"] == "GRPR_DEL" or requestBody["requestType"] == "GRPW_ADD" or requestBody["requestType"] == "GRPW_DEL"): requestType = requestBody["requestType"] jobType = requestType.split('_')[0] vospacePath = requestBody["vospacePath"] groupname = requestBody["groupName"] #groupInDb = self.dbConn.userExists(groupname) #groupInfo = self.systemUtils.userInfo(groupname) #if not groupInfo or not groupInDb: # response = { "responseType": "ERROR", # "errorCode": 2, # "errorMsg": "The group does not exist or is not registered in the database." } # return response #groupId = self.dbConn.getUserId(groupname) # TODO: Check if GMS group exists if not self.dbConn.nodeExists(vospacePath): response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "VOSpace path not found." } elif self.groupRwReadyQueue.len() >= self.maxReadyJobs: response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "'group_rw_ready' queue is full, please, retry later." } else: jobObj = Job() jobObj.setType(f"vos_{jobType.lower()}") jobInfo = requestBody.copy() jobObj.setInfo(jobInfo) jobObj.setPhase("QUEUED") jobObj.setOwnerId("3354") self.dbConn.insertJob(jobObj) self.groupRwReadyQueue.insertJob(jobObj) response = { "responseType": f"{jobType}_STARTED" } else: response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Unkown request type." } return response def run(self): self.logger.info(f"Starting RPC server of type {self.type}...") super(GroupRwRPCServer, self).run() Loading
transfer_service/group_rw_rpc_server.py 0 → 100644 +100 −0 Original line number Diff line number Diff line #!/usr/bin/env python import logging from config import Config from db_connector import DbConnector from job_queue import JobQueue from job import Job from redis_rpc_server import RedisRPCServer from system_utils import SystemUtils class GroupRwRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "group_rw" config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("file_catalog") self.dbConn = DbConnector(params["user"], params["password"], params["host"], params.getint("port"), params["db"], 1, 2) params = config.loadSection("scheduling") self.maxReadyJobs = params.getint("max_ready_jobs") params = config.loadSection("logging") self.logger = logging.getLogger("GroupRwRPCServer") logLevel = "logging." + params["log_level"] logDir = params["log_dir"] logFile = logDir + '/' + "group_rw_rpc_server.log" self.logger.setLevel(eval(logLevel)) logFileHandler = logging.FileHandler(logFile) logStreamHandler = logging.StreamHandler() logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) self.logger.addHandler(logFileHandler) self.logger.addHandler(logStreamHandler) self.groupRwReadyQueue = JobQueue("group_rw_ready") self.systemUtils = SystemUtils() super(GroupRwRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): # 'requestType' and 'path' attributes are mandatory if "requestType" not in requestBody or "vospacePath" not in requestBody: response = { "responseType": "ERROR", "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif (requestBody["requestType"] == "GRPR_ADD" or requestBody["requestType"] == "GRPR_DEL" or requestBody["requestType"] == "GRPW_ADD" or requestBody["requestType"] == "GRPW_DEL"): requestType = requestBody["requestType"] jobType = requestType.split('_')[0] vospacePath = requestBody["vospacePath"] groupname = requestBody["groupName"] #groupInDb = self.dbConn.userExists(groupname) #groupInfo = self.systemUtils.userInfo(groupname) #if not groupInfo or not groupInDb: # response = { "responseType": "ERROR", # "errorCode": 2, # "errorMsg": "The group does not exist or is not registered in the database." } # return response #groupId = self.dbConn.getUserId(groupname) # TODO: Check if GMS group exists if not self.dbConn.nodeExists(vospacePath): response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "VOSpace path not found." } elif self.groupRwReadyQueue.len() >= self.maxReadyJobs: response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "'group_rw_ready' queue is full, please, retry later." } else: jobObj = Job() jobObj.setType(f"vos_{jobType.lower()}") jobInfo = requestBody.copy() jobObj.setInfo(jobInfo) jobObj.setPhase("QUEUED") jobObj.setOwnerId("3354") self.dbConn.insertJob(jobObj) self.groupRwReadyQueue.insertJob(jobObj) response = { "responseType": f"{jobType}_STARTED" } else: response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Unkown request type." } return response def run(self): self.logger.info(f"Starting RPC server of type {self.type}...") super(GroupRwRPCServer, self).run()