Commit b0c19c23 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added code to handle 'pending' and 'terminated' queues limits.

parent 2c69073c
Loading
Loading
Loading
Loading
+15 −7
Original line number Diff line number Diff line
@@ -34,6 +34,8 @@ class DataAMQPServer(AMQPServer):
                                  16)
        self.params = config.loadSection("transfer_node")
        self.storageStorePath = self.params["store_path"]
        self.params = config.loadSection("scheduling")
        self.maxPendingJobs = self.params.getint("max_pending_jobs")
        self.pendingQueueWrite = JobQueue("write_pending")
        self.systemUtils = SystemUtils()
        super(DataAMQPServer, self).__init__(host, port, queue)
@@ -41,7 +43,13 @@ class DataAMQPServer(AMQPServer):
    def execute_callback(self, requestBody):
        # 'requestType' and 'userName' attributes are mandatory
        if "requestType" not in requestBody or "userName" not in requestBody:
            response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." }
            response = { "responseType":"ERROR", 
                         "errorCode": 1, 
                         "errorMsg": "Malformed request, missing parameters." }
        elif self.pendingQueueWrite.len() >= self.maxPendingJobs:
            response = { "responseType": "ERROR",
                         "errorCode": 2, 
                         "errorMsg": "Pending queue is full, please, retry later." }
        elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE":
            user = requestBody["userName"]            
            userInDb = self.dbConn.userExists(user)
@@ -55,7 +63,7 @@ class DataAMQPServer(AMQPServer):
            # Check if the user exists on the transfer node and is registered in the database
            if not userInfo or not userInDb:
                response = { "responseType": "ERROR",
                             "errorCode": 2,
                             "errorCode": 3,
                             "errorMsg": "The user does not exist on the transfer node or is not registered in the database." }
            else:
                uid = os.stat(folderPath).st_uid
@@ -71,15 +79,15 @@ class DataAMQPServer(AMQPServer):
                                     "storageList": storageList }
                    elif os.access(folderPath, os.W_OK) and not os.listdir(folderPath):
                        response = { "responseType": "ERROR",
                                     "errorCode": 3,
                                     "errorCode": 4,
                                     "errorMsg": "The 'store' directory on the transfer node is empty." }
                    else:
                        response = { "responseType": "ERROR",
                                     "errorCode": 4,
                                     "errorCode": 5,
                                     "errorMsg": "Service busy. Please, retry later." }
                else:
                    response = { "responseType": "ERROR",
                                 "errorCode": 5,
                                 "errorCode": 6,
                                 "errorMsg": "Permission denied." }
        elif requestBody["requestType"] == "STORE_CON":
            username = requestBody["userName"]
@@ -95,14 +103,14 @@ class DataAMQPServer(AMQPServer):
            self.pendingQueueWrite.insertJob(job)
            if "error" in dbResponse:
                response = { "responseType": "ERROR",
                             "errorCode": 6,
                             "errorCode": 7,
                             "errorMsg": "Job creation failed." }
            else:
                response = { "responseType": "STORE_RUN", 
                             "jobId": job.jobId }
        else:
            response = { "responseType": "ERROR",
                         "errorCode": 7,
                         "errorCode": 8,
                         "errorMsg": "Unkown request type." }

        return response
+5 −0
Original line number Diff line number Diff line
@@ -53,6 +53,8 @@ class RetrieveExecutor(TaskExecutor):
                                  1)
        self.params = config.loadSection("transfer")
        self.maxBlockSize = self.systemUtils.convertSizeToBytes(self.params["block_size"])
        self.params = config.loadSection("scheduling")
        self.maxTerminatedJobs = self.params.getint("max_terminated_jobs")
        self.storageType = None
        self.jobObj = None
        self.jobId = None
@@ -279,6 +281,9 @@ class RetrieveExecutor(TaskExecutor):
                    print(f"nodeList = {self.nodeList}")
                else:
                    sys.exit("Failed to retrieve data!")
                if self.destQueue.len() == self.maxTerminatedJobs:
                    self.destQueue.extractJob()
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                    
                print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")                                
+21 −14
Original line number Diff line number Diff line
@@ -22,10 +22,17 @@ class StartJobAMQPServer(AMQPServer):
                                  self.params["db"],
                                  8,
                                  16)
        self.params = config.loadSection("scheduling")
        self.maxPendingJobs = self.params.getint("max_pending_jobs")
        self.pendingQueueRead = JobQueue("read_pending")
        super(StartJobAMQPServer, self).__init__(host, port, queue)      

    def execute_callback(self, requestBody):
        if self.pendingQueueRead.len() >= self.maxPendingJobs:
            response = { "responseType": "ERROR",
                         "errorCode": 1, 
                         "errorMsg": "Pending queue is full, please, retry later." }
        else:
            out = open("start_job_amqp_server_log.txt", "a")
            out.write(json.dumps(requestBody))
            out.close()
@@ -36,10 +43,10 @@ class StartJobAMQPServer(AMQPServer):
            job.setInfo(requestBody["jobInfo"])
            job.setOwnerId(requestBody["ownerId"])
            self.dbConn.insertJob(job)
        dbResponse = self.dbConn.getJob(job.jobId)
        print(f"Db response: {dbResponse}")
            response = self.dbConn.getJob(job.jobId)
            print(f"Db response: {response}")
            self.pendingQueueRead.insertJob(job)
        return dbResponse
        return response
      
    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
+4 −0
Original line number Diff line number Diff line
@@ -31,6 +31,8 @@ class StoreExecutor(TaskExecutor):
                                  self.params["db"],
                                  1,
                                  1)
        self.params = config.loadSection("scheduling")
        self.maxTerminatedJobs = self.params.getint("max_terminated_jobs")
        self.jobObj = None
        self.jobId = None
        self.username = None
@@ -110,6 +112,8 @@ class StoreExecutor(TaskExecutor):
                self.copyData()
                self.cleanup()
                self.update()
                if self.destQueue.len() == self.maxTerminatedJobs:
                    self.destQueue.extractJob()
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")