Skip to content
AsyncTransferService.java 2.59 KiB
Newer Older
Sonia Zorba's avatar
Sonia Zorba committed
/*
 * This file is part of vospace-rest
 * Copyright (C) 2021 Istituto Nazionale di Astrofisica
 * SPDX-License-Identifier: GPL-3.0-or-later
 */
package it.inaf.oats.vospace;

import com.fasterxml.jackson.databind.ObjectMapper;
Sonia Zorba's avatar
Sonia Zorba committed
import it.inaf.oats.vospace.exception.InternalFaultException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.ivoa.xml.uws.v1.JobSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
Sonia Zorba's avatar
Sonia Zorba committed
public class AsyncTransferService {
Sonia Zorba's avatar
Sonia Zorba committed
    private static final Logger LOG = LoggerFactory.getLogger(AsyncTransferService.class);

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final JedisPool jedisPool;

    public AsyncTransferService(@Value("${spring.redis.host}") String redisHost, @Value("${spring.redis.port}") int redisPort) {

        // To avoid receiving a JedisConnectionException when redis server is
        // restarted a JedisPool with proper configuration is used.
        JedisPoolConfig poolConfig = new JedisPoolConfig();

        poolConfig.setTestOnBorrow(true); // sends a ping request when asking for the resource
        poolConfig.setTestWhileIdle(true); // sends periodic pings to idle resources in the pool

        jedisPool = new JedisPool(poolConfig, redisHost, redisPort);
    public JobSummary startJob(JobSummary job) {
        try (Jedis jedis = jedisPool.getResource()) {

            String requestId = job.getJobId();

            Map<String, Object> data = new HashMap<>();
Sonia Zorba's avatar
Sonia Zorba committed
            data.put("req_id", requestId);
            data.put("job", job);

            String message = MAPPER.writeValueAsString(data);

            jedis.lpush("start_job_queue", message);

            List<String> popData = jedis.brpop(30, requestId);

            String result = null;
            for (String value : popData) {
                // first result is requestId, second is JSON payload
                if (!requestId.equals(value)) {
                    result = value;
                }
            }
Sonia Zorba's avatar
Sonia Zorba committed
            if (result == null) {
                throw new IllegalStateException("Job data not found in redis");
Sonia Zorba's avatar
Sonia Zorba committed
            }

            LOG.trace("Transfer service answered:\n{}", result);

            return MAPPER.readValue(result, JobSummary.class);
        } catch (IOException ex) {
Sonia Zorba's avatar
Sonia Zorba committed
            throw new InternalFaultException(ex);