Newer
Older
/*
* This file is part of vospace-rest
* Copyright (C) 2021 Istituto Nazionale di Astrofisica
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package it.inaf.oats.vospace;
import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.persistence.JobDAO;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import it.inaf.oats.vospace.exception.ErrorSummaryFactory;
import it.inaf.oats.vospace.exception.InvalidArgumentException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Nicola Fulvio Calabria
committed
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class JobService {
private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
@Autowired
private JobDAO jobDAO;
@Autowired
private UriService uriService;
@Autowired
private MoveService moveService;
@Autowired
private CopyService copyService;
@Autowired
private NodeBranchService nodeBranchService;
@Autowired
private HttpServletRequest servletRequest;
public enum JobDirection {
pullToVoSpace,
pullFromVoSpace,
pushFromVoSpace,
copyNode;
public static JobDirection getJobDirectionEnumFromTransfer(Transfer transfer) {
String direction = transfer.getDirection();
switch (direction) {
case "pullToVoSpace":
case "pullFromVoSpace":
case "pushToVoSpace":
case "pushFromVoSpace":
return JobDirection.valueOf(direction);
default:
if (transfer.isKeepBytes()) {
return JobDirection.copyNode;
} else {
return JobDirection.moveNode;
}
}
}
public void setJobPhase(JobSummary job, String phase) {
LOG.trace("Job " + job.getJobId() + " phase set to " + phase);
// TODO: check allowed job phase transitions
switch (phase) {
case "RUN":
startJob(job);
break;
case "ABORT":
throw new UnsupportedOperationException("Not implemented yet");
default:
throw new IllegalArgumentException("Invalid phase parameter: " + phase);
}
}
Transfer transfer = uriService.getTransfer(job);
ExecutionPhase phase;
if (transfer.getProtocols().stream().anyMatch(p -> "ia2:async-recall".equals(p.getUri()))) {
// Async recall from tape jobs are queued. They will be started by VOSpace transfer service
phase = ExecutionPhase.QUEUED;
} else {
phase = ExecutionPhase.EXECUTING;
}
job.setPhase(phase);
jobDAO.updateJob(job);
switch (getJobDirection(transfer)) {
case pullToVoSpace:
handlePullToVoSpace(job, transfer);
break;
case pullFromVoSpace:
case pushToVoSpace:
handleVoSpaceUrlsListResult(job, transfer);
break;
case moveNode:
handleCopyNode(job, transfer);
default:
throw new UnsupportedOperationException("Not implemented yet");
}
// Note: ExecutionPhase can't be set to COMPLETED here because all
// the previous job are asynchronous. Each job has to set its
// completion independently. Only jobs started from the /synctrans
// endpoints are completed immediately (see createSyncJobResult() method)
});
private void handlePullToVoSpace(JobSummary job, Transfer transfer) {
for (Protocol protocol : transfer.getProtocols()) {
switch (protocol.getUri()) {
case "ia2:async-recall":
asyncTransfService.startJob(job);
return;
case "ivo://ivoa.net/vospace/core#httpget":
if (transfer.getTarget().size() != 1) {
throw new InvalidArgumentException("Invalid target size for pullToVoSpace: " + transfer.getTarget().size());
}
String nodeUri = transfer.getTarget().get(0);
String contentUri = protocol.getEndpoint();
uriService.setNodeRemoteLocation(nodeUri, contentUri);
uriService.setTransferJobResult(job, transfer);
// Special case: import of a node from a portal file server
// doesn't imply file transfer, so it can be set to completed
job.setPhase(ExecutionPhase.COMPLETED);
return;
default:
throw new InternalFaultException("Unsupported pullToVoSpace protocol: " + protocol.getUri());
}
}
}
private void handleVoSpaceUrlsListResult(JobSummary job, Transfer transfer) {
uriService.setTransferJobResult(job, transfer);
}
private void handleMoveNode(JobSummary jobSummary, Transfer transfer) {
// User data must be extracted before starting the new thread
// to avoid the "No thread-bound request found" exception
User user = (User) servletRequest.getUserPrincipal();
CompletableFuture.runAsync(() -> {
handleJobErrors(jobSummary, job -> {
moveService.processMoveJob(transfer, user);
job.setPhase(ExecutionPhase.COMPLETED);
});
});
}
private void handleCopyNode(JobSummary jobSummary, Transfer transfer) {
// User data must be extracted before starting the new thread
// to avoid the "No thread-bound request found" exception
User user = (User) servletRequest.getUserPrincipal();
CompletableFuture.runAsync(() -> {
handleJobErrors(jobSummary, job -> {
copyService.processCopyNodes(transfer, jobSummary.getJobId(), user);
// add file service copy logic
// the file service part will unlock nodes and set job phase
// to completed
private void handleJobErrors(JobSummary job, Consumer<JobSummary> jobConsumer) {
try {
jobConsumer.accept(job);
} catch (VoSpaceErrorSummarizableException e) {
job.setPhase(ExecutionPhase.ERROR);
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
} catch (Exception e) {
job.setPhase(ExecutionPhase.ERROR);
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
new InternalFaultException(e)));
} finally {
jobDAO.updateJob(job);
}
private JobDirection getJobDirection(Transfer transfer) {
return JobDirection.getJobDirectionEnumFromTransfer(transfer);
/**
* Synchronous transfer endpoint creates a job that is immediately set to
Nicola Fulvio Calabria
committed
* COMPLETED or to ERROR in case some fault occurred.
*
* In case of ERROR, Protocols are stripped from job representation in
* compliance with specifications
*
*/
public void createSyncJobResult(JobSummary job) {
Nicola Fulvio Calabria
committed
try {
uriService.setSyncTransferEndpoints(job);
job.setPhase(ExecutionPhase.COMPLETED);
// Need to catch other exceptions too to avoid inconsistent job status
} catch (VoSpaceErrorSummarizableException e) {
job.setPhase(ExecutionPhase.ERROR);
uriService.getTransfer(job).getProtocols().clear();
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
} catch (Exception e) {
job.setPhase(ExecutionPhase.ERROR);
uriService.getTransfer(job).getProtocols().clear();
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
new InternalFaultException(e)));
Nicola Fulvio Calabria
committed
} finally {
jobDAO.createJob(job);
}