Commit 3514a72b authored by Sonia Zorba's avatar Sonia Zorba
Browse files

Replaced RabbitMQ with Redis RPC for communicating with async transfer service

parent 1ad272c3
Loading
Loading
Loading
Loading
+8 −3
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <version>2.4.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>it.inaf.oats</groupId>
@@ -85,8 +85,13 @@
        </dependency>
                
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-inline</artifactId>
        </dependency>
        
        <!-- Embedded PostgreSQL: -->
+35 −10
Original line number Diff line number Diff line
@@ -8,33 +8,58 @@ package it.inaf.oats.vospace;
import com.fasterxml.jackson.databind.ObjectMapper;
import it.inaf.oats.vospace.exception.InternalFaultException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.ivoa.xml.uws.v1.JobSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;

@Service
public class AsyncTransferService {

    private static final Logger LOG = LoggerFactory.getLogger(AsyncTransferService.class);

    @Autowired
    private RabbitTemplate template;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public JobSummary startJob(JobSummary job) {
    // WARNING: this instance is not thread safe
    private final Jedis jedis;

    public AsyncTransferService(@Value("${spring.redis.host}") String redisHost, @Value("${spring.redis.port}") int redisPort) {
        jedis = new Jedis(redisHost, redisPort);
    }

    public synchronized JobSummary startJob(JobSummary job) {
        try {
            byte[] message = MAPPER.writeValueAsBytes(job);
            byte[] result = (byte[]) template.convertSendAndReceive("start_job_queue", message);

            String requestId = job.getJobId();

            Map<String, Object> data = new HashMap<>();
            data.put("id", requestId);
            data.put("job", job);

            String message = MAPPER.writeValueAsString(data);

            jedis.lpush("start_job_queue", message);

            List<String> popData = jedis.brpop(30, requestId);

            String result = null;
            for (String value : popData) {
                // first result is requestId, second is JSON payload
                if (!requestId.equals(value)) {
                    result = value;
                }
            }

            if (result == null) {
                throw new InternalFaultException("Transfer service returned an empty response");
                throw new IllegalStateException("Job data not found in redis");
            }

            LOG.trace("Tape transfer service answered:\n{}", new String(result));
            LOG.trace("Transfer service answered:\n{}", result);

            return MAPPER.readValue(result, JobSummary.class);
        } catch (IOException ex) {
+5 −22
Original line number Diff line number Diff line
# To change this license header, choose License Headers in Project Properties.
# To change this template file, choose Tools | Templates
# and open the template in the editor.

# show sql statements issued by JPA
#spring.jpa.show-sql=true

# enable debug logging for spring boot and hibernate classes
# this is equivalent to passing '--debug' as command line argument
#logging.level.org.springframework.boot=DEBUG
#logging.level.org.hibernate.SQL=DEBUG

# log to file (absolute/relative path of log file)
#logging.file=path/to/log/file.log

server.port=8083
server.servlet.context-path=/vospace
#spring.profiles.active=@spring.profiles.active@

# For development only:
spring.profiles.active=dev

@@ -23,15 +8,13 @@ spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/vospace_testdb
spring.datasource.username=postgres
spring.datasource.password=postgres

# enable debug logging 
# this is equivalent to passing '--debug' as command line argument
spring.redis.host=127.0.0.1
spring.redis.port=6379

logging.level.it.inaf=TRACE
logging.level.org.springframework.security=DEBUG
logging.level.org.springframework.jdbc=TRACE
logging.level.org.springframework.web=TRACE
#logging.level.org.springframework.boot=DEBUG
# log to file (absolute/relative path of log file)
#logging.file=path/to/log/file.log

vospace-authority=example.com!vospace

+78 −0
Original line number Diff line number Diff line
/*
 * This file is part of vospace-rest
 * Copyright (C) 2021 Istituto Nazionale di Astrofisica
 * SPDX-License-Identifier: GPL-3.0-or-later
 */
package it.inaf.oats.vospace;

import java.util.ArrayList;
import java.util.List;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import static org.mockito.Mockito.doAnswer;
import org.mockito.junit.jupiter.MockitoExtension;
import redis.clients.jedis.Jedis;

@ExtendWith(MockitoExtension.class)
public class AsyncTransferServiceTest {

    private static final String JSON_JOB = "{\"jobId\":\"job_id\",\"runId\":null,\"ownerId\":null,\"phase\":null,\"quote\":null,\"creationTime\":null,\"startTime\":null,\"endTime\":null,\"executionDuration\":0,\"destruction\":null,\"parameters\":null,\"results\":[],\"errorSummary\":null,\"jobInfo\":{\"transfer\":{\"target\":\"vos://example.com!vospace/my-node\",\"direction\":\"pullToVoSpace\",\"view\":null,\"protocols\":[{\"endpoint\":null,\"param\":[],\"uri\":\"ia2:async-recall\"}],\"keepBytes\":null,\"version\":null,\"param\":[]}},\"version\":null}\n";

    @Test
    public void testRedisRpc() {
        try (MockedConstruction<Jedis> staticMock = Mockito.mockConstruction(Jedis.class,
                (mockedJedis, context) -> {
                    doAnswer(invocation -> {
                        String requestId = invocation.getArgument(1);
                        List<String> result = new ArrayList<>();
                        result.add(requestId);
                        result.add(JSON_JOB);
                        return result;
                    }).when(mockedJedis).brpop(anyInt(), anyString());
                })) {
            AsyncTransferService asyncTransferService = new AsyncTransferService("localhost", 6379);
            JobSummary result = asyncTransferService.startJob(getFakeJob());
            assertEquals("job_id", result.getJobId());
        }
    }

    @Test
    public void testRedisError() {
        try (MockedConstruction<Jedis> staticMock = Mockito.mockConstruction(Jedis.class)) {
            AsyncTransferService asyncTransferService = new AsyncTransferService("localhost", 6379);
            try {
                asyncTransferService.startJob(getFakeJob());
                fail();
            } catch(IllegalStateException ex) {
            }
        }
    }
    
    private JobSummary getFakeJob() {
        Transfer transfer = new Transfer();
        transfer.setDirection("pullToVoSpace");
        Protocol protocol = new Protocol();
        protocol.setUri("ia2:async-recall");
        transfer.getProtocols().add(protocol);
        transfer.setTarget("vos://example.com!vospace/my-node");

        JobSummary job = new JobSummary();
        job.setJobId("job_id");

        JobSummary.JobInfo jobInfo = new JobSummary.JobInfo();
        jobInfo.getAny().add(transfer);

        job.setJobInfo(jobInfo);

        return job;
    }
}