package it.inaf.oats.vospace.persistence;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Timestamp;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.Optional;
import javax.sql.DataSource;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.XMLGregorianCalendar;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.uws.v1.ErrorSummary;
import net.ivoa.xml.uws.v1.ShortJobDescription;
import net.ivoa.xml.uws.v1.ResultReference;
import net.ivoa.xml.uws.v1.Jobs;
import it.inaf.oats.vospace.JobService;
import net.ivoa.xml.vospace.v2.Transfer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.time.LocalDateTime;
import java.math.BigDecimal;
import net.ivoa.xml.uws.v1.ErrorType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Data access object for the {@code job} table.
 *
 * <p>Stores and retrieves {@link JobSummary} instances and builds {@link Jobs}
 * listings. The job payload ({@code job_info}) and the job results
 * ({@code results}) are persisted as JSON strings produced by a shared Jackson
 * {@link ObjectMapper}.
 */
@Repository
public class JobDAO {

    private static final Logger LOG = LoggerFactory.getLogger(JobDAO.class);

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Number of decimal digits carried by a nanosecond-of-second value. */
    private static final int NANO_SCALE = 9;

    private final JdbcTemplate jdbcTemplate;

    /**
     * Creates the DAO on top of the given data source.
     *
     * @param dataSource the data source wrapped in a {@link JdbcTemplate}
     */
    @Autowired
    public JobDAO(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    /**
     * Inserts a new row in the {@code job} table for the given job.
     *
     * <p>The four error columns are filled from the job's {@link ErrorSummary}
     * when it is present and set to SQL NULL otherwise.
     *
     * @param jobSummary the job to persist
     * @throws IllegalArgumentException if the job info contains no payload
     * @throws UnsupportedOperationException if the job info contains more than
     *         one payload, or a payload that is not a {@link Transfer}
     */
    public void createJob(JobSummary jobSummary) {

        String sql = "INSERT INTO job(job_id, owner_id, job_type, phase, job_info,"
                + " error_message, error_type, error_has_detail, error_detail) "
                + "VALUES (?, ?, ?, ?, ?, ? ,? ,? ,?)";

        jdbcTemplate.update(sql, ps -> {
            int i = 0;
            ps.setString(++i, jobSummary.getJobId());
            ps.setString(++i, jobSummary.getOwnerId());
            ps.setObject(++i, getJobDirection(jobSummary), Types.VARCHAR);
            ps.setObject(++i, jobSummary.getPhase().value(), Types.OTHER);
            ps.setObject(++i, toJson(jobSummary.getJobInfo()), Types.OTHER);

            ErrorSummary errorSummary = jobSummary.getErrorSummary();
            if (errorSummary != null) {
                ps.setString(++i, errorSummary.getMessage());
                ps.setObject(++i, errorSummary.getType().value(), Types.OTHER);
                ps.setBoolean(++i, errorSummary.isHasDetail());
                ps.setString(++i, errorSummary.getDetailMessage());
            } else {
                ps.setNull(++i, Types.VARCHAR);
                ps.setNull(++i, Types.OTHER);
                ps.setNull(++i, Types.BOOLEAN);
                ps.setNull(++i, Types.VARCHAR);
            }
        });
    }

    /**
     * Extracts the transfer direction from the single payload of a job.
     *
     * @param jobSummary the job whose job info is inspected
     * @return the direction of the {@link Transfer} payload
     * @throws IllegalArgumentException if the job info contains no payload
     * @throws UnsupportedOperationException if the job info contains more than
     *         one payload, or a payload that is not a {@link Transfer}
     */
    private String getJobDirection(JobSummary jobSummary) {

        List<Object> payload = jobSummary.getJobInfo().getAny();

        if (payload.isEmpty()) {
            throw new IllegalArgumentException("Empty JobInfo");
        }
        if (payload.size() > 1) {
            throw new UnsupportedOperationException("JobInfo as list not supported");
        }

        Object jobPayload = payload.get(0);
        if (jobPayload instanceof Transfer) {
            Transfer transfer = (Transfer) jobPayload;
            return transfer.getDirection();
        }

        // Report the payload's class, not its toString(): the message talks about a type.
        String payloadType = jobPayload == null ? "null" : jobPayload.getClass().getName();
        throw new UnsupportedOperationException("JobInfo of type " + payloadType + " not supported");
    }

    /**
     * Loads a job by its identifier.
     *
     * @param jobId the value matched against the {@code job_id} column
     * @return the job, or an empty {@link Optional} if no row matches
     */
    public Optional<JobSummary> getJob(String jobId) {

        String sql = "SELECT * FROM job WHERE job_id = ?";

        JobSummary jobSummary = jdbcTemplate.query(sql,
                ps -> {
                    ps.setString(1, jobId);
                },
                rs -> {
                    if (rs.next()) {
                        return getJobSummaryFromResultSet(rs);
                    }
                    return null;
                });

        return Optional.ofNullable(jobSummary);
    }

    /**
     * Builds a {@link JobSummary} from the current row of the result set.
     *
     * <p>An {@link ErrorSummary} is attached only when the {@code error_type}
     * column is not NULL.
     *
     * @param rs a result set positioned on a row of the {@code job} table
     * @return the job described by the current row
     * @throws SQLException if a column cannot be read
     */
    private JobSummary getJobSummaryFromResultSet(ResultSet rs) throws SQLException {

        JobSummary jobSummary = new JobSummary();

        jobSummary.setJobId(rs.getString("job_id"));
        jobSummary.setOwnerId(rs.getString("owner_id"));
        jobSummary.setPhase(ExecutionPhase.fromValue(rs.getString("phase")));
        jobSummary.setJobInfo(getJobPayload(rs.getString("job_info")));
        jobSummary.setResults(getResults(rs.getString("results")));

        // Retrieve error information if any
        String errorType = rs.getString("error_type");
        if (errorType != null) {
            ErrorSummary errorSummary = new ErrorSummary();
            errorSummary.setMessage(rs.getString("error_message"));
            errorSummary.setType(ErrorType.fromValue(errorType));
            errorSummary.setHasDetail(rs.getBoolean("error_has_detail"));
            errorSummary.setDetailMessage(rs.getString("error_detail"));
            jobSummary.setErrorSummary(errorSummary);
        }

        return jobSummary;
    }

    /**
     * Lists the jobs owned by a user, most recently created first.
     *
     * <p>NOTE(review): the element types of the list and optional parameters
     * are not visible in this file, so they are declared with wildcards; each
     * value is bound to the statement as-is with the SQL type given below.
     *
     * @param userId the value matched against the {@code owner_id} column
     * @param phaseList phases to include (bound as {@link Types#OTHER}); when
     *        empty, every phase except {@link ExecutionPhase#ARCHIVED} is
     *        included
     * @param directionList values matched against the {@code job_type} column
     *        (bound as {@link Types#OTHER}); when empty, no filter is applied
     * @param after when present, only jobs whose {@code creation_time} is
     *        strictly greater are returned (bound as {@link Types#TIMESTAMP})
     * @param last when present, the maximum number of rows returned (bound as
     *        {@link Types#INTEGER})
     * @return a {@link Jobs} instance with version {@code "1.1"} whose job
     *         references describe the matching rows
     */
    public Jobs getJobs(String userId,
            List<?> phaseList,
            List<?> directionList,
            Optional<?> after,
            Optional<?> last
    ) {

        Jobs jobs = new Jobs();
        jobs.setVersion("1.1");
        List<ShortJobDescription> sjdList = jobs.getJobref();

        // Query db to fill ShortJobDescription list
        // The two lists below are kept in lockstep: one value and one SQL type per placeholder.
        List<Object> queryParams = new ArrayList<>();
        List<Integer> queryParamTypes = new ArrayList<>();

        StringBuilder sb = new StringBuilder();
        sb.append("SELECT * FROM job");
        sb.append(" WHERE owner_id = ?");
        queryParams.add(userId);
        queryParamTypes.add(Types.VARCHAR);

        // Fill conditions on execution phase
        if (phaseList.isEmpty()) {
            sb.append(" AND phase NOT IN (?)");
            queryParams.add(ExecutionPhase.ARCHIVED);
            queryParamTypes.add(Types.OTHER);
        } else {
            appendInCondition(sb, "phase", phaseList, queryParams, queryParamTypes);
        }

        // Fill conditions on type list
        if (!directionList.isEmpty()) {
            appendInCondition(sb, "job_type", directionList, queryParams, queryParamTypes);
        }

        // Fill conditions on creation date
        if (after.isPresent()) {
            sb.append(" AND creation_time > ?");
            queryParams.add(after.get());
            queryParamTypes.add(Types.TIMESTAMP);
        }

        sb.append(" ORDER BY creation_time DESC");

        if (last.isPresent()) {
            sb.append(" LIMIT ?");
            queryParams.add(last.get());
            queryParamTypes.add(Types.INTEGER);
        }

        String sql = sb.toString();

        // Perform query
        jdbcTemplate.query(sql,
                (ps) -> {
                    for (int i = 0; i < queryParams.size(); i++) {
                        ps.setObject(i + 1, queryParams.get(i), queryParamTypes.get(i));
                    }
                },
                (rs) -> {
                    sjdList.add(getShortJobDescriptionFromCurrentRow(rs));
                }
        );

        return jobs;
    }

    /**
     * Appends {@code " AND <column> IN (?,?,...)"} with one placeholder per
     * value and records each value, typed as {@link Types#OTHER}, for binding.
     *
     * @param sb the SQL text being built
     * @param column the column name; must be a trusted constant because it is
     *        concatenated into the SQL text
     * @param values the values to match; expected to be non-empty
     * @param queryParams receives the values, in placeholder order
     * @param queryParamTypes receives the SQL type of each added value
     */
    private static void appendInCondition(StringBuilder sb, String column, List<?> values,
            List<Object> queryParams, List<Integer> queryParamTypes) {

        sb.append(" AND ").append(column).append(" IN (");
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append("?");
            queryParams.add(values.get(i));
            queryParamTypes.add(Types.OTHER);
        }
        sb.append(")");
    }

    /**
     * Builds a {@link ShortJobDescription} from the current row of the result
     * set.
     *
     * @param rs a result set positioned on a row of the {@code job} table
     * @return the short description of the job in the current row
     * @throws SQLException if a column cannot be read
     */
    private ShortJobDescription getShortJobDescriptionFromCurrentRow(ResultSet rs)
            throws SQLException {

        ShortJobDescription sjd = new ShortJobDescription();
        sjd.setId(rs.getString("job_id"));
        sjd.setOwnerId(rs.getString("owner_id"));
        sjd.setType(rs.getString("job_type"));
        sjd.setPhase(ExecutionPhase.fromValue(rs.getString("phase")));
        sjd.setCreationTime(toXMLGregorianCalendar(rs.getTimestamp("creation_time")));

        return sjd;
    }

    /**
     * Deserializes the JSON stored in the {@code job_info} column.
     *
     * @param json the JSON text
     * @return the deserialized job info
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if
     *         the text cannot be parsed
     */
    private JobSummary.JobInfo getJobPayload(String json) {
        try {
            return MAPPER.readValue(json, JobSummary.JobInfo.class);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Deserializes the JSON stored in the {@code results} column.
     *
     * @param json the JSON text, possibly {@code null}
     * @return the deserialized result references, or {@code null} when
     *         {@code json} is {@code null}
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if
     *         the text cannot be parsed
     */
    private List<ResultReference> getResults(String json) {
        if (json == null) {
            return null;
        }
        try {
            JavaType type = MAPPER.getTypeFactory()
                    .constructCollectionType(List.class, ResultReference.class);
            return MAPPER.readValue(json, type);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Updates phase and results of an existing job, identified by its job id.
     *
     * <p>The error columns are part of the update only when the job carries an
     * {@link ErrorSummary}; otherwise they are left untouched.
     *
     * @param job the job holding the new values
     */
    public void updateJob(JobSummary job) {

        String sql = "UPDATE job SET (phase, results";

        ErrorSummary errorSummary = job.getErrorSummary();
        if (errorSummary != null) {
            sql += ", error_message, error_type, error_has_detail, error_detail";
        }

        sql += ") = (?, ?";

        if (errorSummary != null) {
            sql += ", ?, ?, ?, ?";
        }

        sql += ") WHERE job_id = ?";

        jdbcTemplate.update(sql, ps -> {
            int i = 0;
            // value() rather than name(): same accessor as createJob, and the
            // counterpart of ExecutionPhase.fromValue used when reading rows back.
            ps.setObject(++i, job.getPhase().value(), Types.OTHER);
            ps.setObject(++i, toJson(job.getResults()), Types.OTHER);
            if (errorSummary != null) {
                ps.setString(++i, errorSummary.getMessage());
                ps.setObject(++i, errorSummary.getType().value(), Types.OTHER);
                ps.setBoolean(++i, errorSummary.isHasDetail());
                ps.setString(++i, errorSummary.getDetailMessage());
            }
            ps.setString(++i, job.getJobId());
        });
    }

    /**
     * Serializes a value to JSON with the shared {@link ObjectMapper}.
     *
     * @param data the value to serialize
     * @return the JSON text
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if
     *         serialization fails
     */
    private String toJson(Object data) {
        try {
            return MAPPER.writeValueAsString(data);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Converts a SQL timestamp into an {@link XMLGregorianCalendar}.
     *
     * <p>Year, month, day, hour, minute, second and fractional second are
     * copied from {@link Timestamp#toLocalDateTime()}; no time zone is set on
     * the result.
     *
     * @param t the timestamp to convert, possibly {@code null}
     * @return the calendar, or {@code null} if {@code t} is {@code null} or no
     *         {@link DatatypeFactory} can be obtained (the latter is logged)
     */
    public static XMLGregorianCalendar toXMLGregorianCalendar(Timestamp t) {

        if (t == null) {
            return null;
        }

        XMLGregorianCalendar cal = null;
        try {
            cal = DatatypeFactory.newInstance().newXMLGregorianCalendar();

            LocalDateTime ldt = t.toLocalDateTime();

            cal.setYear(ldt.getYear());
            cal.setMonth(ldt.getMonthValue());
            cal.setDay(ldt.getDayOfMonth());
            cal.setHour(ldt.getHour());
            cal.setMinute(ldt.getMinute());
            cal.setSecond(ldt.getSecond());
            // getNano() is an integer count of nanoseconds; scaling by 9 digits
            // preserves leading zeros (5 ms is 0.005, not 0.5).
            cal.setFractionalSecond(
                    BigDecimal.valueOf(ldt.getNano(), NANO_SCALE).stripTrailingZeros());
        } catch (DatatypeConfigurationException e) {
            LOG.error("Error while generating XMLGregorianCalendar", e);
        }

        return cal;
    }
}