Commit 1c8f4dd7 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Fixed time delta calculation + improved check on stalled jobs.

parent 1fc2fb23
Loading
Loading
Loading
Loading
+15 −11
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ class Monitor(TaskExecutor):
                                  1,
                                  1)
        params = config.loadSection("monitoring")
        self.delayTime = params.getint("delay_time")
        self.maxJobDuration = json.loads(params["max_job_duration"])
        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]
@@ -44,7 +45,8 @@ class Monitor(TaskExecutor):
        self.jobTimeoutDays = self.maxJobDuration["days"]
        self.jobTimeoutSeconds = self.maxJobDuration["hours"] * 3600 + self.maxJobDuration["minutes"] * 60 + self.maxJobDuration["seconds"]
        self.executingJobList = []
        self.longJobIds = []
        self.longJobs = []
        self.notifiedJobs = []
        super(Monitor, self).__init__()
    
    def execute(self):
@@ -54,31 +56,33 @@ class Monitor(TaskExecutor):
            except Exception:
                self.logger.exception("FATAL: unable to obtain info about executing jobs.")
                return False
            now = datetime.datetime.now().isoformat()
            now = datetime.datetime.now()
            for job in self.executingJobList:
                delta = now - job["start_time"]
                self.logger.debug(f"delta = {delta}")
                delta = now - datetime.datetime.fromisoformat(job["start_time"])
                if delta.days >= self.jobTimeoutDays and delta.seconds > self.jobTimeoutSeconds:
                    self.longJobIds.append(job["job_id"])
            if self.longJobIds:
                # Send e-mail notification
                    self.longJobs.append(job["job_id"])
            # Notify the user and write the log file only if new jobs get stuck
            if self.longJobs and self.longJobs != self.notifiedJobs:
                m = Mailer(self.logger)
                m.addRecipient(self.adminEmail)
                msg = f"The following jobs are taking too long to complete, jobIDs = {self.longJobIds}"
                msg = f"The following jobs are taking too long to complete, jobIDs = {self.longJobs}"
                self.logger.warning(msg)
                m.setMessage(f"VOSpace job alert notification", msg)
                m.send()
                self.notifiedJobs.clear()
                self.notifiedJobs = self.longJobs.copy()
            return True
        except Exception:
            self.logger.exception("Test")
            self.logger.exception("FATAL: something went wrong during the execution phase.")
            return False
        
    def cleanup(self):
        self.executingJobList.clear()
        self.longJobIds.clear()
        self.longJobs.clear()
    
    def run(self):
        self.logger.info("Starting monitor...")
        while True:
            self.wait(timeout = 10)
            self.wait(delayTime = self.delayTime)
            self.execute()
            self.cleanup()