// Skip to content
// JobDAO.java 4.68 KiB
// Newer Older
// Sonia Zorba's avatar
// Sonia Zorba committed
package it.inaf.oats.vospace.persistence;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.Optional;
import javax.sql.DataSource;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.uws.v1.ResultReference;
import net.ivoa.xml.vospace.v2.Transfer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

/**
 * Data access object for the {@code job} table.
 *
 * <p>Jobs are stored with their id, owner, type, execution phase, the job
 * payload serialized as JSON ({@code job_info}) and, once available, the list
 * of results serialized as JSON ({@code results}).
 */
@Repository
public class JobDAO {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final JdbcTemplate jdbcTemplate;

    /**
     * Creates the DAO on top of the given data source.
     *
     * @param dataSource the data source used to build the internal {@link JdbcTemplate}
     */
    @Autowired
    public JobDAO(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    /**
     * Inserts a new row in the {@code job} table for the given job.
     *
     * @param jobSummary the job to store; its JobInfo must contain exactly one payload
     * @throws IllegalArgumentException if the job has no JobInfo payload
     * @throws UnsupportedOperationException if the JobInfo holds more than one
     * payload or the payload type is not supported
     */
    public void createJob(JobSummary jobSummary) {

        // Validate and extract the payload before touching the database.
        Object jobPayload = getJobPayload(jobSummary);

        String sql = "INSERT INTO job(job_id, owner_id, job_type, phase, job_info) VALUES (?, ?, ?, ?, ?)";

        jdbcTemplate.update(sql, ps -> {
            int i = 0;
            ps.setString(++i, jobSummary.getJobId());
            ps.setString(++i, jobSummary.getOwnerId());
            // Types.OTHER leaves the conversion to the column type to the database driver.
            ps.setObject(++i, getJobType(jobPayload), Types.OTHER);
            ps.setObject(++i, jobSummary.getPhase().value(), Types.OTHER);
            ps.setObject(++i, toJson(jobPayload), Types.OTHER);
        });
    }

    /**
     * Extracts the single payload object contained in the JobInfo of a job.
     *
     * @param jobSummary the job to inspect
     * @return the only element of the JobInfo content
     * @throws IllegalArgumentException if the JobInfo is missing or empty
     * @throws UnsupportedOperationException if the JobInfo contains more than one element
     */
    private Object getJobPayload(JobSummary jobSummary) {

        JobSummary.JobInfo jobInfo = jobSummary.getJobInfo();
        // A missing JobInfo is reported like an empty one instead of failing with a bare NPE.
        if (jobInfo == null) {
            throw new IllegalArgumentException("Empty JobInfo");
        }

        List<Object> payload = jobInfo.getAny();
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("Empty JobInfo");
        }
        if (payload.size() > 1) {
            throw new UnsupportedOperationException("JobInfo as list not supported");
        }

        return payload.get(0);
    }

    /**
     * Computes the value stored in the {@code job_type} column for a payload.
     *
     * @param jobPayload the job payload
     * @return the transfer direction, when the payload is a {@link Transfer}
     * @throws UnsupportedOperationException for any other kind of payload
     */
    private String getJobType(Object jobPayload) {

        if (jobPayload instanceof Transfer) {
            Transfer transfer = (Transfer) jobPayload;
            return transfer.getDirection();
        }

        // Report the payload class rather than its toString(), since the message talks about the type.
        String typeName = jobPayload == null ? "null" : jobPayload.getClass().getName();
        throw new UnsupportedOperationException("JobInfo of type " + typeName + " not supported");
    }

    /**
     * Loads a job by its identifier.
     *
     * @param jobId the identifier of the job
     * @return the job, or an empty Optional if no row has the given id
     */
    public Optional<JobSummary> getJob(String jobId) {

        String sql = "SELECT * FROM job WHERE job_id = ?";

        JobSummary jobSummary = jdbcTemplate.query(sql,
                ps -> {
                    ps.setString(1, jobId);
                }, rs -> {
                    if (rs.next()) {
                        return getJobSummaryFromResultSet(rs);
                    }
                    return null;
                });

        return Optional.ofNullable(jobSummary);
    }

    /**
     * Builds a {@link JobSummary} from the current row of the result set.
     *
     * @param rs a result set positioned on a row of the {@code job} table
     * @return the job described by the row
     * @throws SQLException if a column cannot be read
     */
    private JobSummary getJobSummaryFromResultSet(ResultSet rs) throws SQLException {

        JobSummary jobSummary = new JobSummary();

        jobSummary.setJobId(rs.getString("job_id"));
        jobSummary.setOwnerId(rs.getString("owner_id"));
        jobSummary.setPhase(ExecutionPhase.fromValue(rs.getString("phase")));

        String jobType = rs.getString("job_type");

        Object jobPayload = getJobPayload(jobType, rs.getString("job_info"));
        JobSummary.JobInfo jobInfo = new JobSummary.JobInfo();
        jobInfo.getAny().add(jobPayload);
        jobSummary.setJobInfo(jobInfo);

        jobSummary.setResults(getResults(rs.getString("results")));

        return jobSummary;
    }

    /**
     * Deserializes the JSON stored in the {@code job_info} column.
     *
     * @param jobType the value of the {@code job_type} column (currently unused,
     * every payload is read as a {@link Transfer})
     * @param json the JSON representation of the payload
     * @return the deserialized payload
     * @throws RuntimeException if the JSON cannot be parsed
     */
    private Object getJobPayload(String jobType, String json) {
        try {
            // TODO: switch on jobType
            return MAPPER.readValue(json, Transfer.class);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException("Unable to parse job_info JSON", ex);
        }
    }

    /**
     * Deserializes the JSON stored in the {@code results} column.
     *
     * @param json the JSON representation of the results, possibly null
     * @return the list of results, or null if {@code json} is null
     * @throws RuntimeException if the JSON cannot be parsed
     */
    private List<ResultReference> getResults(String json) {
        if (json == null) {
            return null;
        }
        try {
            JavaType type = MAPPER.getTypeFactory().constructCollectionType(List.class, ResultReference.class);
            return MAPPER.readValue(json, type);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException("Unable to parse results JSON", ex);
        }
    }

    /**
     * Updates phase and results of an already stored job, identified by its job id.
     *
     * @param job the job carrying the new phase and results
     */
    public void updateJob(JobSummary job) {

        String sql = "UPDATE job SET phase = ?, results = ? WHERE job_id = ?";

        jdbcTemplate.update(sql, ps -> {
            int i = 0;
            // Use value(), as createJob does, so that the stored text is the one
            // ExecutionPhase.fromValue() expects when the row is read back.
            ps.setObject(++i, job.getPhase().value(), Types.OTHER);
            // NOTE(review): a null results list is presumably serialized by Jackson as the
            // JSON literal null rather than stored as SQL NULL; confirm this is intended.
            ps.setObject(++i, toJson(job.getResults()), Types.OTHER);
            ps.setString(++i, job.getJobId());
        });
    }

    /**
     * Serializes an object to its JSON representation.
     *
     * @param data the object to serialize
     * @return the JSON string
     * @throws RuntimeException if the object cannot be serialized
     */
    private String toJson(Object data) {
        try {
            return MAPPER.writeValueAsString(data);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException("Unable to serialize object to JSON", ex);
        }
    }
}