Commit 65975bb2 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added job phase check (ABORTED) + minor changes.

parent 4f7bc47f
Loading
Loading
Loading
Loading
+22 −12
Original line number Diff line number Diff line
@@ -53,7 +53,7 @@ class RetrievePreprocessor(TaskExecutor):
                self.nodeList.append(target + '/' + el["value"])
        self.jobObj.jobInfo["nodeList"] = self.nodeList.copy()
        
    def update(self):
    def update(self, status):
        # Send e-mail notification
        m = Mailer(self.logger)
                
@@ -64,20 +64,19 @@ class RetrievePreprocessor(TaskExecutor):
        
        msg = f"""
        Dear user,
        your job has been QUEUED.
        your job has been {status}.

        Job ID: {self.jobObj.jobId}
        Job type: {self.jobObj.type}
        Owner ID: {self.jobObj.ownerId}

        You will be notified by email once the job is completed.

        """
        m.setMessage("VOSpace data retrieve notification: Job QUEUED", msg)
        m.setMessage(f"VOSpace data retrieve notification: Job {status}", msg)
        m.send()

    def cleanup(self):
        self.nodeList.clear()
        self.setDestinationQueueName("read_ready")

    def run(self):
        self.logger.info("Starting retrieve preprocessor...")
@@ -89,12 +88,23 @@ class RetrievePreprocessor(TaskExecutor):
                srcQueueLen = self.srcQueue.len()
                destQueueLen = self.destQueue.len()
            except:
                self.logger.exception("Cache error: failed to retrieve queue length.")
                self.logger.exception("Cache error: unable to retrieve queue length.")
            else:
                if destQueueLen < self.maxReadyJobs and srcQueueLen > 0:
                    self.jobObj = self.srcQueue.getJob()
                    jobId = self.jobObj.jobId
                    self.execute()
                    self.update()
                    try:
                        jobPhase = self.dbConn.getPhase(jobId)
                    except Exception:
                        self.logger.exception(f"Database error: unable to retrieve job phase for job {jobId}.")
                    else:
                        if jobPhase == "ABORTED":
                            self.jobObj.setPhase("ABORTED")
                            self.setDestinationQueueName("read_terminated")
                            self.update("ABORTED")
                        else:
                            self.update("QUEUED")
                        try:
                            self.destQueue.insertJob(self.jobObj)
                            self.srcQueue.extractJob()