Commit bb76aed0 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added basic logging and exception handling features.

parent 548e8404
Loading
Loading
Loading
Loading
+68 −25
Original line number Diff line number Diff line
@@ -41,11 +41,13 @@ class GroupRwRPCServer(RedisRPCServer):
        super(GroupRwRPCServer, self).__init__(host, port, db, rpcQueue)

    def callback(self, requestBody):
        # 'requestType' and 'path' attributes are mandatory
        # 'requestType' and 'vospacePath' attributes are mandatory
        if "requestType" not in requestBody or "vospacePath" not in requestBody:
            errorMsg = "Malformed request, missing parameters."
            self.logger.error(errorMsg)
            response = { "responseType": "ERROR",
                         "errorCode": 1,
                         "errorMsg": "Malformed request, missing parameters." }
                         "errorMsg": errorMsg }
        elif (requestBody["requestType"] == "GRPR_ADD" or 
              requestBody["requestType"] == "GRPR_DEL" or
              requestBody["requestType"] == "GRPW_ADD" or 
@@ -54,21 +56,43 @@ class GroupRwRPCServer(RedisRPCServer):
            jobType = requestType.split('_')[0]
            vospacePath = requestBody["vospacePath"]
            groupname = requestBody["groupName"]            

            if not self.dbConn.nodeExists(vospacePath):
            try:
                nodeExists = self.dbConn.nodeExists(vospacePath)
            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": "VOSpace path not found." }
            elif self.groupRwReadyQueue.len() >= self.maxReadyJobs:
                             "errorMsg": errorMsg }
                return response
            try:
                queueLen = self.groupRwReadyQueue.len()
            except Exception:
                errorMsg = "Cache error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 3,
                             "errorMsg": "'group_rw_ready' queue is full, please, retry later." }
                             "errorMsg": errorMsg }
                return response
            if not nodeExists:
                errorMsg = "VOSpace path not found."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 4,
                             "errorMsg": errorMsg }
            elif queueLen >= self.maxReadyJobs:
                errorMsg = "'group_rw_ready' queue is full, please, retry later."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 5,
                             "errorMsg": errorMsg }
            else:
                jobObj = Job()
                jobObj.setType(f"vos_{jobType.lower()}")
                jobInfo = requestBody.copy()
                jobObj.setInfo(jobInfo)
                jobObj.setPhase("QUEUED")
                #try:
                creatorId = self.dbConn.getCreatorId(vospacePath)
                jobObj.setOwnerId(creatorId)
                self.dbConn.insertJob(jobObj)
@@ -76,33 +100,52 @@ class GroupRwRPCServer(RedisRPCServer):
                response = { "responseType": f"{jobType}_STARTED" }
        elif requestBody["requestType"] == "GRPR_LST":
            vospacePath = requestBody["vospacePath"]     
            try:
                if not self.dbConn.nodeExists(vospacePath):
                    errorMsg = "VOSpace path not found."
                    self.logger.error(errorMsg)
                    response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": "VOSpace path not found." }
                                 "errorCode": 4,
                                 "errorMsg": errorMsg }
                else:
                    result = self.dbConn.getGroupRead(vospacePath)

                    response = { "responseType": "GRPR_LST_DONE",
                                 "groupList": result }
            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": errorMsg }
                return response
        elif requestBody["requestType"] == "GRPW_LST":
            vospacePath = requestBody["vospacePath"]
            try:
                if not self.dbConn.nodeExists(vospacePath):
                    errorMsg = "VOSpace path not found."
                    self.logger.error(errorMsg)
                    response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": "VOSpace path not found." }
                                 "errorCode": 4,
                                 "errorMsg": errorMsg }
                else:
                    result = self.dbConn.getGroupWrite(vospacePath)
                    response = { "responseType": "GRPW_LST_DONE",
                                 "groupList": result }
            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": errorMsg }
                return response
        else:
            errorMsg = "Unkown request type."
            self.logger.error(errorMsg)
            response = { "responseType": "ERROR",
                         "errorCode": 4,
                         "errorMsg": "Unkown request type." }

                         "errorCode": 5,
                         "errorMsg": errorMsg }
        return response

    def run(self):
        self.logger.info(f"Starting RPC server of type {self.type}...")
        super(GroupRwRPCServer, self).run()
 
+103 −42
Original line number Diff line number Diff line
@@ -47,78 +47,125 @@ class ImportRPCServer(RedisRPCServer):
    def callback(self, requestBody):
        # 'requestType' and 'path' attributes are mandatory
        if "requestType" not in requestBody or "path" not in requestBody:
            errorMsg = "Malformed request, missing parameters."
            self.logger.error(errorMsg)
            response = { "responseType": "ERROR",
                         "errorCode": 1,
                         "errorMsg": "Malformed request, missing parameters." }
                         "errorMsg": errorMsg }
        elif requestBody["requestType"] == "NODE_IMPORT":
            path = os.path.abspath(requestBody["path"])
            username = requestBody["userName"]
            try:
                userInDb = self.dbConn.userExists(username)
            rapCli = RapClient()
            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": errorMsg }
                return response
            try:
                rapCli = RapClient()
                rapInfo = rapCli.getUserInfo(username)
            except MultipleUsersException:
                errorMsg = "Multiple users with the same email address in RAP."
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": "Multiple users with the same email address in RAP." }
                             "errorCode": 3,
                             "errorMsg": errorMsg }
                return response
            except Exception as e:
            except HTTPException:
                errorMsg = "HTTP exception."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 3,
                             "errorMsg": f"{e}" }
                             "errorCode": 4,
                             "errorMsg": errorMsg }
                return response
            
            userInfo = self.systemUtils.userInfo(username)
            #out = open("import_amqp_server_log.txt", "a")

            if not userInfo:
                errorMsg = "The user does not exist on the transfer node."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 4,
                             "errorMsg": "The user does not exist on the transfer node." }
                             "errorCode": 5,
                             "errorMsg": errorMsg }
                return response
            elif not (userInDb or rapInfo):
                # the user cannot be found neither in RAP nor in db
                errorMsg = "The user is not registered neither in RAP nor in the database."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 5,
                             "errorMsg": "The user is not registered neither in RAP nor in the database." }
                             "errorCode": 6,
                             "errorMsg": errorMsg }
                return response
            else:
                if not userInDb and rapInfo:
                    # retrieve data from RAP and insert them in db
                    try:
                        self.dbConn.insertUser(rapInfo["id"], username, rapInfo["email"])

                    except Exception:
                        errorMsg = "Database error."
                        self.logger.exception(errorMsg)
                        response = { "responseType": "ERROR",
                                     "errorCode": 2,
                                     "errorMsg": errorMsg }
                        return response
            try:        
                userId = self.dbConn.getUserId(username)
                pathPrefix = self.dbConn.storageBasePathIsValid(path)

            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": errorMsg }
                return response
            if pathPrefix:
                storageId = self.dbConn.getStorageId(pathPrefix)
                storageType = self.dbConn.getStorageType(pathPrefix)
            else:
                errorMsg = "Invalid storage mount point."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 6,
                             "errorMsg": "Invalid storage mount point." }
                             "errorCode": 7,
                             "errorMsg": errorMsg }
                return response
            try:
                queueLen = self.importReadyQueue.len()
            except Exception:
                errorMsg = "Cache error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 8,
                             "errorMsg": errorMsg }
                return response

            if not os.path.exists(path):
                errorMsg = "Path not found."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 7,
                             "errorMsg": "Path not found." }
                             "errorCode": 9,
                             "errorMsg": errorMsg }
            elif not os.path.isdir(path):
                errorMsg = "Directory path expected."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 8,
                             "errorMsg": "Directory path expected." }
                             "errorCode": 10,
                             "errorMsg": errorMsg }
            elif username not in path:
                errorMsg = "Directory path does not contain the username."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 9,
                             "errorMsg": "Directory path does not contain the username." }
                             "errorCode": 11,
                             "errorMsg": errorMsg }
            elif os.path.dirname(path) != pathPrefix + '/' + username:
                errorMsg = "Invalid path, directory must be located in " + pathPrefix + '/' + username
                response = { "responseType": "ERROR",
                             "errorCode": 10,
                             "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username  }
            elif self.importReadyQueue.len() >= self.maxReadyJobs:
                             "errorCode": 12,
                             "errorMsg": errorMsg }                
            elif queueLen >= self.maxReadyJobs:
                errorMsg = "Import queue is full, please, retry later."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 11,
                             "errorMsg": "Import queue is full, please, retry later." }
                             "errorCode": 13,
                             "errorMsg": errorMsg }
            else:
                jobObj = Job()
                jobObj.setType("vos_import")
@@ -129,17 +176,31 @@ class ImportRPCServer(RedisRPCServer):
                jobObj.setInfo(jobInfo)
                jobObj.setPhase("QUEUED")
                jobObj.setOwnerId(userId)
                try:
                    self.dbConn.insertJob(jobObj)
                except Exception:
                    errorMsg = "Database error."
                    self.logger.exception(errorMsg)
                    response = { "responseType": "ERROR",
                                 "errorCode": 2,
                                 "errorMsg": errorMsg }
                    return response
                try:
                    self.importReadyQueue.insertJob(jobObj)
                #out.write(requestBody.copy())
                #out.close()

                except Exception:
                    errorMsg = "Cache error."
                    self.logger.exception(errorMsg)
                    response = { "responseType": "ERROR",
                                 "errorCode": 8,
                                 "errorMsg": errorMsg }
                    return response
                response = { "responseType": "IMPORT_STARTED" }
        else:
            errorMsg = "Unkown request type."
            self.logger.error(errorMsg)
            response = { "responseType": "ERROR",
                         "errorCode": 9,
                         "errorMsg": "Unkown request type." }

                         "errorCode": 14,
                         "errorMsg": errorMsg }
        return response

    def run(self):
+63 −17
Original line number Diff line number Diff line
@@ -39,38 +39,84 @@ class JobRPCServer(RedisRPCServer):
    def callback(self, requestBody):
        # 'requestType' attribute is mandatory
        if "requestType" not in requestBody:
            errorMsg = "Malformed request, missing parameters."
            self.logger.error(errorMsg)
            response = { "responseType": "ERROR",
                         "errorCode": 1,
                         "errorMsg": "Malformed request, missing parameters." }
                         "errorMsg": errorMsg }
        elif requestBody["requestType"] == "JOB_LIST":
            try:
                result = self.dbConn.listActiveJobs()
            response = { "responseType": "LST_DONE", "jobList": result }
            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": errorMsg }
                return response
            else:
                response = { "responseType": "LST_DONE", 
                             "jobList": result }
        elif requestBody["requestType"] == "JOB_BY_PHASE":
            jobPhase = requestBody["jobPhase"]
            try:
                result = self.dbConn.listJobsByPhase(jobPhase)
            response = { "responseType": "LST_BY_PHASE_DONE", "jobList": result }
            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": errorMsg }
                return response
            else:
                response = { "responseType": "LST_BY_PHASE_DONE", 
                             "jobList": result }
        elif requestBody["requestType"] == "JOB_INFO":
            jobId = requestBody["jobId"]
            try:
                if self.dbConn.jobExists(jobId):
                    result = self.dbConn.getJobInfo(jobId)
                response = { "responseType": "LST_INFO_DONE", "jobInfo": result }
                    response = { "responseType": "LST_INFO_DONE", 
                                 "jobInfo": result }
                else:
                    errorMsg = "Wrong job ID."
                    self.logger.error(errorMsg)
                    response = { "responseType": "ERROR",
                                 "errorCode": 3,
                                 "errorMsg": errorMsg }
            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": "Wrong job ID." }
                             "errorMsg": errorMsg }
                return response                
        elif requestBody["requestType"] == "JOB_RESULTS":
            jobId = requestBody["jobId"]
            try:
                if self.dbConn.jobExists(jobId):
                    result = self.dbConn.getJobResults(jobId)
                response = { "responseType": "LST_RESULTS_DONE", "jobResults": result }
                    response = { "responseType": "LST_RESULTS_DONE", 
                                 "jobResults": result }
                else:
                    errorMsg = "Wrong job ID."
                    self.logger.error(errorMsg)
                    response = { "responseType": "ERROR",
                                 "errorCode": 3,
                                 "errorMsg": errorMsg }
            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": "Wrong job ID." }
                             "errorMsg": errorMsg }
                return response
        else:
            errorMsg = "Unkown request type."
            self.logger.error(errorMsg)
            response = { "responseType": "ERROR",
                         "errorCode": 3,
                         "errorMsg": "Unkown request type." }
                         "errorCode": 4,
                         "errorMsg": errrorMsg }
        return response

    def run(self):
+69 −35
Original line number Diff line number Diff line
@@ -39,63 +39,97 @@ class StorageRPCServer(RedisRPCServer):
    def callback(self, requestBody):
        # 'requestType' attribute is mandatory
        if "requestType" not in requestBody:
            errorMsg = "Malformed request, missing parameters."
            response = { "responseType": "ERROR",
                         "errorCode": 1,
                         "errorMsg": "Malformed request, missing parameters." }

                         "errorMsg": errorMsg }
        elif requestBody["requestType"] == "STORAGE_ADD":
            storageType = requestBody["storageType"]
            storageBasePath = requestBody["basePath"]
            storageBaseUrl = requestBody["baseUrl"]
            storageHostname = requestBody["hostname"]

            if storageType != "portal":
                if not os.path.exists(storageBasePath):
                    errorMsg = "Base path doesn't exist."
                    self.logger.error(errorMsg)
                    response = { "responseType": "ERROR",
                                 "errorCode": 2,
                                 "errorMsg": "Base path doesn't exist."}
                                 "errorMsg": errorMsg }
                    return response

            try:
                result = self.dbConn.insertStorage(storageType,
                                                   storageBasePath,
                                                   storageBaseUrl,
                                                   storageHostname)

            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 3,
                             "errorMsg": errorMsg }
                return response
            else:
                if result:
                    response = { "responseType": "STORAGE_ADD_DONE" }
                else:
                    errorMsg = "Storage point already exists."
                    self.logger.error(errorMsg)
                    response = { "responseType": "ERROR",
                             "errorCode": 3,
                             "errorMsg": "Storage point already exists." }

                                 "errorCode": 4,
                                 "errorMsg": errorMsg }
        elif requestBody["requestType"] == "STORAGE_DEL_REQ":
            try:
                result = self.dbConn.getStorageList()

            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 3,
                             "errorMsg": errorMsg }
                return response
            else:
                response = { "responseType": "STORAGE_DEL_ACK",
                             "storageList": result }

        elif requestBody["requestType"] == "STORAGE_DEL_CON":
            storageId = requestBody["storageId"]
            try:
                result = self.dbConn.deleteStorage(storageId)

            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 3,
                             "errorMsg": errorMsg }
                return response
            else:
                if result:
                    response = { "responseType": "STORAGE_DEL_DONE" }
                else:
                    errorMsg = "This storage location contains some VOSpace nodes. Please, move those nodes to another location."
                    self.logger.error(errorMsg)
                    response = { "responseType": "ERROR",
                             "errorCode": 4,
                             "errorMsg": "This storage location contains some VOSpace nodes. Please, move those nodes to another location." }

                                 "errorCode": 5,
                                 "errorMsg": errorMsg }
        elif requestBody["requestType"] == "STORAGE_LST":
            try:
                result = self.dbConn.getStorageList()

            except Exception:
                errorMsg = "Database error."
                self.logger.exception(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 3,
                             "errorMsg": errorMsg }
                return response
            else:
                response = { "responseType": "STORAGE_LST_DONE",
                             "storageList": result }

        else:
            errorMsg = "Unkown request type."
            self.logger.error(errorMsg)
            response = { "responseType": "ERROR",
                         "errorCode": 5,
                         "errorMsg": "Unkown request type." }

                         "errorCode": 6,
                         "errorMsg": errorMsg }
        return response

    def run(self):