Commit 2e1759d0 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Removed RabbitMQ + started Redis integration.

parent 3191c580
Loading
Loading
Loading
Loading
+8 −24
Original line number Original line Diff line number Diff line
@@ -3,39 +3,23 @@ services:
  base:
  base:
    image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/base
    image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/base
    container_name: base
    container_name: base
  postgres:
    image: git.ia2.inaf.it:5050/vospace/vospace-file-catalog
    build: ../vospace-file-catalog
    container_name: file_catalog
    networks:
    - backend_net
    ports:
    - "5432:5432"
  redis:
  redis:
    image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/job_cache
    image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/job_cache
    container_name: job_cache
    container_name: job_cache
    depends_on:
    - postgres
    networks:
    networks:
    - backend_net
    - backend_net
    ports:
    ports:
    - "6379:6379"
    - "6379:6379"
  rabbitmq:
  postgres:
    image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/rabbitmq
    image: git.ia2.inaf.it:5050/vospace/vospace-file-catalog
    volumes:
    build: ../vospace-file-catalog
      - ./conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    container_name: file_catalog
    container_name: rabbitmq
    environment:
    - RABBITMQ_LOGS=/var/log/rabbitmq/rabbit.log
    depends_on: 
    depends_on: 
    - redis
    - redis
    networks:
    networks:
    - backend_net
    - backend_net
    ports:
    ports:
    - "5672:5672"
    - "5432:5432"
    - "15672:15672"
    stdin_open: true
    tty: true
  transfer_service:
  transfer_service:
    image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/transfer_service
    image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/transfer_service
    volumes:
    volumes:
@@ -45,12 +29,12 @@ services:
    build: ./transfer_service
    build: ./transfer_service
    container_name: transfer_service
    container_name: transfer_service
    depends_on:
    depends_on:
    - rabbitmq
    - postgres
    networks:
    networks:
    - backend_net
    - backend_net
    stdin_open: true
    stdin_open: true
    tty: true
    tty: true
    command: ["./wait-for-it.sh", "rabbitmq:5672", "--timeout=30", "--", "bash", "start.sh"]
    command: ["./wait-for-it.sh", "postgres:5432", "--timeout=30", "--", "bash", "start.sh"]
  client:
  client:
    image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/client
    image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/client
    build: ./client
    build: ./client
+5 −5
Original line number Original line Diff line number Diff line
#!/usr/bin/env python
#!/usr/bin/env python


from amqp_server import AMQPServer
from redis_rpc_server import RedisRpcServer
from db_connector import DbConnector
from db_connector import DbConnector
from config import Config
from config import Config




class AbortJobAMQPServer(AMQPServer):
class AbortJobAMQPServer(RedisRpcServer):
  
  
    def __init__(self, host, port, queue):
    def __init__(self, host, port, db, rpcQueue):
        self.type = "abort"
        self.type = "abort"
        config = Config("/etc/vos_ts/vos_ts.conf")        
        config = Config("/etc/vos_ts/vos_ts.conf")        
        self.params = config.loadSection("file_catalog")
        self.params = config.loadSection("file_catalog")
@@ -18,9 +18,9 @@ class AbortJobAMQPServer(AMQPServer):
                                  self.params["db"],
                                  self.params["db"],
                                  8,
                                  8,
                                  16)
                                  16)
        super(AbortJobAMQPServer, self).__init__(host, port, queue)      
        super(AbortJobAMQPServer, self).__init__(host, port, db, rpcQueue)      


    def execute_callback(self, requestBody):
    def callback(self, requestBody):
        #TODO
        #TODO
        # do something...
        # do something...
        return 42
        return 42
+6 −5
Original line number Original line Diff line number Diff line
@@ -10,20 +10,21 @@ from storage_amqp_server import StorageAMQPServer


class CliHandler(object):
class CliHandler(object):


    def __init__(self, host, port):
    def __init__(self, host, port, db):
        self.host = host
        self.host = host
        self.port = port
        self.port = port
        self.db = db
        self.amqpServerList = []
        self.amqpServerList = []


    def addAMQPServer(self, srvType, rpcQueue):
    def addAMQPServer(self, srvType, rpcQueue):
        if srvType == 'data':
        if srvType == 'data':
            self.amqpServerList.append(DataAMQPServer(self.host, self.port, rpcQueue))
            self.amqpServerList.append(DataAMQPServer(self.host, self.port, self.db, rpcQueue))
        elif srvType == 'import':
        elif srvType == 'import':
            self.amqpServerList.append(ImportAMQPServer(self.host, self.port, rpcQueue))
            self.amqpServerList.append(ImportAMQPServer(self.host, self.port, self.db, rpcQueue))
        elif srvType == 'storage':
        elif srvType == 'storage':
            self.amqpServerList.append(StorageAMQPServer(self.host, self.port, rpcQueue))
            self.amqpServerList.append(StorageAMQPServer(self.host, self.port, self.db, rpcQueue))
        elif srvType == 'job':
        elif srvType == 'job':
            self.amqpServerList.append(JobAMQPServer(self.host, self.port, rpcQueue))
            self.amqpServerList.append(JobAMQPServer(self.host, self.port, self.db, rpcQueue))
        else:
        else:
            sys.exit(f"FATAL: unknown server type {srvType}.")
            sys.exit(f"FATAL: unknown server type {srvType}.")
            
            
+1 −1
Original line number Original line Diff line number Diff line
@@ -18,7 +18,7 @@ password = postgres
# Redis
# Redis
[job_cache]
[job_cache]
; hostname or IP address of the machine that hosts the Redis cache system
; hostname or IP address of the machine that hosts the Redis cache system
host = redis
host = job_cache
; port at which the cache service is available, default is 6379 TCP
; port at which the cache service is available, default is 6379 TCP
port = 6379
port = 6379
; db index representing the db that stores the scheduling queues, default is 0
; db index representing the db that stores the scheduling queues, default is 0
+6 −5
Original line number Original line Diff line number Diff line
@@ -10,7 +10,7 @@ import json
import os
import os
import sys
import sys


from amqp_server import AMQPServer
from redis_rpc_server import RedisRpcServer
from config import Config
from config import Config
from db_connector import DbConnector
from db_connector import DbConnector
from job import Job
from job import Job
@@ -18,9 +18,9 @@ from job_queue import JobQueue
from system_utils import SystemUtils
from system_utils import SystemUtils




class DataAMQPServer(AMQPServer):
class DataAMQPServer(RedisRpcServer):


    def __init__(self, host, port, queue):
    def __init__(self, host, port, db, rpcQueue):
        self.type = "data"
        self.type = "data"
        self.storeAck = False
        self.storeAck = False
        config = Config("/etc/vos_ts/vos_ts.conf")       
        config = Config("/etc/vos_ts/vos_ts.conf")       
@@ -38,9 +38,9 @@ class DataAMQPServer(AMQPServer):
        self.maxPendingJobs = self.params.getint("max_pending_jobs")
        self.maxPendingJobs = self.params.getint("max_pending_jobs")
        self.pendingQueueWrite = JobQueue("write_pending")
        self.pendingQueueWrite = JobQueue("write_pending")
        self.systemUtils = SystemUtils()
        self.systemUtils = SystemUtils()
        super(DataAMQPServer, self).__init__(host, port, queue)
        super(DataAMQPServer, self).__init__(host, port, db, rpcQueue)


    def execute_callback(self, requestBody):
    def callback(self, requestBody):
        # 'requestType' and 'userName' attributes are mandatory
        # 'requestType' and 'userName' attributes are mandatory
        if "requestType" not in requestBody or "userName" not in requestBody:
        if "requestType" not in requestBody or "userName" not in requestBody:
            response = { "responseType":"ERROR", 
            response = { "responseType":"ERROR", 
@@ -134,3 +134,4 @@ class DataAMQPServer(AMQPServer):
    def run(self):
    def run(self):
        print(f"Starting AMQP server of type {self.type}...")
        print(f"Starting AMQP server of type {self.type}...")
        super(DataAMQPServer, self).run()
        super(DataAMQPServer, self).run()
 
Loading