Commit 6278d6f2 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added basic monitoring process (only for jobs for now).

parent ca3e3f7a
Loading
Loading
Loading
Loading
+84 −0
Original line number Diff line number Diff line
#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#

import datetime
import json
import logging

from config import Config
from db_connector import DbConnector
from mailer import Mailer
from redis_log_handler import RedisLogHandler
from task_executor import TaskExecutor


class Monitor(TaskExecutor):

    def __init__(self):
        config = Config("/etc/vos_ts/vos_ts.conf")
        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  1)
        params = config.loadSection("monitoring")
        self.maxJobDuration = json.loads(params["max_job_duration"])
        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        logLevel = "logging." + params["log_level"]
        logFormat = params["log_format"]
        logFormatter = logging.Formatter(logFormat)
        self.logger.setLevel(eval(logLevel))
        redisLogHandler = RedisLogHandler()
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        self.jobTimeoutDays = self.maxJobDuration["days"]
        self.jobTimeoutSeconds = self.maxJobDuration["hours"] * 3600 + self.maxJobDuration["minutes"] * 60 + self.maxJobDuration["seconds"]
        self.executingJobList = []
        self.longJobIds = []
        super(Monitor, self).__init__()
    
    def execute(self):
        try:
            try:
                self.executingJobList = self.dbConn.getJobsByPhase("EXECUTING")
            except Exception:
                self.logger.exception("FATAL: unable to obtain info about executing jobs.")
                return False
            now = datetime.datetime.now().isoformat()
            for job in self.executingJobList:
                delta = now - job["start_time"]
                self.logger.debug(f"delta = {delta}")
                if delta.days >= self.jobTimeoutDays and delta.seconds > self.jobTimeoutSeconds:
                    self.longJobIds.append(job["job_id"])
            if self.longJobIds:
                # Send e-mail notification
                m = Mailer(self.logger)
                m.addRecipient(self.adminEmail)
                msg = f"The following jobs are taking too long to complete, jobIDs = {self.longJobIds}"
                m.setMessage(f"VOSpace job alert notification", msg)
                m.send()
            return True
        except Exception:
            self.logger.exception("Test")
            return False
        
    def cleanup(self):
        self.executingJobList.clear()
        self.longJobIds.clear()
    
    def run(self):
        self.logger.info("Starting monitor...")
        while True:
            self.wait(timeout = 10)
            self.execute()
            self.cleanup()
+5 −0
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@ from config import Config
from cli_handler import CliHandler
from job_scheduler import JobScheduler
from log_listener import LogListener
from monitor import Monitor
from redis_log_handler import RedisLogHandler
from vospace_rest_handler import VOSpaceRestHandler

@@ -66,12 +67,16 @@ class TransferService(object):
        self.jobScheduler.addTaskExecutor("store_executor")
        self.jobScheduler.addTaskExecutor("retrieve_cleaner")

        # Monitor
        self.monitor = Monitor()

        # Log listener
        self.logListener = LogListener()

    def start(self):
        #if "SUDO_UID" in os.environ.keys():
        # Startup
        self.monitor.start()
        self.logListener.start()
        self.jobScheduler.start()
        self.vosRestHandler.start()