Commit 64a45e85 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

DbConnector: switched to threaded connection pool.

parent 65d1d375
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -17,9 +17,9 @@ class AbortJobAMQPServer(AMQPServer):
        super(AbortJobAMQPServer, self).__init__(host, port, queue)      

    def execute_callback(self, requestBody):
        self.dbConn.connect()
        #self.dbConn.connect()
        # do something...
        self.dbConn.disconnect()
        #self.dbConn.disconnect()
        return 42
      
    def run(self):
+624 −416

File changed.

Preview size limit exceeded, changes collapsed.

+2 −2
Original line number Diff line number Diff line
@@ -20,9 +20,9 @@ class GetJobAMQPServer(AMQPServer):

    def execute_callback(self, requestBody):
        if "jobId" in requestBody:
            self.dbConn.connect()
            #self.dbConn.connect()
            dbResponse = self.dbConn.getJob(requestBody["jobId"])
            self.dbConn.disconnect()
            #self.dbConn.disconnect()
            print(f"Db response: {dbResponse}")
            return dbResponse
        else:
+8 −8
Original line number Diff line number Diff line
@@ -39,7 +39,7 @@ class ImportAMQPServer(AMQPServer):
        elif requestBody["requestType"] == "NODE_IMPORT":
            self.path = os.path.abspath(requestBody["path"])
            self.username = requestBody["userName"]
            self.dbConn.connect()
            #self.dbConn.connect()
            userInDb = self.dbConn.userExists(self.username)
            userInfo = self.systemUtils.userInfo(self.username)
            out = open("import_amqp_server_log.txt", "a")
@@ -48,7 +48,7 @@ class ImportAMQPServer(AMQPServer):
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": "The user does not exist or is not registered in the database." }
                self.dbConn.disconnect()
                #self.dbConn.disconnect()
                return response
            
            userId = self.dbConn.getRapId(self.username)
@@ -61,32 +61,32 @@ class ImportAMQPServer(AMQPServer):
                response = { "responseType": "ERROR",
                             "errorCode": 3,
                             "errorMsg": "Invalid storage mount point." }
                self.dbConn.disconnect()
                #self.dbConn.disconnect()
                return response
            
            if not os.path.exists(self.path):
                response = { "responseType": "ERROR",
                             "errorCode": 4,
                             "errorMsg": "Path not found." }
                self.dbConn.disconnect()
                #self.dbConn.disconnect()
                return response
            elif not os.path.isdir(self.path):
                response = { "responseType": "ERROR",
                             "errorCode": 5,
                             "errorMsg": "Directory path expected." }
                self.dbConn.disconnect()
                #self.dbConn.disconnect()
                return response
            elif self.username not in self.path:
                response = { "responseType": "ERROR",
                             "errorCode": 6,
                             "errorMsg": "Directory path does not contain the username." }
                self.dbConn.disconnect()
                #self.dbConn.disconnect()
                return response
            elif os.path.dirname(self.path) != pathPrefix + '/' + self.username:
                response = { "responseType": "ERROR",
                             "errorCode": 7,
                             "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + self.username  }
                self.dbConn.disconnect()
                #self.dbConn.disconnect()
                return response            
            else:
                if storageType == "cold":
@@ -163,7 +163,7 @@ class ImportAMQPServer(AMQPServer):
                                self.dbConn.setAsyncTrans(vospacePath, True)
                                self.dbConn.setSticky(vospacePath, True)

            self.dbConn.disconnect()
            #self.dbConn.disconnect()
            # add a counter to track the number of nodes (files and dirs) + log file
            response = { "responseType": "IMPORT_DONE" }
        else:
+8 −8
Original line number Diff line number Diff line
@@ -27,19 +27,19 @@ class JobAMQPServer(AMQPServer):
                         "errorCode": 1, 
                         "errorMsg": "Malformed request, missing parameters." }
        elif requestBody["requestType"] == "JOB_LIST":
            self.dbConn.connect()
            #self.dbConn.connect()
            result = self.dbConn.listActiveJobs()
            self.dbConn.disconnect()
            #self.dbConn.disconnect()
            response = { "responseType": "LST_DONE", "jobList": result }
        elif requestBody["requestType"] == "JOB_BY_PHASE":
            self.jobPhase = requestBody["jobPhase"]
            self.dbConn.connect()
            #self.dbConn.connect()
            result = self.dbConn.listJobsByPhase(self.jobPhase)
            self.dbConn.disconnect()
            #self.dbConn.disconnect()
            response = { "responseType": "LST_BY_PHASE_DONE", "jobList": result }
        elif requestBody["requestType"] == "JOB_INFO":
            self.jobId = requestBody["jobId"]
            self.dbConn.connect()
            #self.dbConn.connect()
            if self.dbConn.jobExists(self.jobId):
                result = self.dbConn.getJobInfo(self.jobId)
                response = { "responseType": "LST_INFO_DONE", "jobInfo": result }
@@ -47,10 +47,10 @@ class JobAMQPServer(AMQPServer):
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": "Wrong job ID." }
            self.dbConn.disconnect()
            #self.dbConn.disconnect()
        elif requestBody["requestType"] == "JOB_RESULTS":
            self.jobId = requestBody["jobId"]
            self.dbConn.connect()
            #self.dbConn.connect()
            if self.dbConn.jobExists(self.jobId):
                result = self.dbConn.getJobResults(self.jobId)
                response = { "responseType": "LST_RESULTS_DONE", "jobResults": result }
@@ -58,7 +58,7 @@ class JobAMQPServer(AMQPServer):
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorMsg": "Wrong job ID." }
            self.dbConn.disconnect()            
            #self.dbConn.disconnect()            
        else:
            response = { "responseType": "ERROR",
                         "errorCode": 3,
Loading