Skip to content
JobDAO.java 10.9 KiB
Newer Older
Sonia Zorba's avatar
Sonia Zorba committed
package it.inaf.oats.vospace.persistence;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Timestamp;
Sonia Zorba's avatar
Sonia Zorba committed
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.Optional;
import javax.sql.DataSource;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.XMLGregorianCalendar;
Sonia Zorba's avatar
Sonia Zorba committed
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.uws.v1.ErrorSummary;
import net.ivoa.xml.uws.v1.ShortJobDescription;
Sonia Zorba's avatar
Sonia Zorba committed
import net.ivoa.xml.uws.v1.ResultReference;
import net.ivoa.xml.uws.v1.Jobs;
import it.inaf.oats.vospace.JobService;
Sonia Zorba's avatar
Sonia Zorba committed
import net.ivoa.xml.vospace.v2.Transfer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.time.LocalDateTime;
import java.math.BigDecimal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Sonia Zorba's avatar
Sonia Zorba committed

@Repository
public class JobDAO {

    private static final Logger LOG = LoggerFactory.getLogger(JobDAO.class);
Sonia Zorba's avatar
Sonia Zorba committed
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public JobDAO(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    public void createJob(JobSummary jobSummary) {

        String sql = 
                "INSERT INTO job(job_id, owner_id, job_type, phase, job_info,"
                + " error_message, error_type, error_has_detail, error_detail) "
                + "VALUES (?, ?, ?, ?, ?, ? ,? ,? ,?)";
Sonia Zorba's avatar
Sonia Zorba committed

        jdbcTemplate.update(sql, ps -> {
            int i = 0;
            ps.setString(++i, jobSummary.getJobId());
            ps.setString(++i, jobSummary.getOwnerId());
            ps.setObject(++i, getJobDirection(jobSummary), Types.VARCHAR);
Sonia Zorba's avatar
Sonia Zorba committed
            ps.setObject(++i, jobSummary.getPhase().value(), Types.OTHER);
            ps.setObject(++i, toJson(jobSummary.getJobInfo()), Types.OTHER);
            
            ErrorSummary errorSummary = jobSummary.getErrorSummary();
            if(errorSummary != null) {
                ps.setString(++i, errorSummary.getMessage());
                ps.setObject(++i, errorSummary.getType().value(), Types.OTHER);
                ps.setBoolean(++i, errorSummary.isHasDetail());
                ps.setString(++i, errorSummary.getDetailMessage());
            } else {
                ps.setNull(++i, Types.VARCHAR);
                ps.setNull(++i, Types.OTHER);
                ps.setNull(++i, Types.BOOLEAN);
                ps.setNull(++i, Types.VARCHAR);
    private String getJobDirection(JobSummary jobSummary) {
Sonia Zorba's avatar
Sonia Zorba committed

        List<Object> payload = jobSummary.getJobInfo().getAny();
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("Empty JobInfo");
        }
        if (payload.size() > 1) {
            throw new UnsupportedOperationException("JobInfo as list not supported");
        }

        Object jobPayload = payload.get(0);
Sonia Zorba's avatar
Sonia Zorba committed

        if (jobPayload instanceof Transfer) {
            Transfer transfer = (Transfer) jobPayload;
            return transfer.getDirection();
        }

        throw new UnsupportedOperationException("JobInfo of type " + jobPayload + " not supported");
    }

    public Optional<JobSummary> getJob(String jobId) {

        String sql = "SELECT * FROM job WHERE job_id = ?";

        JobSummary jobSummary = jdbcTemplate.query(sql,
                ps -> {
                    ps.setString(1, jobId);
                }, rs -> {
                    if (rs.next()) {
                        return getJobSummaryFromResultSet(rs);
                    }
                    return null;
                });

        return Optional.ofNullable(jobSummary);
    }

    private JobSummary getJobSummaryFromResultSet(ResultSet rs) throws SQLException {

        JobSummary jobSummary = new JobSummary();

        jobSummary.setJobId(rs.getString("job_id"));
        jobSummary.setOwnerId(rs.getString("owner_id"));
        jobSummary.setPhase(ExecutionPhase.fromValue(rs.getString("phase")));
        jobSummary.setJobInfo(getJobPayload(rs.getString("job_info")));
Sonia Zorba's avatar
Sonia Zorba committed
        jobSummary.setResults(getResults(rs.getString("results")));
        
        // Retrieve error information if any
        String errorType = rs.getString("error_type");
        if (errorType != null) {
Sonia Zorba's avatar
Sonia Zorba committed
            ErrorSummary errorSummary = new ErrorSummary();
            errorSummary.setMessage(rs.getString("error_message"));
            errorSummary.setType(ErrorType.fromValue(rs.getString("error_type")));
Sonia Zorba's avatar
Sonia Zorba committed
            errorSummary.setHasDetail(rs.getBoolean("error_has_detail"));
            errorSummary.setDetailMessage(rs.getString("error_detail"));

            jobSummary.setErrorSummary(errorSummary);
Sonia Zorba's avatar
Sonia Zorba committed
        return jobSummary;
    }

    public Jobs getJobs(String userId,
            List<ExecutionPhase> phaseList,
            List<JobService.JobDirection> directionList,
            Optional<LocalDateTime> after,
            Optional<Integer> last
    ) {
        Jobs jobs = new Jobs();
        jobs.setVersion("1.1");

        List<ShortJobDescription> sjdList = jobs.getJobref();
        // Query db to fill ShortJobDescription list
        ArrayList<Object> queryParams = new ArrayList<>();
        ArrayList<Integer> queryParamTypes = new ArrayList<>();

        StringBuilder sb = new StringBuilder();
        sb.append("SELECT * FROM job");

        sb.append(" WHERE owner_id = ?");
        queryParams.add(userId);
        queryParamTypes.add(Types.VARCHAR);

        // Fill conditions on execution phase
        if (phaseList.isEmpty()) {
            sb.append(" AND phase NOT IN (?)");
            queryParams.add(ExecutionPhase.ARCHIVED);
            queryParamTypes.add(Types.OTHER);
        } else {
            for (int i = 0; i < phaseList.size(); i++) {
                queryParams.add(phaseList.get(i));
                queryParamTypes.add(Types.OTHER);
                if (i < phaseList.size() - 1) {
                    sb.append(",");
                }
            }
            sb.append(")");
        }

        // Fill conditions on type list
        if (!directionList.isEmpty()) {
            sb.append(" AND job_type IN (");
            for (int i = 0; i < directionList.size(); i++) {
                sb.append("?");
                queryParams.add(directionList.get(i));
                queryParamTypes.add(Types.OTHER);
                if (i < directionList.size() - 1) {
                    sb.append(",");
                }
            }
            sb.append(")");
        }

        // Fill conditions on creation date
        if (after.isPresent()) {
            sb.append(" AND creation_time > ?");
            queryParams.add(after.get());
            queryParamTypes.add(Types.TIMESTAMP);
        }

        sb.append(" ORDER BY creation_time DESC");

        if (last.isPresent()) {
            sb.append(" LIMIT ?");
            queryParams.add(last.get());
            queryParamTypes.add(Types.INTEGER);
        }

        String sql = sb.toString();

        // Perform query
        jdbcTemplate.query(sql,
                    for (int i = 0; i < queryParams.size(); i++) {
                        ps.setObject(i + 1, queryParams.get(i),
                                queryParamTypes.get(i));
                    }
                },
                (rs) -> {
                    sjdList.add(getShortJobDescriptionFromCurrentRow(rs));
                }
        );

        return jobs;
    }

    private ShortJobDescription getShortJobDescriptionFromCurrentRow(ResultSet rs)
            throws SQLException {
        ShortJobDescription sjd = new ShortJobDescription();
        sjd.setId(rs.getString("job_id"));
        sjd.setOwnerId(rs.getString("owner_id"));
        sjd.setType(rs.getString("job_type"));
        sjd.setPhase(ExecutionPhase.fromValue(rs.getString("phase")));
        sjd.setCreationTime(
                toXMLGregorianCalendar(rs.getTimestamp("creation_time")));

    private JobSummary.JobInfo getJobPayload(String json) {
Sonia Zorba's avatar
Sonia Zorba committed
        try {
            return MAPPER.readValue(json, JobSummary.JobInfo.class);
Sonia Zorba's avatar
Sonia Zorba committed
        } catch (JsonProcessingException ex) {
            throw new RuntimeException(ex);
        }
    }

    private List<ResultReference> getResults(String json) {
        if (json == null) {
            return null;
        }
        try {
            JavaType type = MAPPER.getTypeFactory().constructCollectionType(List.class, ResultReference.class);
            return MAPPER.readValue(json, type);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException(ex);
        }
    }

    public void updateJob(JobSummary job) {

        String sql = "UPDATE job SET (phase, results";
        
        ErrorSummary errorSummary = job.getErrorSummary();
        if(errorSummary != null)
        {
            sql += ", error_message, error_type, error_has_detail, error_detail";
Sonia Zorba's avatar
Sonia Zorba committed

        jdbcTemplate.update(sql, ps -> {
            int i = 0;
            ps.setObject(++i, job.getPhase().name(), Types.OTHER);
            ps.setObject(++i, toJson(job.getResults()), Types.OTHER);
            if(errorSummary != null)
            {
                ps.setString(++i, errorSummary.getMessage());
                ps.setObject(++i, errorSummary.getType().value(), Types.OTHER);                
                ps.setBoolean(++i, errorSummary.isHasDetail());
                ps.setString(++i, errorSummary.getDetailMessage());
Sonia Zorba's avatar
Sonia Zorba committed
        });
    }

    private String toJson(Object data) {
        try {
            return MAPPER.writeValueAsString(data);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException(ex);
        }
    }

    public static XMLGregorianCalendar toXMLGregorianCalendar(Timestamp t) {
        XMLGregorianCalendar cal = null;
            cal = DatatypeFactory.newInstance().newXMLGregorianCalendar();

            LocalDateTime ldt = t.toLocalDateTime();

            cal.setYear(ldt.getYear());
            cal.setMonth(ldt.getMonthValue());
            cal.setDay(ldt.getDayOfMonth());
            cal.setHour(ldt.getHour());
            cal.setMinute(ldt.getMinute());
            cal.setSecond(ldt.getSecond());
            cal.setFractionalSecond(new BigDecimal("0." + ldt.getNano()));

        } catch (Exception e) {
            LOG.error("Error while generating XMLGregorianCalendar", e);