Commit 89c664d4 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added 'GroupRwExecutor' class.

parent 431580e1
Loading
Loading
Loading
Loading
+91 −0
Original line number Diff line number Diff line
#!/usr/bin/env python

import logging
import os

from config import Config
from datetime import datetime as dt
from db_connector import DbConnector
from mailer import Mailer
from node import Node
from system_utils import SystemUtils
from tabulate import tabulate
from task_executor import TaskExecutor


class GroupRwExecutor(TaskExecutor):

    def __init__(self):
        self.type = "group_rw_executor"
        config = Config("/etc/vos_ts/vos_ts.conf")
        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  1)
        params = config.loadSection("logging")
        self.logger = logging.getLogger("GroupRwExecutor")
        logLevel = "logging." + params["log_level"]
        logDir = params["log_dir"]
        logFile = logDir + '/' + "group_rw_executor.log"
        self.logger.setLevel(eval(logLevel))
        logFileHandler = logging.FileHandler(logFile)
        logStreamHandler = logging.StreamHandler()
        logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        logFileHandler.setFormatter(logFormatter)
        logStreamHandler.setFormatter(logFormatter)
        self.logger.addHandler(logFileHandler)
        self.logger.addHandler(logStreamHandler)
        self.resDir = params["res_dir"]
        self.systemUtils = SystemUtils()
        self.jobObj = None
        self.jobId = None
        self.requestType = None
        self.vospacePath = None
        self.groupName = None
        super(GroupRwExecutor, self).__init__()

    def updateGroupRw(self):
        """This method adds/removes groups to group_read and group_write."""

        self.dbConn.setPhase(self.jobId, "EXECUTING")
        self.dbConn.setStartTime(self.jobId)
        
        if self.requestType == "GRPR_ADD":
            self.dbConn.updateGroupRead(self.groupName, [], self.vospacePath)
        elif self.requestType == "GRPR_DEL":
            self.dbConn.updateGroupRead([], self.groupName, self.vospacePath)
        elif self.requestType == "GRPW_ADD":
            self.dbConn.updateGroupWrite(self.groupName, [], self.vospacePath)
        elif self.requestType == "GRPW_DEL":
            self.dbConn.updateGroupWrite([], self.groupName, self.vospacePath)

    def cleanup(self):
        self.jobObj.setPhase("COMPLETED")
        self.dbConn.setPhase(self.jobId, "COMPLETED")
        self.dbConn.setEndTime(self.jobId)

    def run(self):
        self.logger.info("Starting group_rw executor...")
        self.setSourceQueueName("group_rw_ready")
        self.setDestinationQueueName("group_rw_terminated")
        while True:
            self.wait()
            if self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()
                self.jobId = self.jobObj.jobId
                self.userId = self.jobObj.ownerId
                self.requestType = self.jobObj.jobInfo["requestType"]
                self.vospacePath = self.jobObj.jobInfo["vospacePath"]
                self.groupName = [ self.jobObj.jobInfo["groupName"] ]
                self.updateGroupRw()
                self.cleanup()
                if self.destQueue.len() >= self.maxTerminatedJobs:
                    self.destQueue.extractJob()
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")
+4 −1
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@
import time
import sys

from group_rw_executor import GroupRwExecutor
from import_executor import ImportExecutor
from retrieve_preprocessor import RetrievePreprocessor
from store_preprocessor import StorePreprocessor
@@ -16,8 +17,10 @@ class JobScheduler(object):
        self.taskExecutorList = []
        
    def addTaskExecutor(self, taskExecType):
        if taskExecType == "import_executor":
        if taskExecType == "group_rw_executor":
            self.taskExecutorList.append(ImportExecutor())
        elif taskExecType == "import_executor":
            self.taskExecutorList.append(GroupRwExecutor())
        elif taskExecType == "retrieve_preprocessor":
            self.taskExecutorList.append(RetrievePreprocessor())
        elif taskExecType == "store_preprocessor":
+1 −0
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ class TransferService(object):
        self.cliHandler.addRPCServer("storage", "storage_queue")
        
        #
        self.jobScheduler.addTaskExecutor("group_rw_executor")
        self.jobScheduler.addTaskExecutor("import_executor")
        self.jobScheduler.addTaskExecutor("retrieve_preprocessor")
        self.jobScheduler.addTaskExecutor("store_preprocessor")