Skip to content
JobService.java 6.19 KiB
Newer Older
package it.inaf.oats.vospace;

import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.persistence.JobDAO;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import it.inaf.oats.vospace.exception.ErrorSummaryFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class JobService {

    private static final Logger LOG = LoggerFactory.getLogger(JobService.class);

    @Autowired
    private JobDAO jobDAO;

    @Autowired
    private UriService uriService;
    
    @Autowired
    private MoveService moveService;
Sonia Zorba's avatar
Sonia Zorba committed
    private AsyncTransferService asyncTransfService;
        pullToVoSpace,
        pullFromVoSpace,
        copyNode;

        public static JobDirection getJobDirectionEnumFromTransfer(Transfer transfer) {
            String direction = transfer.getDirection();
            switch (direction) {
                case "pullToVoSpace":
                case "pullFromVoSpace":
                case "pushToVoSpace":
                case "pushFromVoSpace":
                    return JobDirection.valueOf(direction);

                default:
                    if (transfer.isKeepBytes()) {
                        return JobDirection.copyNode;
                    } else {
                        return JobDirection.moveNode;
                    }

            }
        }

    public void setJobPhase(JobSummary job, String phase) {

        LOG.trace("Job " + job.getJobId() + " phase set to " + phase);

        // TODO: check allowed job phase transitions
        switch (phase) {
            case "RUN":
                startJob(job);
                break;
            case "ABORT":
                throw new UnsupportedOperationException("Not implemented yet");
            default:
                throw new IllegalArgumentException("Invalid phase parameter: " + phase);
        }
    }

    private void startJob(JobSummary job) {
            Transfer transfer = uriService.getTransfer(job);

            ExecutionPhase phase;
            if (transfer.getProtocols().stream().anyMatch(p -> "ia2:async-recall".equals(p.getUri()))) {
                // Async recall from tape jobs are queued. They will be started by VOSpace transfer service
                phase = ExecutionPhase.QUEUED;
            } else {
                phase = ExecutionPhase.EXECUTING;
            }
            job.setPhase(phase);

            jobDAO.updateJob(job);

            switch (getJobDirection(transfer)) {
                case pullToVoSpace:
                    handlePullToVoSpace(job, transfer);
                    break;
                case pullFromVoSpace:
                case pushToVoSpace:
                    handleVoSpaceUrlsListResult(job, transfer);
                    break;
                case moveNode:
                    throw new UnsupportedOperationException("Not implemented yet");
                    // handleMoveNode(job, transfer);
                    // break;
                default:
                    throw new UnsupportedOperationException("Not implemented yet");
            }

        } catch (VoSpaceErrorSummarizableException e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
            jobDAO.updateJob(job);
        } catch (Exception e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
                    new InternalFaultException(e)));
    private void handlePullToVoSpace(JobSummary job, Transfer transfer) {

        for (Protocol protocol : transfer.getProtocols()) {
            switch (protocol.getUri()) {
                case "ia2:async-recall":
                    asyncTransfService.startJob(job);
                    return;
                case "ivo://ivoa.net/vospace/core#httpget":
                    String nodeUri = transfer.getTarget();
                    String contentUri = protocol.getEndpoint();
                    uriService.setNodeRemoteLocation(nodeUri, contentUri);
                    uriService.setTransferJobResult(job, transfer);
                    return;
                default:
                    throw new InternalFaultException("Unsupported pullToVoSpace protocol: " + protocol.getUri());
            }
        }
    }

    private void handleVoSpaceUrlsListResult(JobSummary job, Transfer transfer) {
        uriService.setTransferJobResult(job, transfer);
    
    private void handleMoveNode(JobSummary job, Transfer transfer)
    {
        moveService.processMoveJob(job, transfer);
    }
    private JobDirection getJobDirection(Transfer transfer) {
        return JobDirection.getJobDirectionEnumFromTransfer(transfer);

    /**
     * Synchronous transfer endpoint creates a job that is immediately set to
     * COMPLETED or to ERROR in case some fault occurred.
     *
     * In case of ERROR, Protocols are stripped from job representation in
     * compliance with specifications
     *
     */
    public void createSyncJobResult(JobSummary job) {
        try {
            uriService.setSyncTransferEndpoints(job);
            job.setPhase(ExecutionPhase.COMPLETED);
            // Need to catch other exceptions too to avoid inconsistent job status
        } catch (VoSpaceErrorSummarizableException e) {
            job.setPhase(ExecutionPhase.ERROR);
            uriService.getTransfer(job).getProtocols().clear();
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
        } catch (Exception e) {
            job.setPhase(ExecutionPhase.ERROR);
            uriService.getTransfer(job).getProtocols().clear();
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
                    new InternalFaultException(e)));