/*
 * This file is part of vospace-rest
 * Copyright (C) 2021 Istituto Nazionale di Astrofisica
 * SPDX-License-Identifier: GPL-3.0-or-later
 */
package it.inaf.oats.vospace.persistence;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import it.inaf.oats.vospace.JobService;
import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.sql.DataSource;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.XMLGregorianCalendar;
import net.ivoa.xml.uws.v1.ErrorSummary;
import net.ivoa.xml.uws.v1.ErrorType;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.uws.v1.Jobs;
import net.ivoa.xml.uws.v1.ResultReference;
import net.ivoa.xml.uws.v1.ShortJobDescription;
import net.ivoa.xml.vospace.v2.Transfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

@Repository
public class JobDAO {

    private static final Logger LOG = LoggerFactory.getLogger(JobDAO.class);
Sonia Zorba's avatar
Sonia Zorba committed
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final JdbcTemplate jdbcTemplate;

    /**
     * Builds the DAO on top of the given data source.
     *
     * @param dataSource connection source wrapped by the internal {@link JdbcTemplate}
     */
    @Autowired
    public JobDAO(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    public void createJob(JobSummary jobSummary, Transfer transferDetails) {
                = "INSERT INTO job(job_id, owner_id, job_type, phase, job_info, transfer_details, "
                + " results, error_message, error_type, error_has_detail, error_detail, start_time, end_time) "
                + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ";

        switch (jobSummary.getPhase()) {
            case EXECUTING:
                sql += "NOW(), NULL)";
                break;
            case ERROR:
            case COMPLETED:
            case ABORTED:
                sql += "NOW(), NOW())";
                break;
            default:
                sql += "NULL, NULL)";
                break;
        }
Sonia Zorba's avatar
Sonia Zorba committed

        jdbcTemplate.update(sql, ps -> {
            int i = 0;
            ps.setString(++i, jobSummary.getJobId());
            ps.setString(++i, jobSummary.getOwnerId());
            ps.setObject(++i, getJobDirection(jobSummary), Types.VARCHAR);
Sonia Zorba's avatar
Sonia Zorba committed
            ps.setObject(++i, jobSummary.getPhase().value(), Types.OTHER);
            ps.setObject(++i, toJson(jobSummary.getJobInfo()), Types.OTHER);
            ps.setObject(++i, toJson(transferDetails), Types.OTHER);
            ps.setObject(++i, toJson(jobSummary.getResults()), Types.OTHER);
            ErrorSummary errorSummary = jobSummary.getErrorSummary();
            if (errorSummary != null) {
                ps.setString(++i, errorSummary.getMessage());
                ps.setObject(++i, errorSummary.getType().value(), Types.OTHER);
                ps.setBoolean(++i, errorSummary.isHasDetail());
                ps.setString(++i, errorSummary.getDetailMessage());
            } else {
                ps.setNull(++i, Types.VARCHAR);
                ps.setNull(++i, Types.OTHER);
                ps.setNull(++i, Types.BOOLEAN);
                ps.setNull(++i, Types.VARCHAR);
    private String getJobDirection(JobSummary jobSummary) {
Sonia Zorba's avatar
Sonia Zorba committed

        List<Object> payload = jobSummary.getJobInfo().getAny();
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("Empty JobInfo");
        }
        if (payload.size() > 1) {
            throw new UnsupportedOperationException("JobInfo as list not supported");
        }

        Object jobPayload = payload.get(0);
Sonia Zorba's avatar
Sonia Zorba committed

        if (jobPayload instanceof Transfer) {
            Transfer transfer = (Transfer) jobPayload;
            return transfer.getDirection();
        }

        throw new UnsupportedOperationException("JobInfo of type " + jobPayload + " not supported");
    }

    /**
     * Looks up a job by its identifier.
     *
     * @param jobId identifier matched against the {@code job_id} column
     * @return the job built from the first matching row, or an empty
     * {@link Optional} when no row matches
     */
    public Optional<JobSummary> getJob(String jobId) {

        final String query = "SELECT * FROM job WHERE job_id = ?";

        // The conditional expression is value-returning, so the third argument
        // resolves to a ResultSetExtractor
        JobSummary found = jdbcTemplate.query(
                query,
                statement -> statement.setString(1, jobId),
                resultSet -> resultSet.next() ? getJobSummaryFromResultSet(resultSet) : null);

        return Optional.ofNullable(found);
    }

    private JobSummary getJobSummaryFromResultSet(ResultSet rs) throws SQLException {

        JobSummary jobSummary = new JobSummary();

        jobSummary.setJobId(rs.getString("job_id"));
        jobSummary.setOwnerId(rs.getString("owner_id"));
        jobSummary.setPhase(ExecutionPhase.fromValue(rs.getString("phase")));
        jobSummary.setJobInfo(getJobPayload(rs.getString("job_info")));
Sonia Zorba's avatar
Sonia Zorba committed
        jobSummary.setResults(getResults(rs.getString("results")));
        jobSummary.setCreationTime(toXMLGregorianCalendar(rs.getTimestamp("creation_time")));
        jobSummary.setStartTime(toXMLGregorianCalendar(rs.getTimestamp("start_time")));
        jobSummary.setEndTime(toXMLGregorianCalendar(rs.getTimestamp("end_time")));

        // Retrieve error information if any
        String errorType = rs.getString("error_type");
        if (errorType != null) {
Sonia Zorba's avatar
Sonia Zorba committed
            ErrorSummary errorSummary = new ErrorSummary();
            errorSummary.setMessage(rs.getString("error_message"));
            errorSummary.setType(ErrorType.fromValue(rs.getString("error_type")));
Sonia Zorba's avatar
Sonia Zorba committed
            errorSummary.setHasDetail(rs.getBoolean("error_has_detail"));
            errorSummary.setDetailMessage(rs.getString("error_detail"));

            jobSummary.setErrorSummary(errorSummary);
Sonia Zorba's avatar
Sonia Zorba committed
        return jobSummary;
    }

    public Jobs getJobs(String userId,
            List<ExecutionPhase> phaseList,
            List<JobService.JobDirection> directionList,
            Optional<LocalDateTime> after,
            Optional<Integer> last
    ) {
        Jobs jobs = new Jobs();
        jobs.setVersion("1.1");

        List<ShortJobDescription> sjdList = jobs.getJobref();
        // Query db to fill ShortJobDescription list
        ArrayList<Object> queryParams = new ArrayList<>();
        ArrayList<Integer> queryParamTypes = new ArrayList<>();

        StringBuilder sb = new StringBuilder();
        sb.append("SELECT * FROM job");

        sb.append(" WHERE owner_id = ?");
        queryParams.add(userId);
        queryParamTypes.add(Types.VARCHAR);

        // Fill conditions on execution phase
        if (phaseList.isEmpty()) {
            sb.append(" AND phase NOT IN (?)");
            queryParams.add(ExecutionPhase.ARCHIVED);
            queryParamTypes.add(Types.OTHER);
        } else {
            for (int i = 0; i < phaseList.size(); i++) {
                queryParams.add(phaseList.get(i));
                queryParamTypes.add(Types.OTHER);
                if (i < phaseList.size() - 1) {
                    sb.append(",");
                }
            }
            sb.append(")");
        }

        // Fill conditions on type list
        if (!directionList.isEmpty()) {
            sb.append(" AND job_type IN (");
            for (int i = 0; i < directionList.size(); i++) {
                sb.append("?");
                queryParams.add(directionList.get(i));
                queryParamTypes.add(Types.OTHER);
                if (i < directionList.size() - 1) {
                    sb.append(",");
                }
            }
            sb.append(")");
        }
        // Fill conditions on views list
        if (!viewList.isEmpty()) {
            sb.append(" AND (")
                    .append(String.join(" OR ",
                            Collections.nCopies(viewList.size(), "job_info->'transfer'->'view'->>'uri' = ?")))
                    .append(")");
            for (String view : viewList) {
                queryParams.add(view);
                queryParamTypes.add(Types.VARCHAR);
            }
        }

        // Fill conditions on creation date
        if (after.isPresent()) {
            sb.append(" AND creation_time > ?");
            queryParams.add(after.get());
            queryParamTypes.add(Types.TIMESTAMP);
        }

        sb.append(" ORDER BY creation_time DESC");

        if (last.isPresent()) {
            sb.append(" LIMIT ?");
            queryParams.add(last.get());
            queryParamTypes.add(Types.INTEGER);
        }

        String sql = sb.toString();

        // Perform query
        jdbcTemplate.query(sql,
                    for (int i = 0; i < queryParams.size(); i++) {
                        ps.setObject(i + 1, queryParams.get(i),
                                queryParamTypes.get(i));
                    }
                },
                (rs) -> {
                    sjdList.add(getShortJobDescriptionFromCurrentRow(rs));
                }
        );

        return jobs;
    }

    private ShortJobDescription getShortJobDescriptionFromCurrentRow(ResultSet rs)
            throws SQLException {
        ShortJobDescription sjd = new ShortJobDescription();
        sjd.setId(rs.getString("job_id"));
        sjd.setOwnerId(rs.getString("owner_id"));
        sjd.setType(rs.getString("job_type"));
        sjd.setPhase(ExecutionPhase.fromValue(rs.getString("phase")));
        sjd.setCreationTime(
                toXMLGregorianCalendar(rs.getTimestamp("creation_time")));

    private JobSummary.JobInfo getJobPayload(String json) {
Sonia Zorba's avatar
Sonia Zorba committed
        try {
            return MAPPER.readValue(json, JobSummary.JobInfo.class);
Sonia Zorba's avatar
Sonia Zorba committed
        } catch (JsonProcessingException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Parses the JSON stored in the {@code results} column.
     *
     * @param json serialized list of result references, possibly {@code null}
     * @return the parsed list, or {@code null} when {@code json} is {@code null}
     * @throws RuntimeException wrapping the {@link JsonProcessingException}
     * raised when the JSON cannot be parsed
     */
    private List<ResultReference> getResults(String json) {
        if (json != null) {
            JavaType listOfReferences = MAPPER.getTypeFactory()
                    .constructCollectionType(List.class, ResultReference.class);
            try {
                return MAPPER.readValue(json, listOfReferences);
            } catch (JsonProcessingException parseFailure) {
                throw new RuntimeException(parseFailure);
            }
        }
        return null;
    }

    public void updateJob(JobSummary job, Transfer transferDetails) {
        String sql = "UPDATE job SET (phase, results, transfer_details ";
        ErrorSummary errorSummary = job.getErrorSummary();
        if (errorSummary != null) {
            sql += ", error_message, error_type, error_has_detail, error_detail";

        if (errorSummary != null) {
Sonia Zorba's avatar
Sonia Zorba committed

        jdbcTemplate.update(sql, ps -> {
            int i = 0;
            ps.setObject(++i, job.getPhase().name(), Types.OTHER);
            ps.setObject(++i, toJson(job.getResults()), Types.OTHER);
            ps.setObject(++i, toJson(transferDetails), Types.OTHER);
            if (errorSummary != null) {
                ps.setString(++i, errorSummary.getMessage());
                ps.setObject(++i, errorSummary.getType().value(), Types.OTHER);
                ps.setBoolean(++i, errorSummary.isHasDetail());
                ps.setString(++i, errorSummary.getDetailMessage());
    public Transfer getTransferDetails(String jobId) {

        String sql = "SELECT transfer_details FROM job WHERE job_id = ?";

        String json = jdbcTemplate.queryForObject(sql, String.class, new Object[]{jobId});
        if (json == null) {
            return null;
        }

        try {
            return MAPPER.readValue(json, Transfer.class);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException(ex);
        }
    }
Sonia Zorba's avatar
Sonia Zorba committed

    private String toJson(Object data) {
Sonia Zorba's avatar
Sonia Zorba committed
        try {
            return MAPPER.writeValueAsString(data);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException(ex);
        }
    }

    public static XMLGregorianCalendar toXMLGregorianCalendar(Timestamp t) {
        if (t != null) {
            try {
                XMLGregorianCalendar cal = DatatypeFactory.newInstance().newXMLGregorianCalendar();

                LocalDateTime ldt = t.toLocalDateTime();

                cal.setYear(ldt.getYear());
                cal.setMonth(ldt.getMonthValue());
                cal.setDay(ldt.getDayOfMonth());
                cal.setHour(ldt.getHour());
                cal.setMinute(ldt.getMinute());
                cal.setSecond(ldt.getSecond());
                cal.setFractionalSecond(new BigDecimal("0." + ldt.getNano()));

                // return calendar only if it has been fully initialized (otherwise
                // toString issue could appear); return null in other cases.
                return cal;
            } catch (Exception e) {
                LOG.error("Error while generating XMLGregorianCalendar", e);
            }