Newer
Older
/*
* This file is part of vospace-rest
* Copyright (C) 2021 Istituto Nazionale di Astrofisica
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package it.inaf.oats.vospace;
import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.persistence.JobDAO;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import it.inaf.oats.vospace.exception.ErrorSummaryFactory;
import it.inaf.oats.vospace.exception.InvalidArgumentException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Nicola Fulvio Calabria
committed
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
import it.inaf.oats.vospace.persistence.NodeDAO;
import java.util.List;
import java.util.Optional;
Sonia Zorba
committed
import java.util.function.Function;
Sonia Zorba
committed
import net.ivoa.xml.uws.v1.ResultReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class JobService {
private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
@Autowired
private JobDAO jobDAO;
@Autowired
private UriService uriService;
@Autowired
private MoveService moveService;
@Autowired
private CopyService copyService;
@Autowired
private HttpServletRequest servletRequest;
@Autowired
private FileServiceClient fileServiceClient;
@Autowired
private NodeDAO nodeDao;
public enum JobDirection {
pullToVoSpace,
pullFromVoSpace,
pushFromVoSpace,
copyNode;
public static JobDirection getJobDirectionEnumFromTransfer(Transfer transfer) {
String direction = transfer.getDirection();
switch (direction) {
case "pullToVoSpace":
case "pullFromVoSpace":
case "pushToVoSpace":
case "pushFromVoSpace":
return JobDirection.valueOf(direction);
default:
if (transfer.isKeepBytes()) {
return JobDirection.copyNode;
} else {
return JobDirection.moveNode;
}
}
}
public void setJobPhase(JobSummary job, String phase) {
LOG.trace("Job " + job.getJobId() + " phase set to " + phase);
// TODO: check allowed job phase transitions
switch (phase) {
case "RUN":
startJob(job);
break;
case "ABORT":
throw new UnsupportedOperationException("Not implemented yet");
default:
throw new IllegalArgumentException("Invalid phase parameter: " + phase);
}
}
Transfer transfer = uriService.getTransfer(job);
Sonia Zorba
committed
if (isAsyncRecall(transfer)) {
// Async recall from tape jobs are queued. They will be started by VOSpace transfer service
phase = ExecutionPhase.QUEUED;
} else {
phase = ExecutionPhase.EXECUTING;
}
job.setPhase(phase);
Sonia Zorba
committed
jobDAO.updateJob(job, null);
Sonia Zorba
committed
Transfer negotiatedTransfer = null;
switch (getJobDirection(transfer)) {
case pullToVoSpace:
Sonia Zorba
committed
negotiatedTransfer = handlePullToVoSpace(job, transfer);
break;
case pullFromVoSpace:
case pushToVoSpace:
Sonia Zorba
committed
negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer);
setJobResults(job, transfer);
break;
case moveNode:
handleCopyNode(job, transfer);
default:
throw new UnsupportedOperationException("Not implemented yet");
}
// Note: ExecutionPhase can't be set to COMPLETED here because all
// the previous job are asynchronous. Each job has to set its
// completion independently. Only jobs started from the /synctrans
// endpoints are completed immediately (see createSyncJobResult() method)
Sonia Zorba
committed
return negotiatedTransfer;
Sonia Zorba
committed
private Transfer handlePullToVoSpace(JobSummary job, Transfer transfer) {
Sonia Zorba
committed
if (isAsyncRecall(transfer)) {
asyncTransfService.startJob(job);
return transfer;
}
for (Protocol protocol : transfer.getProtocols()) {
switch (protocol.getUri()) {
case "ivo://ivoa.net/vospace/core#httpget":
Sonia Zorba
committed
String nodeUri = transfer.getTarget();
String contentUri = protocol.getEndpoint();
uriService.setNodeRemoteLocation(nodeUri, contentUri);
Sonia Zorba
committed
Transfer negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer);
setJobResults(job, transfer);
// Special case: import of a node from a portal file server
// doesn't imply file transfer, so it can be set to completed
job.setPhase(ExecutionPhase.COMPLETED);
Sonia Zorba
committed
return negotiatedTransfer;
default:
Sonia Zorba
committed
throw new InvalidArgumentException("Unsupported pullToVoSpace protocol: " + protocol.getUri());
}
}
Sonia Zorba
committed
throw new InvalidArgumentException("Transfer contains no protocols");
}
Sonia Zorba
committed
private boolean isAsyncRecall(Transfer transfer) {
return transfer.getView() != null
&& Views.ASYNC_RECALL_VIEW_URI.equals(transfer.getView().getUri());
Sonia Zorba
committed
}
private void handleMoveNode(JobSummary jobSummary, Transfer transfer) {
// User data must be extracted before starting the new thread
// to avoid the "No thread-bound request found" exception
User user = (User) servletRequest.getUserPrincipal();
CompletableFuture.runAsync(() -> {
handleJobErrors(jobSummary, job -> {
moveService.processMoveJob(transfer, user);
job.setPhase(ExecutionPhase.COMPLETED);
Sonia Zorba
committed
return null;
private void handleCopyNode(JobSummary jobSummary, Transfer transfer) {
// User data must be extracted before starting the new thread
// to avoid the "No thread-bound request found" exception
User user = (User) servletRequest.getUserPrincipal();
CompletableFuture.runAsync(() -> {
handleJobErrors(jobSummary, job -> {
String jobId = jobSummary.getJobId();
// Index 0: source 1: destination
List<String> sourceAndDestination = copyService.processCopyNodes(transfer, jobId, user);
// Call file service and command copy
try{
fileServiceClient.startFileCopyJob(sourceAndDestination.get(0), sourceAndDestination.get(1), jobId, user);
} catch (Exception e) {
// We decided not to purge metadata in case of failure
// just release busy nodes setting job_id = null
nodeDao.releaseBusyNodesByJobId(jobId);
throw e;
}
Sonia Zorba
committed
private void handleJobErrors(JobSummary job, Function<JobSummary, Transfer> jobConsumer) {
Transfer negotiatedTransfer = null;
Sonia Zorba
committed
negotiatedTransfer = jobConsumer.apply(job);
} catch (VoSpaceErrorSummarizableException e) {
job.setPhase(ExecutionPhase.ERROR);
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
} catch (Exception e) {
job.setPhase(ExecutionPhase.ERROR);
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
new InternalFaultException(e)));
} finally {
Sonia Zorba
committed
jobDAO.updateJob(job, negotiatedTransfer);
private JobDirection getJobDirection(Transfer transfer) {
return JobDirection.getJobDirectionEnumFromTransfer(transfer);
/**
* Synchronous transfer endpoint creates a job that is immediately set to
Nicola Fulvio Calabria
committed
* COMPLETED or to ERROR in case some fault occurred.
*
* In case of ERROR, Protocols are stripped from job representation in
* compliance with specifications
*
public Optional<VoSpaceErrorSummarizableException> createSyncJobResult(JobSummary job) {
Sonia Zorba
committed
Transfer negotiatedTransfer = null;
VoSpaceErrorSummarizableException exception = null;
Nicola Fulvio Calabria
committed
try {
Sonia Zorba
committed
Transfer transfer = uriService.getTransfer(job);
negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer);
setJobResults(job, transfer);
Nicola Fulvio Calabria
committed
job.setPhase(ExecutionPhase.COMPLETED);
// Need to catch other exceptions too to avoid inconsistent job status
} catch (VoSpaceErrorSummarizableException e) {
job.setPhase(ExecutionPhase.ERROR);
Sonia Zorba
committed
stripProtocols(job, negotiatedTransfer);
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
exception = e;
} catch (Exception e) {
job.setPhase(ExecutionPhase.ERROR);
Sonia Zorba
committed
stripProtocols(job, negotiatedTransfer);
exception = new InternalFaultException(e);
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(exception));
Nicola Fulvio Calabria
committed
} finally {
Sonia Zorba
committed
jobDAO.createJob(job, negotiatedTransfer);
}
return Optional.ofNullable(exception);
Sonia Zorba
committed
}
Sonia Zorba
committed
private void stripProtocols(JobSummary job, Transfer negotiatedTransfer) {
uriService.getTransfer(job).getProtocols().clear();
if (negotiatedTransfer != null) {
negotiatedTransfer.getProtocols().clear();
}
}
Sonia Zorba
committed
private void setJobResults(JobSummary jobSummary, Transfer transfer) {
String baseUrl = servletRequest.getRequestURL().substring(0,
servletRequest.getRequestURL().indexOf(servletRequest.getContextPath()));
String href = baseUrl + servletRequest.getContextPath()
+ "/transfers/" + jobSummary.getJobId() + "/results/transferDetails";
ResultReference transferDetailsRef = new ResultReference();
transferDetailsRef.setId("transferDetails");
transferDetailsRef.setHref(href);
jobSummary.getResults().add(transferDetailsRef);
switch (getJobDirection(transfer)) {
case pullFromVoSpace:
case pushToVoSpace:
ResultReference dataNodeRef = new ResultReference();
dataNodeRef.setId("dataNode");
Sonia Zorba
committed
dataNodeRef.setHref(transfer.getTarget());
Sonia Zorba
committed
jobSummary.getResults().add(dataNodeRef);
break;
Nicola Fulvio Calabria
committed
}