Newer
Older
/*
* This file is part of vospace-rest
* Copyright (C) 2021 Istituto Nazionale di Astrofisica
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package it.inaf.oats.vospace;
import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.persistence.JobDAO;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import it.inaf.oats.vospace.exception.ErrorSummaryFactory;
import it.inaf.oats.vospace.exception.InvalidArgumentException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Nicola Fulvio Calabria
committed
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class JobService {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
// Job persistence; this class calls updateJob(...) and createJob(...) on it.
@Autowired
private JobDAO jobDAO;
// Resolves the Transfer attached to a job and fills in transfer results/endpoints.
@Autowired
private UriService uriService;
// Performs the actual node move for move jobs (see processMoveJob usage).
@Autowired
private MoveService moveService;
// Current HTTP request; used to read the user principal before going asynchronous.
@Autowired
private HttpServletRequest servletRequest;
public enum JobDirection {
pullToVoSpace,
pullFromVoSpace,
pushFromVoSpace,
copyNode;
public static JobDirection getJobDirectionEnumFromTransfer(Transfer transfer) {
String direction = transfer.getDirection();
switch (direction) {
case "pullToVoSpace":
case "pullFromVoSpace":
case "pushToVoSpace":
case "pushFromVoSpace":
return JobDirection.valueOf(direction);
default:
if (transfer.isKeepBytes()) {
return JobDirection.copyNode;
} else {
return JobDirection.moveNode;
}
}
}
/**
 * Applies a phase change requested for a job.
 *
 * Only {@code "RUN"} is currently acted upon (it starts the job).
 *
 * @param job the job whose phase is being changed
 * @param phase the requested phase keyword
 * @throws UnsupportedOperationException if phase is {@code "ABORT"}
 * @throws IllegalArgumentException if phase is null or any other value
 */
public void setJobPhase(JobSummary job, String phase) {
    // Placeholders avoid building the message when TRACE is disabled.
    LOG.trace("Job {} phase set to {}", job.getJobId(), phase);

    // A switch on a null String throws NullPointerException; report a
    // missing phase the same way as any other invalid value instead.
    if (phase == null) {
        throw new IllegalArgumentException("Invalid phase parameter: " + phase);
    }

    // TODO: check allowed job phase transitions
    switch (phase) {
        case "RUN":
            startJob(job);
            break;
        case "ABORT":
            throw new UnsupportedOperationException("Not implemented yet");
        default:
            throw new IllegalArgumentException("Invalid phase parameter: " + phase);
    }
}
// NOTE(review): the signature of the enclosing method and the opening of the
// lambda closed by the final "});" are not visible in this excerpt. Presumably
// this is the body of startJob(job), which setJobPhase calls for "RUN" - confirm.
// Resolve the transfer attached to the job.
Transfer transfer = uriService.getTransfer(job);
ExecutionPhase phase;
if (transfer.getProtocols().stream().anyMatch(p -> "ia2:async-recall".equals(p.getUri()))) {
// Async recall from tape: these jobs are only queued here. They will be
// started by the VOSpace transfer service.
phase = ExecutionPhase.QUEUED;
} else {
phase = ExecutionPhase.EXECUTING;
}
// Persist the initial phase before dispatching on the transfer direction.
job.setPhase(phase);
jobDAO.updateJob(job);
switch (getJobDirection(transfer)) {
case pullToVoSpace:
handlePullToVoSpace(job, transfer);
break;
case pullFromVoSpace:
case pushToVoSpace:
handleVoSpaceUrlsListResult(job, transfer);
break;
case moveNode:
// NOTE(review): moveNode falls through to default and is rejected, although
// this class defines handleMoveNode(...) - confirm whether it should be
// dispatched here.
default:
throw new UnsupportedOperationException("Not implemented yet");
}
// Note: ExecutionPhase can't be set to COMPLETED here because all
// the previous jobs are asynchronous. Each job has to set its
// completion independently. Only jobs started from the /synctrans
// endpoints are completed immediately (see createSyncJobResult() method)
});
/**
 * Starts a pullToVoSpace job according to the protocol of the transfer.
 *
 * Every branch below either returns or throws, so only the first protocol
 * of the transfer is ever examined; a transfer without protocols is a no-op.
 *
 * NOTE(review): asyncTransfService has no declaration in the visible part
 * of this class - confirm the field exists.
 *
 * @param job the job being started
 * @param transfer the transfer attached to the job
 * @throws InternalFaultException if the protocol URI is not supported
 * @throws InvalidArgumentException if an httpget transfer does not have
 * exactly one target
 */
private void handlePullToVoSpace(JobSummary job, Transfer transfer) {
    for (Protocol candidate : transfer.getProtocols()) {
        final String protocolUri = candidate.getUri();
        switch (protocolUri) {
            case "ia2:async-recall":
                asyncTransfService.startJob(job);
                return;
            case "ivo://ivoa.net/vospace/core#httpget":
                importRemoteNode(job, transfer, candidate);
                return;
            default:
                throw new InternalFaultException("Unsupported pullToVoSpace protocol: " + protocolUri);
        }
    }
}

/**
 * Links the single target node of the transfer to the endpoint of the
 * given protocol, stores the transfer result and marks the job COMPLETED.
 */
private void importRemoteNode(JobSummary job, Transfer transfer, Protocol httpGetProtocol) {
    final int targetCount = transfer.getTarget().size();
    if (targetCount != 1) {
        throw new InvalidArgumentException("Invalid target size for pullToVoSpace: " + targetCount);
    }

    String targetNode = transfer.getTarget().get(0);
    uriService.setNodeRemoteLocation(targetNode, httpGetProtocol.getEndpoint());
    uriService.setTransferJobResult(job, transfer);

    // Special case: importing a node from a portal file server involves
    // no file transfer, so the job can be completed right away.
    job.setPhase(ExecutionPhase.COMPLETED);
}
/**
 * Stores the result of the transfer on the job by delegating to
 * {@code uriService.setTransferJobResult}. It does not change the job phase.
 *
 * @param job the job that receives the result
 * @param transfer the transfer attached to the job
 */
private void handleVoSpaceUrlsListResult(JobSummary job, Transfer transfer) {
uriService.setTransferJobResult(job, transfer);
}
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
/**
 * Runs a move job in the background via {@code CompletableFuture.runAsync}.
 *
 * The move is wrapped by handleJobErrors, so a failure is recorded on the
 * job and the job is persisted in either case.
 *
 * @param jobSummary the job to execute
 * @param transfer the transfer describing the move
 */
private void handleMoveNode(JobSummary jobSummary, Transfer transfer) {
    // The principal has to be read on the request thread: inside the
    // asynchronous task there is no thread-bound request
    // ("No thread-bound request found" exception).
    final User user = (User) servletRequest.getUserPrincipal();

    final Consumer<JobSummary> moveTask = job -> {
        moveService.processMoveJob(transfer, user);
        job.setPhase(ExecutionPhase.COMPLETED);
    };

    CompletableFuture.runAsync(() -> handleJobErrors(jobSummary, moveTask));
}
private void handleJobErrors(JobSummary job, Consumer<JobSummary> jobConsumer) {
try {
jobConsumer.accept(job);
} catch (VoSpaceErrorSummarizableException e) {
job.setPhase(ExecutionPhase.ERROR);
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
} catch (Exception e) {
job.setPhase(ExecutionPhase.ERROR);
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
new InternalFaultException(e)));
} finally {
jobDAO.updateJob(job);
}
private JobDirection getJobDirection(Transfer transfer) {
return JobDirection.getJobDirectionEnumFromTransfer(transfer);
/**
 * The synchronous transfer endpoint creates a job that is immediately set to
 * COMPLETED, or to ERROR if a fault occurred.
 *
 * In case of ERROR, the protocols are stripped from the job representation,
 * in compliance with the specification.
 */
public void createSyncJobResult(JobSummary job) {
Nicola Fulvio Calabria
committed
try {
uriService.setSyncTransferEndpoints(job);
job.setPhase(ExecutionPhase.COMPLETED);
// Need to catch other exceptions too to avoid inconsistent job status
} catch (VoSpaceErrorSummarizableException e) {
job.setPhase(ExecutionPhase.ERROR);
uriService.getTransfer(job).getProtocols().clear();
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
} catch (Exception e) {
job.setPhase(ExecutionPhase.ERROR);
uriService.getTransfer(job).getProtocols().clear();
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
new InternalFaultException(e)));
Nicola Fulvio Calabria
committed
} finally {
jobDAO.createJob(job);
}