Commit c040003e authored by Sonia Zorba's avatar Sonia Zorba
Browse files

Updated jobs handling in compliance with specification: UWS result contains...

Updated jobs handling in compliance with specification: UWS result contains transferDetails URL and transfer negotiation is available only on transferDetails endpoint (jobInfo contains original transfer object)
parent ae47c967
Loading
Loading
Loading
Loading
Loading
+47 −17
Original line number Diff line number Diff line
@@ -18,8 +18,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.servlet.http.HttpServletRequest;
import net.ivoa.xml.uws.v1.ResultReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -100,15 +101,18 @@ public class JobService {
            }
            job.setPhase(phase);
            
            jobDAO.updateJob(job);
            jobDAO.updateJob(job, null);

            Transfer negotiatedTransfer = null;

            switch (getJobDirection(transfer)) {
                case pullToVoSpace:
                    handlePullToVoSpace(job, transfer);
                    negotiatedTransfer = handlePullToVoSpace(job, transfer);
                    break;
                case pullFromVoSpace:
                case pushToVoSpace:
                    handleVoSpaceUrlsListResult(job, transfer);
                    negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer);
                    setJobResults(job, transfer);
                    break;
                case moveNode:
                    handleMoveNode(job, transfer);
@@ -121,16 +125,18 @@ public class JobService {
            // the previous job are asynchronous. Each job has to set its
            // completion independently. Only jobs started from the /synctrans
            // endpoints are completed immediately (see createSyncJobResult() method)
            
            return negotiatedTransfer;
        });
    }

    private void handlePullToVoSpace(JobSummary job, Transfer transfer) {
    private Transfer handlePullToVoSpace(JobSummary job, Transfer transfer) {

        for (Protocol protocol : transfer.getProtocols()) {
            switch (protocol.getUri()) {
                case "ia2:async-recall":
                    asyncTransfService.startJob(job);
                    return;
                    return transfer;
                case "ivo://ivoa.net/vospace/core#httpget":
                    if (transfer.getTarget().size() != 1) {
                        throw new InvalidArgumentException("Invalid target size for pullToVoSpace: " + transfer.getTarget().size());
@@ -138,19 +144,18 @@ public class JobService {
                    String nodeUri = transfer.getTarget().get(0);
                    String contentUri = protocol.getEndpoint();
                    uriService.setNodeRemoteLocation(nodeUri, contentUri);
                    uriService.setTransferJobResult(job, transfer);
                    Transfer negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer);
                    setJobResults(job, transfer);
                    // Special case: import of a node from a portal file server
                    // doesn't imply file transfer, so it can be set to completed
                    job.setPhase(ExecutionPhase.COMPLETED);
                    return;
                    return negotiatedTransfer;
                default:
                    throw new InternalFaultException("Unsupported pullToVoSpace protocol: " + protocol.getUri());
            }
                    throw new InvalidArgumentException("Unsupported pullToVoSpace protocol: " + protocol.getUri());
            }
        }

    private void handleVoSpaceUrlsListResult(JobSummary job, Transfer transfer) {
        uriService.setTransferJobResult(job, transfer);
        throw new InvalidArgumentException("Transfer contains no protocols");
    }

    private void handleMoveNode(JobSummary jobSummary, Transfer transfer) {
@@ -161,13 +166,15 @@ public class JobService {
            handleJobErrors(jobSummary, job -> {
                moveService.processMoveJob(transfer, user);
                job.setPhase(ExecutionPhase.COMPLETED);
                return null;
            });
        });
    }

    private void handleJobErrors(JobSummary job, Consumer<JobSummary> jobConsumer) {
    private void handleJobErrors(JobSummary job, Function<JobSummary, Transfer> jobConsumer) {
        Transfer negotiatedTransfer = null;
        try {
            jobConsumer.accept(job);
            negotiatedTransfer = jobConsumer.apply(job);
        } catch (VoSpaceErrorSummarizableException e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
@@ -176,7 +183,7 @@ public class JobService {
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
                    new InternalFaultException(e)));
        } finally {
            jobDAO.updateJob(job);
            jobDAO.updateJob(job, negotiatedTransfer);
        }
    }

@@ -193,8 +200,11 @@ public class JobService {
     *
     */
    public void createSyncJobResult(JobSummary job) {
        Transfer negotiatedTransfer = null;
        try {
            uriService.setSyncTransferEndpoints(job);
            Transfer transfer = uriService.getTransfer(job);
            negotiatedTransfer = uriService.getNegotiatedTransfer(job, transfer);
            setJobResults(job, transfer);
            job.setPhase(ExecutionPhase.COMPLETED);
            // Need to catch other exceptions too to avoid inconsistent job status
        } catch (VoSpaceErrorSummarizableException e) {
@@ -207,7 +217,27 @@ public class JobService {
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
                    new InternalFaultException(e)));
        } finally {
            jobDAO.createJob(job);
            jobDAO.createJob(job, negotiatedTransfer);
        }
    }

    private void setJobResults(JobSummary jobSummary, Transfer transfer) {
        String baseUrl = servletRequest.getRequestURL().substring(0,
                servletRequest.getRequestURL().indexOf(servletRequest.getContextPath()));
        String href = baseUrl + servletRequest.getContextPath()
                + "/transfers/" + jobSummary.getJobId() + "/results/transferDetails";
        ResultReference transferDetailsRef = new ResultReference();
        transferDetailsRef.setId("transferDetails");
        transferDetailsRef.setHref(href);
        jobSummary.getResults().add(transferDetailsRef);
        switch (getJobDirection(transfer)) {
            case pullFromVoSpace:
            case pushToVoSpace:
                ResultReference dataNodeRef = new ResultReference();
                dataNodeRef.setId("dataNode");
                dataNodeRef.setHref(transfer.getTarget().get(0));
                jobSummary.getResults().add(dataNodeRef);
                break;
        }
    }
}
+2 −3
Original line number Diff line number Diff line
@@ -53,7 +53,7 @@ public class TransferController {

        JobSummary jobSummary = newJobSummary(transfer, principal);

        jobDAO.createJob(jobSummary);
        jobDAO.createJob(jobSummary, null);

        if (phase.isPresent()) {
            jobService.setJobPhase(jobSummary, phase.get());
@@ -157,8 +157,7 @@ public class TransferController {
                return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build();
            }
            
            // TODO: check type
            return ResponseEntity.ok((Transfer) (job.getJobInfo().getAny().get(0)));
            return ResponseEntity.ok(jobDAO.getTransferDetails(jobId));

        }).orElse(ResponseEntity.notFound().build());
    }
+22 −25
Original line number Diff line number Diff line
@@ -31,7 +31,6 @@ import java.util.Optional;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.uws.v1.ResultReference;
import net.ivoa.xml.vospace.v2.DataNode;
import net.ivoa.xml.vospace.v2.Node;
import net.ivoa.xml.vospace.v2.Protocol;
@@ -67,24 +66,17 @@ public class UriService {
    @Autowired
    private FileServiceClient fileServiceClient;

    public void setTransferJobResult(JobSummary job, Transfer transfer) {

        List<ResultReference> results = new ArrayList<>();

        ResultReference result = new ResultReference();
        result.setHref(getEndpoint(job, transfer));
        results.add(result);

        job.setResults(results);
        // Moved phase setting to caller method for ERROR management
    }

    /**
     * Sets the endpoint value for all valid protocols (protocol negotiation).
     * For a given job, returns a new transfer object containing only valid
     * protocols (protocol negotiation) and sets proper endpoints on them.
     */
    public void setSyncTransferEndpoints(JobSummary job) {
    public Transfer getNegotiatedTransfer(JobSummary job, Transfer transfer) {

        Transfer transfer = getTransfer(job);
        // Original transfer object shouldn't be modified, so a new transfer object is created
        Transfer negotiatedTransfer = new Transfer();
        negotiatedTransfer.setTarget(transfer.getTarget());
        negotiatedTransfer.setDirection(transfer.getDirection());
        // according to examples found in specification view is not copied
        
        if (transfer.getProtocols().isEmpty()) {
            // At least one protocol is expected from client
@@ -97,6 +89,7 @@ public class UriService {
        List<String> validProtocolUris = new ArrayList<>();
        switch (jobDirection) {
            case pullFromVoSpace:
            case pullToVoSpace:
                validProtocolUris.add("ivo://ivoa.net/vospace/core#httpget");
                break;
            case pushToVoSpace:
@@ -109,20 +102,24 @@ public class UriService {

        List<Protocol> validProtocols
                = transfer.getProtocols().stream()
                        // discard invalid protocols
                        .filter(protocol -> validProtocolUris.contains(protocol.getUri()))
                        .collect(Collectors.toList());
                        .map(p -> {
                            // set endpoints
                            Protocol protocol = new Protocol();
                            protocol.setUri(p.getUri());
                            protocol.setEndpoint(getEndpoint(job, transfer));
                            return protocol;
                        }).collect(Collectors.toList());

        if (validProtocols.isEmpty()) {
            Protocol protocol = transfer.getProtocols().get(0);
            throw new ProtocolNotSupportedException(protocol.getUri());
        }

        String endpoint = getEndpoint(job, transfer);
        validProtocols.stream().forEach(p -> p.setEndpoint(endpoint));
        negotiatedTransfer.getProtocols().addAll(validProtocols);
        
        // Returns modified transfer containing only valid protocols
        transfer.getProtocols().clear();
        transfer.getProtocols().addAll(validProtocols);
        return negotiatedTransfer;
    }

    private Node getEndpointNode(String relativePath,
+1 −1
Original line number Diff line number Diff line
@@ -12,6 +12,6 @@ import org.springframework.web.bind.annotation.ResponseStatus;
public class InvalidArgumentException extends VoSpaceErrorSummarizableException {

    public InvalidArgumentException(String message) {
        super("Description: " + message, VOSpaceFaultEnum.NODE_NOT_FOUND);
        super("Description: " + message, VOSpaceFaultEnum.INVALID_ARGUMENT);
    }
}
+29 −7
Original line number Diff line number Diff line
@@ -49,12 +49,12 @@ public class JobDAO {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    public void createJob(JobSummary jobSummary) {
    public void createJob(JobSummary jobSummary, Transfer transferDetails) {

        String sql
                = "INSERT INTO job(job_id, owner_id, job_type, phase, job_info,"
                + " error_message, error_type, error_has_detail, error_detail) "
                + "VALUES (?, ?, ?, ?, ?, ? ,? ,? ,?)";
                = "INSERT INTO job(job_id, owner_id, job_type, phase, job_info, transfer_details, "
                + " results, error_message, error_type, error_has_detail, error_detail) "
                + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

        jdbcTemplate.update(sql, ps -> {
            int i = 0;
@@ -63,6 +63,8 @@ public class JobDAO {
            ps.setObject(++i, getJobDirection(jobSummary), Types.VARCHAR);
            ps.setObject(++i, jobSummary.getPhase().value(), Types.OTHER);
            ps.setObject(++i, toJson(jobSummary.getJobInfo()), Types.OTHER);
            ps.setObject(++i, toJson(transferDetails), Types.OTHER);
            ps.setObject(++i, toJson(jobSummary.getResults()), Types.OTHER);

            ErrorSummary errorSummary = jobSummary.getErrorSummary();
            if (errorSummary != null) {
@@ -263,16 +265,16 @@ public class JobDAO {
        }
    }

    public void updateJob(JobSummary job) {
    public void updateJob(JobSummary job, Transfer transferDetails) {

        String sql = "UPDATE job SET (phase, results";
        String sql = "UPDATE job SET (phase, results, transfer_details ";

        ErrorSummary errorSummary = job.getErrorSummary();
        if (errorSummary != null) {
            sql += ", error_message, error_type, error_has_detail, error_detail";
        }

        sql += ") = (?, ?";
        sql += ") = (?, ?, ?";

        if (errorSummary != null) {
            sql += ", ?, ?, ?, ?";
@@ -284,6 +286,7 @@ public class JobDAO {
            int i = 0;
            ps.setObject(++i, job.getPhase().name(), Types.OTHER);
            ps.setObject(++i, toJson(job.getResults()), Types.OTHER);
            ps.setObject(++i, toJson(transferDetails), Types.OTHER);
            if (errorSummary != null) {
                ps.setString(++i, errorSummary.getMessage());
                ps.setObject(++i, errorSummary.getType().value(), Types.OTHER);
@@ -294,7 +297,26 @@ public class JobDAO {
        });
    }
    
    public Transfer getTransferDetails(String jobId) {

        String sql = "SELECT transfer_details FROM job WHERE job_id = ?";

        String json = jdbcTemplate.queryForObject(sql, String.class, new Object[]{jobId});
        if (json == null) {
            return null;
        }

        try {
            return MAPPER.readValue(json, Transfer.class);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException(ex);
        }
    }

    private String toJson(Object data) {
        if (data == null) {
            return null;
        }
        try {
            return MAPPER.writeValueAsString(data);
        } catch (JsonProcessingException ex) {
Loading