Skip to content
JobService.java 11.3 KiB
Newer Older
Sonia Zorba's avatar
Sonia Zorba committed
/*
 * This file is part of vospace-rest
 * Copyright (C) 2021 Istituto Nazionale di Astrofisica
 * SPDX-License-Identifier: GPL-3.0-or-later
 */
package it.inaf.oats.vospace;

Sonia Zorba's avatar
Sonia Zorba committed
import it.inaf.ia2.aa.data.User;
Sonia Zorba's avatar
Sonia Zorba committed
import it.inaf.oats.vospace.datamodel.Views;
import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.persistence.JobDAO;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import it.inaf.oats.vospace.exception.ErrorSummaryFactory;
import it.inaf.oats.vospace.exception.InvalidArgumentException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
import it.inaf.oats.vospace.persistence.NodeDAO;
Sonia Zorba's avatar
Sonia Zorba committed
import java.util.concurrent.CompletableFuture;
Sonia Zorba's avatar
Sonia Zorba committed
import javax.servlet.http.HttpServletRequest;
import net.ivoa.xml.uws.v1.ResultReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class JobService {

    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(JobService.class);

    // Job persistence: this class calls updateJob and createJob on it.
    @Autowired
    private JobDAO jobDAO;

    // Used in this class to obtain the Transfer of a job, to compute the
    // negotiated transfer and to set the remote location of a node.
    @Autowired
    private UriService uriService;
    // Used by the moveNode handling (processMoveJob).
    @Autowired
    private MoveService moveService;
    // Used by the copyNode handling (processCopyNodes).
    @Autowired
    private CopyService copyService;
Sonia Zorba's avatar
Sonia Zorba committed
    private AsyncTransferService asyncTransfService;
Sonia Zorba's avatar
Sonia Zorba committed
    @Autowired
    private HttpServletRequest servletRequest;
    
    @Autowired
    private FileServiceClient fileServiceClient;
        pullToVoSpace,
        pullFromVoSpace,
        copyNode;

        /**
         * Maps the direction of a transfer to the matching {@code JobDirection}.
         *
         * The four pull/push direction names are resolved by name through
         * {@code valueOf}; any other direction value selects {@code copyNode}
         * when {@code transfer.isKeepBytes()} is true and {@code moveNode}
         * otherwise.
         *
         * @param transfer the transfer whose direction is inspected
         * @return the job direction for the given transfer
         */
        public static JobDirection getJobDirectionEnumFromTransfer(Transfer transfer) {
            final String requested = transfer.getDirection();

            switch (requested) {
                case "pushToVoSpace":
                case "pushFromVoSpace":
                case "pullToVoSpace":
                case "pullFromVoSpace":
                    return JobDirection.valueOf(requested);
                default:
                    // Not a push/pull keyword: keepBytes decides between copy and move
                    break;
            }

            return transfer.isKeepBytes() ? JobDirection.copyNode : JobDirection.moveNode;
        }
    /**
     * Handles a request to change the phase of a job.
     *
     * Only {@code "RUN"} is currently supported: it starts the job.
     *
     * @param job the job whose phase change has been requested
     * @param phase the requested phase keyword
     * @throws UnsupportedOperationException if {@code phase} is {@code "ABORT"}
     * @throws IllegalArgumentException if {@code phase} is {@code null} or any
     * other value
     */
    public void setJobPhase(JobSummary job, String phase) {

        // Parameterized message: nothing is concatenated when TRACE is disabled
        LOG.trace("Job {} phase set to {}", job.getJobId(), phase);

        if (phase == null) {
            // A switch on a null String would fail with a bare NullPointerException;
            // report it like every other invalid phase instead
            throw new IllegalArgumentException("Invalid phase parameter: " + phase);
        }

        // TODO: check allowed job phase transitions
        switch (phase) {
            case "RUN":
                startJob(job);
                break;
            case "ABORT":
                throw new UnsupportedOperationException("Not implemented yet");
            default:
                throw new IllegalArgumentException("Invalid phase parameter: " + phase);
        }
    }

Sonia Zorba's avatar
Sonia Zorba committed
    private void startJob(JobSummary jobSummary) {
Sonia Zorba's avatar
Sonia Zorba committed
        handleJobErrors(jobSummary, job -> {
            Transfer transfer = uriService.getTransfer(job);

            ExecutionPhase phase;
                // Async recall from tape jobs are queued. They will be started by VOSpace transfer service
                phase = ExecutionPhase.QUEUED;
            } else {
                phase = ExecutionPhase.EXECUTING;
            }
            job.setPhase(phase);
            switch (getJobDirection(transfer)) {
                    negotiatedTransfer = handlePullToVoSpace(job, transfer);
                    break;
                case pullFromVoSpace:
                case pushToVoSpace:
                    negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer);
                    setJobResults(job, transfer);
Sonia Zorba's avatar
Sonia Zorba committed
                    handleMoveNode(job, transfer);
                    handleCopyNode(job, transfer);
                default:
                    throw new UnsupportedOperationException("Not implemented yet");
            }
Sonia Zorba's avatar
Sonia Zorba committed
            // Note: ExecutionPhase can't be set to COMPLETED here because all
            // the previous jobs are asynchronous. Each job has to set its
            // completion independently. Only jobs started from the /synctrans
            // endpoints are completed immediately (see createSyncJobResult() method)
Sonia Zorba's avatar
Sonia Zorba committed
        });
    private Transfer handlePullToVoSpace(JobSummary job, Transfer transfer) {
        if (isAsyncRecall(transfer)) {
            asyncTransfService.startJob(job);
            return transfer;
        }

        for (Protocol protocol : transfer.getProtocols()) {
            switch (protocol.getUri()) {
                case "ivo://ivoa.net/vospace/core#httpget":
                    String nodeUri = transfer.getTarget();
                    String contentUri = protocol.getEndpoint();
                    uriService.setNodeRemoteLocation(nodeUri, contentUri);
                    Transfer negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer);
                    setJobResults(job, transfer);
Sonia Zorba's avatar
Sonia Zorba committed
                    // Special case: import of a node from a portal file server
                    // doesn't imply file transfer, so it can be set to completed
                    job.setPhase(ExecutionPhase.COMPLETED);
                    throw new InvalidArgumentException("Unsupported pullToVoSpace protocol: " + protocol.getUri());
        throw new InvalidArgumentException("Transfer contains no protocols");
    private boolean isAsyncRecall(Transfer transfer) {
        return transfer.getView() != null
Sonia Zorba's avatar
Sonia Zorba committed
                && Views.ASYNC_RECALL_VIEW_URI.equals(transfer.getView().getUri());
Sonia Zorba's avatar
Sonia Zorba committed
    private void handleMoveNode(JobSummary jobSummary, Transfer transfer) {
        // User data must be extracted before starting the new thread
        // to avoid the "No thread-bound request found" exception
        User user = (User) servletRequest.getUserPrincipal();
        CompletableFuture.runAsync(() -> {
            handleJobErrors(jobSummary, job -> {
                moveService.processMoveJob(transfer, user);
                job.setPhase(ExecutionPhase.COMPLETED);
    private void handleCopyNode(JobSummary jobSummary, Transfer transfer) {
        // User data must be extracted before starting the new thread
        // to avoid the "No thread-bound request found" exception
        User user = (User) servletRequest.getUserPrincipal();
        CompletableFuture.runAsync(() -> {
            handleJobErrors(jobSummary, job -> {
                String jobId = jobSummary.getJobId();
                List<String> sourceAndDestination = copyService.processCopyNodes(transfer, jobId, user);
                // Call file service and command copy
Sonia Zorba's avatar
Sonia Zorba committed
                    fileServiceClient.startFileCopyJob(sourceAndDestination.get(0), sourceAndDestination.get(1), jobId, user);
                } catch (Exception e) {
                    // We decided not to purge metadata in case of failure
                    // just release busy nodes setting job_id = null
                    nodeDao.releaseBusyNodesByJobId(jobId);                    
                    throw e;
                }
    private void handleJobErrors(JobSummary job, Function<JobSummary, Transfer> jobConsumer) {
        Transfer negotiatedTransfer = null;        
Sonia Zorba's avatar
Sonia Zorba committed
        try {
            negotiatedTransfer = jobConsumer.apply(job);
Sonia Zorba's avatar
Sonia Zorba committed
        } catch (VoSpaceErrorSummarizableException e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
        } catch (Exception e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
                    new InternalFaultException(e)));
        } finally {
            jobDAO.updateJob(job, negotiatedTransfer);
Sonia Zorba's avatar
Sonia Zorba committed
        }
    private JobDirection getJobDirection(Transfer transfer) {
        return JobDirection.getJobDirectionEnumFromTransfer(transfer);

    /**
     * Synchronous transfer endpoint creates a job that is immediately set to
     * COMPLETED or to ERROR in case some fault occurred.
     *
     * In case of ERROR, Protocols are stripped from job representation in
     * compliance with specifications
     *
    public Optional<VoSpaceErrorSummarizableException> createSyncJobResult(JobSummary job) {
        VoSpaceErrorSummarizableException exception = null;
            Transfer transfer = uriService.getTransfer(job);
            negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer);
            setJobResults(job, transfer);
            job.setPhase(ExecutionPhase.COMPLETED);
            // Need to catch other exceptions too to avoid inconsistent job status
        } catch (VoSpaceErrorSummarizableException e) {
            job.setPhase(ExecutionPhase.ERROR);
            stripProtocols(job, negotiatedTransfer);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
        } catch (Exception e) {
            job.setPhase(ExecutionPhase.ERROR);
            stripProtocols(job, negotiatedTransfer);
            exception = new InternalFaultException(e);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(exception));
            jobDAO.createJob(job, negotiatedTransfer);
        }
        return Optional.ofNullable(exception);
    
    /**
     * Empties the protocol lists associated with a job.
     *
     * The protocols of the transfer returned by {@code uriService.getTransfer(job)}
     * are always cleared; those of the negotiated transfer are cleared only
     * when one is supplied.
     *
     * @param job the job whose transfer protocols are removed
     * @param negotiatedTransfer the negotiated transfer, may be {@code null}
     */
    private void stripProtocols(JobSummary job, Transfer negotiatedTransfer) {
        Transfer requestedTransfer = uriService.getTransfer(job);
        requestedTransfer.getProtocols().clear();

        if (negotiatedTransfer == null) {
            // Nothing was negotiated, so there is nothing else to strip
            return;
        }
        negotiatedTransfer.getProtocols().clear();
    }

    private void setJobResults(JobSummary jobSummary, Transfer transfer) {
        String baseUrl = servletRequest.getRequestURL().substring(0,
                servletRequest.getRequestURL().indexOf(servletRequest.getContextPath()));
        String href = baseUrl + servletRequest.getContextPath()
                + "/transfers/" + jobSummary.getJobId() + "/results/transferDetails";
        ResultReference transferDetailsRef = new ResultReference();
        transferDetailsRef.setId("transferDetails");
        transferDetailsRef.setHref(href);
        jobSummary.getResults().add(transferDetailsRef);
        switch (getJobDirection(transfer)) {
            case pullFromVoSpace:
            case pushToVoSpace:
                ResultReference dataNodeRef = new ResultReference();
                dataNodeRef.setId("dataNode");
                dataNodeRef.setHref(transfer.getTarget());
                jobSummary.getResults().add(dataNodeRef);
                break;