Commit 1627ba2b authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added 'RetrieveCleaner' class.

parent d91e0599
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -9,6 +9,7 @@ from retrieve_preprocessor import RetrievePreprocessor
from store_preprocessor import StorePreprocessor
from retrieve_executor import RetrieveExecutor
from store_executor import StoreExecutor
from retrieve_cleaner import RetrieveCleaner


class JobScheduler(object):
@@ -29,6 +30,8 @@ class JobScheduler(object):
            self.taskExecutorList.append(RetrieveExecutor())
        elif taskExecType == "store_executor":
            self.taskExecutorList.append(StoreExecutor())
        elif taskExecType == "retrieve_cleaner":
            self.taskExecutorList.append(RetrieveCleaner())
        else:
            sys.exit(f"FATAL: unknown server type {taskExecType}.")

+99 −0
Original line number Diff line number Diff line
#!/usr/bin/env python

import datetime
import json
import logging
import os
import shutil

from config import Config
from db_connector import DbConnector
from mailer import Mailer
from redis_log_handler import RedisLogHandler
from task_executor import TaskExecutor


class RetrieveCleaner(TaskExecutor):

    def __init__(self):
        self.type = "retrieve_cleaner"
        config = Config("/etc/vos_ts/vos_ts.conf")
        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  1)
        params = config.loadSection("async_recall")
        self.days = params.getint("days")
        self.seconds = params.getint("hours") * 3600 + params.getint("minutes") * 60 + params.getint("seconds")
        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        logLevel = "logging." + params["log_level"]
        logFormat = params["log_format"]
        logFormatter = logging.Formatter(logFormat)
        self.logger.setLevel(eval(logLevel))
        redisLogHandler = RedisLogHandler()
        logStreamHandler = logging.StreamHandler()
        logStreamHandler.setFormatter(logFormatter)
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        self.logger.addHandler(logStreamHandler)
        self.jobObj = None
        self.nodeList = []
        super(RetrieveCleaner, self).__init__()
        
    def execute(self):        
        # Avoid "all zero" condition
        if self.days <= 0 and self.seconds < 30:
            self.days = 0
            self.seconds = 30
        elif self.seconds >= 86400:
            self.days += self.seconds // 86400
            self.seconds = self.seconds % 86400
        
        jobEndTime = datetime.datetime.fromisoformat(self.jobObj.endTime)
        currentTime = datetime.datetime.now()
        delta = currentTime - jobEndTime
        if delta.days >= self.days and delta.seconds > self.seconds:
            # while dim lists > 0:
            #   loop over the two lists (nodeList and destPathList):
            #     if the vospace node is not busy: 
            #       set 'async_trans' = True
            #       delete the file/dir in the 'retrieve' directory
            #       remove the corresponding elements on the two lists
            numNodes = len(self.nodeList)
            while numNodes > 0:                    
                i = 0
                while i < numNodes:
                    vospacePath = self.nodeList[i]
                    destPath = self.destPathList[i]
                    if not self.dbConn.nodeIsBusy(vospacePath):
                        self.dbConn.setAsyncTrans(vospacePath, True)
                        if os.path.isfile(destPath):
                            os.remove(destPath)
                        else:
                            shutil.rmtree(destPath)
                        self.nodeList.pop(i)
                        self.destPathList.pop(i)
                        numNodes -= 1
                        i = 0
            self.destQueue.insertJob(self.jobObj)
            self.srcQueue.extractJob()
            self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")
        
    def run(self):
        self.logger.info("Starting retrieve cleaner...")
        self.setSourceQueueName("read_terminated")
        self.setDestinationQueueName("read_clean")
        while True:
            self.wait()
            if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()
                self.nodeList = self.jobObj.jobInfo["nodeList"].copy()
                self.destPathList = self.jobObj.jobInfo["destPathList"].copy()
                self.execute()
+1 −0
Original line number Diff line number Diff line
@@ -43,6 +43,7 @@ class TransferService(object):
        self.jobScheduler.addTaskExecutor("store_preprocessor")
        self.jobScheduler.addTaskExecutor("retrieve_executor")
        self.jobScheduler.addTaskExecutor("store_executor")
        self.jobScheduler.addTaskExecutor("retrieve_cleaner")

        # Log listener
        self.logListener = LogListener()