Newer
Older
/*
* This file is part of vospace-rest
* Copyright (C) 2021 Istituto Nazionale di Astrofisica
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package it.inaf.oats.vospace;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
Sonia Zorba
committed
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.ivoa.xml.uws.v1.JobSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Sonia Zorba
committed
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
Sonia Zorba
committed
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
// Class logger; startJob uses it to trace the answer received from the transfer service.
private static final Logger LOG = LoggerFactory.getLogger(AsyncTransferService.class);
// Shared Jackson mapper used by startJob to serialize the request and to parse the JSON reply.
private static final ObjectMapper MAPPER = new ObjectMapper();
// Redis connection pool created in the constructor; startJob borrows a connection from it on each call.
private final JedisPool jedisPool;
Sonia Zorba
committed
/**
 * Creates the service together with the Redis connection pool it uses.
 *
 * @param redisHost host of the Redis server, injected from the
 * {@code spring.redis.host} property
 * @param redisPort port of the Redis server, injected from the
 * {@code spring.redis.port} property
 */
public AsyncTransferService(@Value("${spring.redis.host}") String redisHost, @Value("${spring.redis.port}") int redisPort) {
    // To avoid receiving a JedisConnectionException when redis server is
    // restarted a JedisPool with proper configuration is used.
    JedisPoolConfig poolConfig = new JedisPoolConfig();
    poolConfig.setTestOnBorrow(true); // sends a ping request when asking for the resource
    poolConfig.setTestWhileIdle(true); // sends periodic pings to idle resources in the pool
    jedisPool = new JedisPool(poolConfig, redisHost, redisPort);
}
public JobSummary startJob(JobSummary job) {
try (Jedis jedis = jedisPool.getResource()) {
Sonia Zorba
committed
String requestId = job.getJobId();
Map<String, Object> data = new HashMap<>();
Sonia Zorba
committed
data.put("job", job);
String message = MAPPER.writeValueAsString(data);
jedis.lpush("start_job_queue", message);
List<String> popData = jedis.brpop(30, requestId);
String result = null;
for (String value : popData) {
// first result is requestId, second is JSON payload
if (!requestId.equals(value)) {
result = value;
}
}
Sonia Zorba
committed
throw new IllegalStateException("Job data not found in redis");
Sonia Zorba
committed
LOG.trace("Transfer service answered:\n{}", result);
return MAPPER.readValue(result, JobSummary.class);
} catch (IOException ex) {