package it.inaf.oats.vospace; import it.inaf.oats.vospace.exception.InternalFaultException; import it.inaf.oats.vospace.persistence.JobDAO; import net.ivoa.xml.uws.v1.ExecutionPhase; import net.ivoa.xml.uws.v1.JobSummary; import net.ivoa.xml.vospace.v2.Protocol; import net.ivoa.xml.vospace.v2.Transfer; import it.inaf.oats.vospace.exception.ErrorSummaryFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Service public class JobService { private static final Logger LOG = LoggerFactory.getLogger(JobService.class); @Autowired private JobDAO jobDAO; @Autowired private UriService uriService; @Autowired private MoveService moveService; @Autowired private AsyncTransferService asyncTransfService; public enum JobDirection { pullToVoSpace, pullFromVoSpace, pushToVoSpace, pushFromVoSpace, moveNode, copyNode; public static JobDirection getJobDirectionEnumFromTransfer(Transfer transfer) { String direction = transfer.getDirection(); switch (direction) { case "pullToVoSpace": case "pullFromVoSpace": case "pushToVoSpace": case "pushFromVoSpace": return JobDirection.valueOf(direction); default: if (transfer.isKeepBytes()) { return JobDirection.copyNode; } else { return JobDirection.moveNode; } } } } public void setJobPhase(JobSummary job, String phase) { LOG.trace("Job " + job.getJobId() + " phase set to " + phase); // TODO: check allowed job phase transitions switch (phase) { case "RUN": startJob(job); break; case "ABORT": throw new UnsupportedOperationException("Not implemented yet"); default: throw new IllegalArgumentException("Invalid phase parameter: " + phase); } } private void startJob(JobSummary job) { try { Transfer transfer = uriService.getTransfer(job); ExecutionPhase phase; if (transfer.getProtocols().stream().anyMatch(p -> "ia2:async-recall".equals(p.getUri()))) { // Async recall from tape jobs are queued. They will be started by VOSpace transfer service phase = ExecutionPhase.QUEUED; } else { phase = ExecutionPhase.EXECUTING; } job.setPhase(phase); jobDAO.updateJob(job); switch (getJobDirection(transfer)) { case pullToVoSpace: handlePullToVoSpace(job, transfer); break; case pullFromVoSpace: case pushToVoSpace: handleVoSpaceUrlsListResult(job, transfer); break; case moveNode: throw new UnsupportedOperationException("Not implemented yet"); // handleMoveNode(job, transfer); // break; default: throw new UnsupportedOperationException("Not implemented yet"); } } catch (VoSpaceErrorSummarizableException e) { job.setPhase(ExecutionPhase.ERROR); job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e)); jobDAO.updateJob(job); } catch (Exception e) { job.setPhase(ExecutionPhase.ERROR); job.setErrorSummary(ErrorSummaryFactory.newErrorSummary( new InternalFaultException(e))); jobDAO.updateJob(job); } } private void handlePullToVoSpace(JobSummary job, Transfer transfer) { for (Protocol protocol : transfer.getProtocols()) { switch (protocol.getUri()) { case "ia2:async-recall": asyncTransfService.startJob(job); return; case "ivo://ivoa.net/vospace/core#httpget": String nodeUri = transfer.getTarget(); String contentUri = protocol.getEndpoint(); uriService.setNodeRemoteLocation(nodeUri, contentUri); uriService.setTransferJobResult(job, transfer); return; default: throw new InternalFaultException("Unsupported pullToVoSpace protocol: " + protocol.getUri()); } } } private void handleVoSpaceUrlsListResult(JobSummary job, Transfer transfer) { uriService.setTransferJobResult(job, transfer); } private void handleMoveNode(JobSummary job, Transfer transfer) { moveService.processMoveJob(job, transfer); } private JobDirection getJobDirection(Transfer transfer) { return JobDirection.getJobDirectionEnumFromTransfer(transfer); } /** * Synchronous transfer endpoint creates a job that is immediately set to * COMPLETED or to ERROR in case some fault occurred. * * In case of ERROR, Protocols are stripped from job representation in * compliance with specifications * */ public void createSyncJobResult(JobSummary job) { try { uriService.setSyncTransferEndpoints(job); job.setPhase(ExecutionPhase.COMPLETED); // Need to catch other exceptions too to avoid inconsistent job status } catch (VoSpaceErrorSummarizableException e) { job.setPhase(ExecutionPhase.ERROR); uriService.getTransfer(job).getProtocols().clear(); job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e)); } catch (Exception e) { job.setPhase(ExecutionPhase.ERROR); uriService.getTransfer(job).getProtocols().clear(); job.setErrorSummary(ErrorSummaryFactory.newErrorSummary( new InternalFaultException(e))); } finally { jobDAO.createJob(job); } } }