Commit ca746ed7 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Check if user is present in 'users' table of the VOSpace database and handle job accordingly.

parent 17ced41c
Loading
Loading
Loading
Loading
Loading
+25 −10
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#

import datetime
import json
import logging
import os
@@ -99,17 +100,30 @@ class RetrievePreprocessor(TaskExecutor):
                    jobId = self.jobObj.jobId
                    self.execute()
                    try:
                        jobPhase = self.dbConn.getJobPhase(jobId)
                        # Check if the user name is present into the 'users' table of the VOSpace database
                        if not self.dbConn.getUserName(self.jobObj.ownerId):
                            self.jobObj.setPhase("ERROR")
                            self.jobObj.setEndTime(datetime.datetime.now().isoformat())
                            self.jobObj.setErrorType("transient")
                            self.jobObj.setErrorMessage("The user is registered in the authentication system (RAP), but is not present into the 'users' table of the VOSpace database.")
                            self.dbConn.insertJob(self.jobObj)
                            self.logger.info("Job phase updated to ERROR.")
                            errorFlag = True
                        else:
                            self.logger.info("Job phase updated to QUEUED.")
                            self.update("QUEUED")
                            errorFlag = False
                    except Exception:
                        self.logger.exception(f"Database error: unable to retrieve job phase for job {jobId}.")
                        self.logger.exception(f"Database error: unable to retrieve user name using the job ownerID {self.jobObj.ownerId}.")
                    else:
                        if jobPhase == "ABORTED":
                            self.jobObj.setPhase("ABORTED")
                        try:
                            if errorFlag:
                                self.setDestinationQueueName("read_terminated")
                            self.update("ABORTED")
                                if self.destQueue.len() >= self.maxTerminatedJobs:
                                    self.destQueue.extractJob()
                                self.destQueue.insertJob(self.jobObj)
                                self.srcQueue.extractJob()
                            else:
                            self.update("QUEUED")
                        try:
                                self.destQueue.insertJob(self.jobObj)
                                self.srcQueue.extractJob()
                        except Exception:
@@ -117,4 +131,5 @@ class RetrievePreprocessor(TaskExecutor):
                        else:                        
                            self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
            finally:
                self.setDestinationQueueName("read_ready")
                self.cleanup()