Commit 6d5c7eb3 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added DOI dummy classes + basic test RPC client.

parent c60d6ee5
Loading
Loading
Loading
Loading
+58 −0
Original line number Diff line number Diff line
import sys

from redis_rpc_client import RedisRPCClient
from config import Config
from tabulate import tabulate


class TestDoiRPCClient(RedisRPCClient):
    """Minimal RPC client used to exercise the DOI RPC server by hand.

    Connection parameters (host/port/db) come from the "server" section of
    /etc/vos_cli/vos_cli.conf; the RPC queue name comes from the "test_doi"
    section.
    """

    def __init__(self):
        config = Config("/etc/vos_cli/vos_cli.conf")
        params = config.loadSection("server")
        self.host = params["host"]
        self.port = params.getint("port")
        self.db = params.getint("db")
        params = config.loadSection("test_doi")
        self.rpcQueue = params["rpc_queue"]
        super(TestDoiRPCClient, self).__init__(self.host, self.port, self.db, self.rpcQueue)

    def test_request(self):
        """Send a canned UWS-style 'pullToVoSpace' job request and return the
        server's response.

        Returns:
            Whatever `RedisRPCClient.call` yields for this request
            (presumably a response dict — confirm against the server side).
        """
        # Hard-coded sample payload mimicking a DOI async-recall job.
        request = {
          "req_id": "5e6ff54d096246438d483c90c81512f9",
          "job": {
            "jobId": "5e6ff54d096246438d483c90c81512f9",
            "runId": None,
            "ownerId": "3354",
            "phase": "QUEUED",
            "quote": None,
            "creationTime": None,
            "startTime": None,
            "endTime": None,
            "executionDuration": 0,
            "destruction": None,
            "parameters": None,
            "results": [],
            "errorSummary": None,
            "jobInfo": {
              "transfer": {
                "target": "vos://example.com!vospace/cristiano.urban/mydir",
                "direction": "pullToVoSpace",
                "view": {
                  "param": [],
                  "uri": "ivo://ia2.inaf.it/vospace/views#async-recall",
                  "original": True
                },
                "protocols": [],
                "keepBytes": False,
                "version": None,
                "param": []
              }
            },
            "version": None
          }
        }
        # BUG FIX: the original discarded the response, so the caller always
        # received None. Return it so the result can be inspected.
        response = self.call(request)
        return response

if __name__ == "__main__":
    # Guarded entry point: importing this module must not fire an RPC call
    # (the original ran these statements unconditionally at import time).
    t = TestDoiRPCClient()
    r = t.test_request()
+120 −0
Original line number Diff line number Diff line
#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#
#
# This class is responsible to retrieve data from a generic storage point.
#
# The operations performed are the briefly summarized here below:
# * obtain the storage type
# * create a list of files to be retrieved (list of dictionaries)
# * split the list in blocks of a fixed size
# * loop on each block and retrieve data
#   - if the storage type is 'cold' (tape) perform a recall operation
#     before the copy and a migrate operation after the copy
#   - check if data associated to a VOSpace node has been copied
#     every time a block is retrieved
#   - recursively update the 'async_trans' flag
# * cleanup
#
#


import datetime
import json
import os
import logging
import subprocess
import sys

from checksum import Checksum
from config import Config
from db_connector import DbConnector
from mailer import Mailer
from redis_log_handler import RedisLogHandler
from system_utils import SystemUtils
from tape_client import TapeClient
from task_executor import TaskExecutor


class DOIExecutor(TaskExecutor):
    """Task executor for DOI jobs.

    Pulls jobs from the 'doi_ready' queue, performs the DOI processing
    (currently a stub — see the placeholder in `run`) and moves the job to
    the 'doi_terminated' queue, evicting the oldest terminated job when that
    queue is full.
    """

    def __init__(self):
        self.type = "doi_executor"
        self.systemUtils = SystemUtils()
        config = Config("/etc/vos_ts/vos_ts.conf")
        params = config.loadSection("transfer_node")
        self.storageRetrievePath = params["retrieve_path"]
        params = config.loadSection("transfer")
        # Block size is given as a human-readable size string in the config.
        self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"])
        params = config.loadSection("scheduling")
        self.maxTerminatedJobs = params.getint("max_terminated_jobs")
        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        logFormatter = logging.Formatter(params["log_format"])
        # SECURITY/IDIOM FIX: resolve the level constant with getattr()
        # instead of eval()-ing a string built from config input.
        self.logger.setLevel(getattr(logging, params["log_level"]))
        redisLogHandler = RedisLogHandler()
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  1,
                                  self.logger)
        params = config.loadSection("spectrum_archive")
        self.tapePool = params["tape_pool"]
        self.tapeClient = TapeClient(params["host"],
                                     params.getint("port"),
                                     params["user"],
                                     params["pkey_file_path"],
                                     self.logger)
        # Per-job working state, populated while a job is being processed.
        self.storageType = None
        self.jobObj = None
        self.jobId = None
        self.nodeList = []
        self.fileList = []
        self.destPathList = []
        self.numBlocks = 0
        self.procBlocks = 0
        self.totalSize = 0
        super(DOIExecutor, self).__init__()

    def run(self):
        """Main loop: wait, pop a job from 'doi_ready', process it and move
        it to 'doi_terminated'. Runs forever."""
        self.logger.info("Starting DOI executor...")
        self.setSourceQueueName("doi_ready")
        self.setDestinationQueueName("doi_terminated")
        while True:
            self.wait()
            try:
                srcQueueLen = self.srcQueue.len()
                destQueueLen = self.destQueue.len()
            except Exception:
                self.logger.exception("Cache error: failed to retrieve queue length.")
            else:
                if srcQueueLen > 0:
                    self.jobObj = self.srcQueue.getJob()
                    self.jobId = self.jobObj.jobId
                    self.nodeList = self.jobObj.nodeList.copy()

                    ### do something here...

                    try:
                        # Keep the terminated queue bounded: drop the oldest
                        # entry before inserting the new one.
                        if destQueueLen >= self.maxTerminatedJobs:
                            self.destQueue.extractJob()
                        self.destQueue.insertJob(self.jobObj)
                        self.srcQueue.extractJob()
                    except Exception:
                        self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
                    else:
                        self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
+136 −0
Original line number Diff line number Diff line
#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#

import json
import logging

from redis_log_handler import RedisLogHandler
from redis_rpc_server import RedisRPCServer
from config import Config
from db_connector import DbConnector
from job import Job
from job_queue import JobQueue


class DOIJobRPCServer(RedisRPCServer):
    """RPC server that accepts DOI job requests.

    Each request is registered in the file catalog database and queued on
    'doi_pending' (or sent straight to 'doi_terminated' with an ERROR phase
    when the pending queue is full). Error codes: 2 = database error,
    3 = cache (queue) error.
    """

    def __init__(self, host, port, db, rpcQueue):
        self.type = "doi"
        config = Config("/etc/vos_ts/vos_ts.conf")
        params = config.loadSection("scheduling")
        self.maxPendingJobs = params.getint("max_pending_jobs")
        self.maxTerminatedJobs = params.getint("max_terminated_jobs")
        params = config.loadSection("transfer_node")
        self.storageStorePath = params["store_path"]
        # NOTE: the original re-loaded the "scheduling" section here and
        # assigned maxPendingJobs a second time; the duplicate was removed.
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        logFormatter = logging.Formatter(params["log_format"])
        # SECURITY/IDIOM FIX: getattr() instead of eval() on config input.
        self.logger.setLevel(getattr(logging, params["log_level"]))
        redisLogHandler = RedisLogHandler()
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  2,
                                  self.logger)
        self.pendingQueueDOI = JobQueue("doi_pending")
        self.terminatedQueueDOI = JobQueue("doi_terminated")
        super(DOIJobRPCServer, self).__init__(host, port, db, rpcQueue)

    def _errorResponse(self, errorCode, errorMsg):
        """Log the current exception and build an ERROR response dict."""
        self.logger.exception(errorMsg)
        return { "responseType": "ERROR",
                 "errorCode": errorCode,
                 "errorMsg": errorMsg }

    def _handleRequest(self, requestBody):
        """Register the job described by `requestBody` and return either the
        job as stored in the database or an ERROR response dict."""
        job = Job()
        job.setId(requestBody["job"]["jobId"])
        job.setType(requestBody["job"]["jobInfo"]["transfer"]["direction"])
        job.setInfo(requestBody["job"]["jobInfo"])
        job.setOwnerId(requestBody["job"]["ownerId"])

        try:
            pendingQueueLen = self.pendingQueueDOI.len()
            terminatedQueueLen = self.terminatedQueueDOI.len()
        except Exception:
            return self._errorResponse(3, "Cache error.")
        if pendingQueueLen >= self.maxPendingJobs:
            # Pending queue full: record the job as a transient error and
            # park it directly on the terminated queue.
            job.setPhase("ERROR")
            job.setErrorType("transient")
            job.setErrorMessage("Pending queue is full, please, retry later.")
            try:
                self.dbConn.insertJob(job)
            except Exception:
                return self._errorResponse(2, "Database error.")
            try:
                # Keep the terminated queue bounded.
                if terminatedQueueLen >= self.maxTerminatedJobs:
                    self.terminatedQueueDOI.extractJob()
                self.terminatedQueueDOI.insertJob(job)
            except Exception:
                return self._errorResponse(3, "Cache error.")
        else:
            job.setPhase(requestBody["job"]["phase"])
            try:
                self.dbConn.insertJob(job)
            except Exception:
                return self._errorResponse(2, "Database error.")
            try:
                self.pendingQueueDOI.insertJob(job)
            except Exception:
                return self._errorResponse(3, "Cache error.")
        try:
            # Echo back the job as actually stored in the database.
            return self.dbConn.getJob(job.jobId)
        except Exception:
            return self._errorResponse(2, "Database error.")

    def callback(self, requestBody):
        """RPC entry point: handle a request and return the response.

        BUG FIX: the original opened the debug log file and returned early
        on errors without closing it (file-handle leak) and without logging
        the response; the `with` block closes it on every path.
        """
        # debug block...
        with open("doi_job_rpc_server_log.txt", "a") as out:
            out.write(json.dumps(requestBody))
            response = self._handleRequest(requestBody)
            out.write(f"Db response: {response}")
        return response

    def run(self):
        """Delegate to the base RPC server loop."""
        self.logger.info(f"Starting RPC server of type {self.type}...")
        super(DOIJobRPCServer, self).run()
+120 −0
Original line number Diff line number Diff line
#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#

import json
import logging
import os

from config import Config
from db_connector import DbConnector
from mailer import Mailer
from redis_log_handler import RedisLogHandler
from task_executor import TaskExecutor


class DOIPreprocessor(TaskExecutor):
    """Preprocessor for DOI jobs.

    Pops jobs from 'doi_pending', builds the VOSpace node list for the
    transfer target, persists the job, notifies owner and admin by e-mail,
    and pushes the job to 'doi_ready' (or 'doi_terminated' when the job was
    aborted meanwhile).
    """

    def __init__(self):
        # BUG FIX: was "retrieve_preprocessor" (copy-paste slip); aligned
        # with the "doi_preprocessor" type used by the job scheduler and the
        # naming of the sibling DOIExecutor ("doi_executor").
        self.type = "doi_preprocessor"
        config = Config("/etc/vos_ts/vos_ts.conf")
        params = config.loadSection("scheduling")
        # BUG FIX: run() compares against self.maxReadyJobs, which was never
        # initialized and would raise AttributeError.
        # TODO(review): confirm "max_ready_jobs" is the config key and that
        # TaskExecutor does not already set this attribute.
        self.maxReadyJobs = params.getint("max_ready_jobs")
        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        logFormatter = logging.Formatter(params["log_format"])
        # SECURITY/IDIOM FIX: getattr() instead of eval() on config input.
        self.logger.setLevel(getattr(logging, params["log_level"]))
        redisLogHandler = RedisLogHandler()
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  1,
                                  self.logger)
        self.jobObj = None
        self.nodeList = []
        super(DOIPreprocessor, self).__init__()

    def execute(self):
        """Build the VOSpace node list for the current job and persist it."""
        self.logger.info("Generating VOSpace node list")
        # Target is a 'vos://host!vospace/path' URI; keep the path part.
        target = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1]
        params = self.jobObj.jobInfo["transfer"]["view"]["param"]
        if not params:
            # No explicit file list: the target node itself is the list.
            self.nodeList.append(target)
        else:
            self.nodeList.extend(target + '/' + el["value"] for el in params)
        self.jobObj.nodeList = self.nodeList.copy()
        self.dbConn.insertJob(self.jobObj)

    def update(self, status):
        """E-mail the job owner (and always the admin) about `status`."""
        m = Mailer(self.logger)
        m.addRecipient(self.adminEmail)
        userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId)
        if userEmail != self.adminEmail:
            m.addRecipient(userEmail)

        msg = f"""
        Dear user,
        your job has been {status}.

        Job ID: {self.jobObj.jobId}
        Job type: {self.jobObj.type}
        Owner ID: {self.jobObj.ownerId}

        """

        # Send e-mail notification
        m.setMessage(f"VOSpace DOI notification: Job {status}", msg)
        m.send()

    def cleanup(self):
        """Reset per-job state and restore the default destination queue."""
        self.nodeList.clear()
        self.setDestinationQueueName("doi_ready")

    def run(self):
        """Main loop: preprocess pending DOI jobs forever."""
        self.logger.info("Starting DOI preprocessor...")
        self.setSourceQueueName("doi_pending")
        self.setDestinationQueueName("doi_ready")
        while True:
            self.wait()
            try:
                srcQueueLen = self.srcQueue.len()
                destQueueLen = self.destQueue.len()
            except Exception:
                self.logger.exception("Cache error: unable to retrieve queue length.")
            else:
                if destQueueLen < self.maxReadyJobs and srcQueueLen > 0:
                    self.jobObj = self.srcQueue.getJob()
                    jobId = self.jobObj.jobId
                    self.execute()
                    try:
                        jobPhase = self.dbConn.getJobPhase(jobId)
                    except Exception:
                        self.logger.exception(f"Database error: unable to retrieve job phase for job {jobId}.")
                    else:
                        if jobPhase == "ABORTED":
                            self.jobObj.setPhase("ABORTED")
                            # BUG FIX: was "read_terminated"; the DOI
                            # terminated queue used by DOIExecutor and the
                            # RPC server is "doi_terminated".
                            self.setDestinationQueueName("doi_terminated")
                            self.update("ABORTED")
                        else:
                            self.update("QUEUED")
                        try:
                            self.destQueue.insertJob(self.jobObj)
                            self.srcQueue.extractJob()
                        except Exception:
                            self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
                        else:
                            self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
            finally:
                # Always reset state, even if queue access failed above.
                self.cleanup()
+7 −1
Original line number Diff line number Diff line
@@ -8,6 +8,8 @@
import time
import sys

from doi_executor import DOIExecutor
from doi_preprocessor import DOIPreprocessor
from group_rw_executor import GroupRwExecutor
from import_executor import ImportExecutor
from retrieve_preprocessor import RetrievePreprocessor
@@ -23,7 +25,11 @@ class JobScheduler(object):
        self.taskExecutorList = []

    def addTaskExecutor(self, taskExecType):
        if taskExecType == "group_rw_executor":
        if taskExecType == "doi_preprocessor":
            self.taskExecutorList.append(DOIPreprocessor())
        elif taskExecType == "doi_executor":
            self.taskExecutorList.append(DOIExecutor())
        elif taskExecType == "group_rw_executor":
            self.taskExecutorList.append(GroupRwExecutor())
        elif taskExecType == "import_executor":
            self.taskExecutorList.append(ImportExecutor())
Loading