Commit 92254cc5 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Improved full pending queue error handling for pullToVoSpace job type.

parent c407e135
Loading
Loading
Loading
Loading
+15 −6
Original line number Diff line number Diff line
@@ -472,23 +472,30 @@ class DbConnector(object):
                                    start_time,
                                    end_time,
                                    job_info,
                                    results)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                                    results,
                                    error_message,
                                    error_type)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (job_id)
                    DO UPDATE SET
                    (owner_id,
                     job_type, phase,
                     job_type, 
                     phase,
                     start_time,
                     end_time,
                     job_info,
                     results)
                     results,
                     error_message,
                     error_type)
                    = (EXCLUDED.owner_id,
                       EXCLUDED.job_type,
                       EXCLUDED.phase,
                       EXCLUDED.start_time,
                       EXCLUDED.end_time,
                       EXCLUDED.job_info,
                       EXCLUDED.results);
                       EXCLUDED.results,
                       EXCLUDED.error_message,
                       EXCLUDED.error_type);
                    """,
                    (jobObj.jobId,
                     jobObj.ownerId,
@@ -497,7 +504,9 @@ class DbConnector(object):
                     jobObj.startTime,
                     jobObj.endTime,
                     json.dumps(jobObj.jobInfo),
                     json.dumps(jobObj.results),))
                     json.dumps(jobObj.results),
                     jobObj.errorMessage,
                     jobObj.errorType,))
                conn.commit()
            except Exception as e:
                if not conn.closed:
+8 −1
Original line number Diff line number Diff line
@@ -18,7 +18,8 @@ class Job(object):
        self.destruction = None
        self.parameters = None
        self.results = None
        self.errorSummary = None
        self.errorMessage = None
        self.errorType = None
        self.jobInfo = None
    
    """
@@ -45,3 +46,9 @@ class Job(object):

    def setInfo(self, jobInfo):
        self.jobInfo = jobInfo
        
    def setErrorMessage(self, errMsg):
        self.errorMessage = errMsg
        
    def setErrorType(self, errType):
        self.errorType = errType
+24 −13
Original line number Diff line number Diff line
@@ -24,28 +24,39 @@ class StartJobAMQPServer(AMQPServer):
                                  16)
        self.params = config.loadSection("scheduling")
        self.maxPendingJobs = self.params.getint("max_pending_jobs")
        self.maxTerminatedJobs = self.params.getint("max_terminated_jobs")
        self.pendingQueueRead = JobQueue("read_pending")
        self.terminatedQueueRead = JobQueue("read_terminated")
        super(StartJobAMQPServer, self).__init__(host, port, queue)      

    def execute_callback(self, requestBody):
        if self.pendingQueueRead.len() >= self.maxPendingJobs:
            response = { "responseType": "ERROR",
                         "errorCode": 1, 
                         "errorMsg": "Pending queue is full, please, retry later." }
        else:
        # debug block...
        out = open("start_job_amqp_server_log.txt", "a")
        out.write(json.dumps(requestBody))
        out.close()
        
        job = Job()
        job.setId(requestBody["jobId"])
        job.setType(requestBody["jobInfo"]["transfer"]["direction"])        
            job.setPhase(requestBody["phase"])
        job.setInfo(requestBody["jobInfo"])
        job.setOwnerId(requestBody["ownerId"])
        if self.pendingQueueRead.len() >= self.maxPendingJobs:
            job.setPhase("ERROR")
            job.setErrorType("transient")
            job.setErrorMessage("Pending queue is full, please, retry later.")
            self.dbConn.insertJob(job)            
            if self.terminatedQueueRead.len() == self.maxTerminatedJobs:
                    self.terminatedQueueRead.extractJob()
            self.terminatedQueueRead.insertJob(job)            
        else:
            job.setPhase(requestBody["phase"])
            self.dbConn.insertJob(job)
            self.pendingQueueRead.insertJob(job)
        response = self.dbConn.getJob(job.jobId)
        
        # debug block...
        print(f"Db response: {response}")
            self.pendingQueueRead.insertJob(job)
        
        return response
      
    def run(self):