/* * This file is part of vospace-rest * Copyright (C) 2021 Istituto Nazionale di Astrofisica * SPDX-License-Identifier: GPL-3.0-or-later */ package it.inaf.oats.vospace; import it.inaf.ia2.aa.data.User; import it.inaf.oats.vospace.datamodel.Views; import it.inaf.oats.vospace.exception.InternalFaultException; import it.inaf.oats.vospace.persistence.JobDAO; import net.ivoa.xml.uws.v1.ExecutionPhase; import net.ivoa.xml.uws.v1.JobSummary; import net.ivoa.xml.vospace.v2.Protocol; import net.ivoa.xml.vospace.v2.Transfer; import it.inaf.oats.vospace.exception.ErrorSummaryFactory; import it.inaf.oats.vospace.exception.InvalidArgumentException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException; import it.inaf.oats.vospace.persistence.NodeDAO; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import javax.servlet.http.HttpServletRequest; import net.ivoa.xml.uws.v1.ResultReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Service public class JobService { private static final Logger LOG = LoggerFactory.getLogger(JobService.class); @Autowired private JobDAO jobDAO; @Autowired private UriService uriService; @Autowired private MoveService moveService; @Autowired private CopyService copyService; @Autowired private AsyncTransferService asyncTransfService; @Autowired private HttpServletRequest servletRequest; @Autowired private FileServiceClient fileServiceClient; @Autowired private NodeDAO nodeDao; public enum JobDirection { pullToVoSpace, pullFromVoSpace, pushToVoSpace, pushFromVoSpace, moveNode, copyNode; public static JobDirection getJobDirectionEnumFromTransfer(Transfer transfer) { String direction = transfer.getDirection(); switch (direction) { case "pullToVoSpace": case "pullFromVoSpace": case "pushToVoSpace": case "pushFromVoSpace": return JobDirection.valueOf(direction); default: if (transfer.isKeepBytes()) { return JobDirection.copyNode; } else { return JobDirection.moveNode; } } } } public void setJobPhase(JobSummary job, String phase) { LOG.trace("Job " + job.getJobId() + " phase set to " + phase); // TODO: check allowed job phase transitions switch (phase) { case "RUN": startJob(job); break; case "ABORT": throw new UnsupportedOperationException("Not implemented yet"); default: throw new IllegalArgumentException("Invalid phase parameter: " + phase); } } private void startJob(JobSummary jobSummary) { handleJobErrors(jobSummary, job -> { Transfer transfer = uriService.getTransfer(job); ExecutionPhase phase; if (isAsyncRecall(transfer)) { // Async recall from tape jobs are queued. They will be started by VOSpace transfer service phase = ExecutionPhase.QUEUED; } else { phase = ExecutionPhase.EXECUTING; } job.setPhase(phase); jobDAO.updateJob(job, null); Transfer negotiatedTransfer = null; switch (getJobDirection(transfer)) { case pullToVoSpace: negotiatedTransfer = handlePullToVoSpace(job, transfer); break; case pullFromVoSpace: case pushToVoSpace: negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer); setJobResults(job, transfer); break; case moveNode: handleMoveNode(job, transfer); break; case copyNode: handleCopyNode(job, transfer); break; default: throw new UnsupportedOperationException("Not implemented yet"); } // Note: ExecutionPhase can't be set to COMPLETED here because all // the previous job are asynchronous. Each job has to set its // completion independently. Only jobs started from the /synctrans // endpoints are completed immediately (see createSyncJobResult() method) return negotiatedTransfer; }); } private Transfer handlePullToVoSpace(JobSummary job, Transfer transfer) { if (isAsyncRecall(transfer)) { asyncTransfService.startJob(job); return transfer; } for (Protocol protocol : transfer.getProtocols()) { switch (protocol.getUri()) { case "ivo://ivoa.net/vospace/core#httpget": String nodeUri = transfer.getTarget(); String contentUri = protocol.getEndpoint(); uriService.setNodeRemoteLocation(nodeUri, contentUri); Transfer negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer); setJobResults(job, transfer); // Special case: import of a node from a portal file server // doesn't imply file transfer, so it can be set to completed job.setPhase(ExecutionPhase.COMPLETED); return negotiatedTransfer; default: throw new InvalidArgumentException("Unsupported pullToVoSpace protocol: " + protocol.getUri()); } } throw new InvalidArgumentException("Transfer contains no protocols"); } private boolean isAsyncRecall(Transfer transfer) { return transfer.getView() != null && Views.ASYNC_RECALL_VIEW_URI.equals(transfer.getView().getUri()); } private void handleMoveNode(JobSummary jobSummary, Transfer transfer) { // User data must be extracted before starting the new thread // to avoid the "No thread-bound request found" exception User user = (User) servletRequest.getUserPrincipal(); CompletableFuture.runAsync(() -> { handleJobErrors(jobSummary, job -> { moveService.processMoveJob(transfer, user); job.setPhase(ExecutionPhase.COMPLETED); return null; }); }); } private void handleCopyNode(JobSummary jobSummary, Transfer transfer) { // User data must be extracted before starting the new thread // to avoid the "No thread-bound request found" exception User user = (User) servletRequest.getUserPrincipal(); CompletableFuture.runAsync(() -> { handleJobErrors(jobSummary, job -> { String jobId = jobSummary.getJobId(); // Index 0: source 1: destination List sourceAndDestination = copyService.processCopyNodes(transfer, jobId, user); // Call file service and command copy try{ fileServiceClient.startFileCopyJob(sourceAndDestination.get(0), sourceAndDestination.get(1), jobId, user); } catch (Exception e) { // We decided not to purge metadata in case of failure // just release busy nodes setting job_id = null nodeDao.releaseBusyNodesByJobId(jobId); throw e; } return null; }); }); } private void handleJobErrors(JobSummary job, Function jobConsumer) { Transfer negotiatedTransfer = null; try { negotiatedTransfer = jobConsumer.apply(job); } catch (VoSpaceErrorSummarizableException e) { job.setPhase(ExecutionPhase.ERROR); job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e)); } catch (Exception e) { job.setPhase(ExecutionPhase.ERROR); job.setErrorSummary(ErrorSummaryFactory.newErrorSummary( new InternalFaultException(e))); } finally { jobDAO.updateJob(job, negotiatedTransfer); } } private JobDirection getJobDirection(Transfer transfer) { return JobDirection.getJobDirectionEnumFromTransfer(transfer); } /** * Synchronous transfer endpoint creates a job that is immediately set to * COMPLETED or to ERROR in case some fault occurred. * * In case of ERROR, Protocols are stripped from job representation in * compliance with specifications * */ public Optional createSyncJobResult(JobSummary job) { Transfer negotiatedTransfer = null; VoSpaceErrorSummarizableException exception = null; try { Transfer transfer = uriService.getTransfer(job); negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer); setJobResults(job, transfer); job.setPhase(ExecutionPhase.COMPLETED); // Need to catch other exceptions too to avoid inconsistent job status } catch (VoSpaceErrorSummarizableException e) { job.setPhase(ExecutionPhase.ERROR); stripProtocols(job, negotiatedTransfer); job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e)); exception = e; } catch (Exception e) { job.setPhase(ExecutionPhase.ERROR); stripProtocols(job, negotiatedTransfer); exception = new InternalFaultException(e); job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(exception)); } finally { jobDAO.createJob(job, negotiatedTransfer); } return Optional.ofNullable(exception); } private void stripProtocols(JobSummary job, Transfer negotiatedTransfer) { uriService.getTransfer(job).getProtocols().clear(); if (negotiatedTransfer != null) { negotiatedTransfer.getProtocols().clear(); } } private void setJobResults(JobSummary jobSummary, Transfer transfer) { String baseUrl = servletRequest.getRequestURL().substring(0, servletRequest.getRequestURL().indexOf(servletRequest.getContextPath())); String href = baseUrl + servletRequest.getContextPath() + "/transfers/" + jobSummary.getJobId() + "/results/transferDetails"; ResultReference transferDetailsRef = new ResultReference(); transferDetailsRef.setId("transferDetails"); transferDetailsRef.setHref(href); jobSummary.getResults().add(transferDetailsRef); switch (getJobDirection(transfer)) { case pullFromVoSpace: case pushToVoSpace: ResultReference dataNodeRef = new ResultReference(); dataNodeRef.setId("dataNode"); dataNodeRef.setHref(transfer.getTarget()); jobSummary.getResults().add(dataNodeRef); break; } } }