/*
 * This file is part of vospace-rest
 * Copyright (C) 2021 Istituto Nazionale di Astrofisica
 * SPDX-License-Identifier: GPL-3.0-or-later
 */
package it.inaf.oats.vospace;

import com.fasterxml.jackson.databind.ObjectMapper;
import it.inaf.oats.vospace.exception.InternalFaultException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.ivoa.xml.uws.v1.JobSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Hands asynchronous transfer jobs over to an external worker through Redis
 * lists and waits for the worker's answer.
 *
 * <p>A request is pushed as a JSON message onto the {@code start_job_queue}
 * list; the reply is then awaited with a blocking pop on a list whose key is
 * the job id.</p>
 */
@Service
public class AsyncTransferService {

    private static final Logger LOG = LoggerFactory.getLogger(AsyncTransferService.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Redis list on which job start requests are published. */
    private static final String START_JOB_QUEUE = "start_job_queue";

    /** Maximum time, in seconds, to block waiting for the reply list. */
    private static final int REPLY_TIMEOUT_SECONDS = 30;

    private final JedisPool jedisPool;

    /**
     * Creates the service and its Redis connection pool.
     *
     * @param redisHost Redis host, read from {@code spring.redis.host}
     * @param redisPort Redis port, read from {@code spring.redis.port}
     */
    public AsyncTransferService(@Value("${spring.redis.host}") String redisHost,
            @Value("${spring.redis.port}") int redisPort) {
        // To avoid receiving a JedisConnectionException when redis server is
        // restarted a JedisPool with proper configuration is used.
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        // sends a ping request when asking for the resource
        poolConfig.setTestOnBorrow(true);
        // sends periodic pings to idle resources in the pool
        poolConfig.setTestWhileIdle(true);
        jedisPool = new JedisPool(poolConfig, redisHost, redisPort);
    }

    /**
     * Publishes the given job on the start queue and blocks until an answer
     * is available on the list named after the job id, or the timeout
     * expires.
     *
     * @param job the job to start; its job id is used both as the
     * {@code req_id} of the request and as the key of the reply list
     * @return the job summary deserialized from the JSON answer
     * @throws IllegalStateException if no answer is received within the
     * timeout or the reply carries no payload
     * @throws InternalFaultException if the request cannot be serialized or
     * the answer cannot be parsed
     */
    public JobSummary startJob(JobSummary job) {
        try (Jedis jedis = jedisPool.getResource()) {

            String requestId = job.getJobId();

            Map<String, Object> data = new HashMap<>();
            data.put("req_id", requestId);
            data.put("job", job);

            String message = MAPPER.writeValueAsString(data);
            jedis.lpush(START_JOB_QUEUE, message);

            // BRPOP answers with [key, value]; on timeout the reply is nil,
            // which must not be dereferenced.
            List<String> popData = jedis.brpop(REPLY_TIMEOUT_SECONDS, requestId);
            String result = (popData == null || popData.size() < 2) ? null : popData.get(1);

            if (result == null) {
                throw new IllegalStateException("Job data not found in redis");
            }

            LOG.trace("Transfer service answered:\n{}", result);

            return MAPPER.readValue(result, JobSummary.class);
        } catch (IOException ex) {
            throw new InternalFaultException(ex);
        }
    }
}