Commit 2e359b92 authored by Nicola Fulvio Calabria's avatar Nicola Fulvio Calabria
Browse files

Merge origin/master into copyNode

Conflicts:
	src/main/java/it/inaf/oats/vospace/JobService.java
	src/main/java/it/inaf/oats/vospace/MoveService.java
parents 9e209dc2 32847848
Loading
Loading
Loading
Loading
+1 −10
Original line number Diff line number Diff line
@@ -7,19 +7,13 @@ package it.inaf.oats.vospace;

import it.inaf.ia2.aa.data.User;
import it.inaf.oats.vospace.datamodel.NodeUtils;
import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.exception.InvalidArgumentException;
import it.inaf.oats.vospace.exception.NodeBusyException;
import it.inaf.oats.vospace.exception.NodeNotFoundException;
import it.inaf.oats.vospace.exception.PermissionDeniedException;
import it.inaf.oats.vospace.persistence.NodeDAO;
import it.inaf.oats.vospace.persistence.NodeDAO.ShortNodeDescriptor;
import java.util.List;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import net.ivoa.xml.vospace.v2.Transfer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.CannotSerializeTransactionException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@@ -34,7 +28,7 @@ public class CopyService extends AbstractNodeService {
    private NodeBranchService nodeBranchService;

    @Transactional(rollbackFor = {Exception.class}, isolation = Isolation.REPEATABLE_READ)
    public List<String> processCopyNodes(Transfer transfer, String jobId) {
    public List<String> processCopyNodes(Transfer transfer, String jobId, User user) {
        if (transfer.getTarget().size() != 1) {
            throw new InvalidArgumentException("Invalid target size for copyNode: " + transfer.getTarget().size());
        }
@@ -45,9 +39,6 @@ public class CopyService extends AbstractNodeService {
        // Get Destination Vos Path (it's in transfer direction)
        String destinationPath = URIUtils.returnVosPathFromNodeURI(transfer.getDirection(), authority);

        // Extract User permissions from servlet request
        User user = (User) servletRequest.getUserPrincipal();

        this.validatePath(sourcePath);
        this.validatePath(destinationPath);

+58 −27
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@
 */
package it.inaf.oats.vospace;

import it.inaf.ia2.aa.data.User;
import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.persistence.JobDAO;
import net.ivoa.xml.uws.v1.ExecutionPhase;
@@ -16,6 +17,9 @@ import it.inaf.oats.vospace.exception.InvalidArgumentException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -39,6 +43,9 @@ public class JobService {
    @Autowired
    private AsyncTransferService asyncTransfService;

    @Autowired
    private HttpServletRequest servletRequest;

    public enum JobDirection {
        pullToVoSpace,
        pullFromVoSpace,
@@ -62,10 +69,8 @@ public class JobService {
                    } else {
                        return JobDirection.moveNode;
                    }

            }
        }

    }

    public void setJobPhase(JobSummary job, String phase) {
@@ -84,9 +89,9 @@ public class JobService {
        }
    }

    private void startJob(JobSummary job) {
    private void startJob(JobSummary jobSummary) {

        try {
        handleJobErrors(jobSummary, job -> {
            Transfer transfer = uriService.getTransfer(job);

            ExecutionPhase phase;
@@ -109,28 +114,20 @@ public class JobService {
                    handleVoSpaceUrlsListResult(job, transfer);
                    break;
                case moveNode:
                    handleMoveNode(transfer);
                    handleMoveNode(job, transfer);
                    break;
                case copyNode:
                    handleCopyNode(transfer, job.getJobId());
                    handleCopyNode(job, transfer);
                    break;

                default:
                    throw new UnsupportedOperationException("Not implemented yet");
            }

            job.setPhase(ExecutionPhase.COMPLETED);

        } catch (VoSpaceErrorSummarizableException e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
        } catch (Exception e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
                    new InternalFaultException(e)));
        } finally {
            jobDAO.updateJob(job);
        }
            // Note: ExecutionPhase can't be set to COMPLETED here because all
            // the previous job are asynchronous. Each job has to set its
            // completion independently. Only jobs started from the /synctrans
            // endpoints are completed immediately (see createSyncJobResult() method)
        });
    }

    private void handlePullToVoSpace(JobSummary job, Transfer transfer) {
@@ -148,6 +145,9 @@ public class JobService {
                    String contentUri = protocol.getEndpoint();
                    uriService.setNodeRemoteLocation(nodeUri, contentUri);
                    uriService.setTransferJobResult(job, transfer);
                    // Special case: import of a node from a portal file server
                    // doesn't imply file transfer, so it can be set to completed
                    job.setPhase(ExecutionPhase.COMPLETED);
                    return;
                default:
                    throw new InternalFaultException("Unsupported pullToVoSpace protocol: " + protocol.getUri());
@@ -159,15 +159,46 @@ public class JobService {
        uriService.setTransferJobResult(job, transfer);
    }

    private void handleMoveNode(Transfer transfer) {
        moveService.processMoveJob(transfer);
    private void handleMoveNode(JobSummary jobSummary, Transfer transfer) {
        // User data must be extracted before starting the new thread
        // to avoid the "No thread-bound request found" exception
        User user = (User) servletRequest.getUserPrincipal();
        CompletableFuture.runAsync(() -> {
            handleJobErrors(jobSummary, job -> {
                moveService.processMoveJob(transfer, user);
                job.setPhase(ExecutionPhase.COMPLETED);
            });
        });
    }

    private void handleCopyNode(Transfer transfer, String jobId)
    {
        copyService.processCopyNodes(transfer, jobId);
    private void handleCopyNode(JobSummary jobSummary, Transfer transfer) {
        // User data must be extracted before starting the new thread
        // to avoid the "No thread-bound request found" exception
        User user = (User) servletRequest.getUserPrincipal();
        CompletableFuture.runAsync(() -> {
            handleJobErrors(jobSummary, job -> {
                copyService.processCopyNodes(transfer, jobSummary.getJobId(), user);
                // add file service copy logic

                job.setPhase(ExecutionPhase.COMPLETED);
            });
        });
    }

    private void handleJobErrors(JobSummary job, Consumer<JobSummary> jobConsumer) {
        try {
            jobConsumer.accept(job);
        } catch (VoSpaceErrorSummarizableException e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
        } catch (Exception e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
                    new InternalFaultException(e)));
        } finally {
            jobDAO.updateJob(job);
        }
    }

    private JobDirection getJobDirection(Transfer transfer) {
        return JobDirection.getJobDirectionEnumFromTransfer(transfer);
+7 −5
Original line number Diff line number Diff line
@@ -24,8 +24,13 @@ import org.springframework.transaction.annotation.Transactional;
@EnableTransactionManagement
public class MoveService extends AbstractNodeService {
    
    /**
     * Perform modeNode operation. User is passed as parameter because this method
     * is run in a separate thread and original HttpServletRequest is not available
     * anymore ("No thread-bound request found" would happen).
     */
    @Transactional(rollbackFor = { Exception.class }, isolation = Isolation.REPEATABLE_READ)
    public void processMoveJob(Transfer transfer) {
    public void processMoveJob(Transfer transfer, User user) {

        if (transfer.getTarget().size() != 1) {
            throw new InvalidArgumentException("Invalid target size for moveNode: " + transfer.getTarget().size());
@@ -37,9 +42,6 @@ public class MoveService extends AbstractNodeService {
        // Get Destination Vos Path (it's in transfer direction)
        String destinationPath = URIUtils.returnVosPathFromNodeURI(transfer.getDirection(), authority);

        // Extract User permissions from servlet request
        User user = (User) servletRequest.getUserPrincipal();

        // Generic common validation for move process job paths
        this.validatePath(sourcePath);
        this.validatePath(destinationPath);
+17 −6
Original line number Diff line number Diff line
@@ -99,7 +99,7 @@ public class NodeDAO {

        String sql = "SELECT (CASE WHEN c.path = n.path THEN ? ELSE (? || ? || c.name) END) AS vos_path, c.node_id, c.name,\n"
                + "c.type, c.async_trans, c.sticky, c.job_id IS NOT NULL AS busy_state, c.creator_id, c.group_read, c.group_write,\n"
                + "c.is_public, c.content_length, c.created_on, c.last_modified, c.accept_views, c.provide_views\n"
                + "c.is_public, c.content_length, c.created_on, c.last_modified, c.accept_views, c.provide_views, c.quota\n"
                + "FROM node n\n"
                + "JOIN node c ON c.path ~ (n.path::varchar || ? || '*{1}')::lquery OR c.path = n.path\n"
                + "WHERE n.node_id = id_from_vos_path(?)\n"
@@ -229,6 +229,9 @@ public class NodeDAO {
        addProperty(NodeProperties.PUBLIC_READ_URI, String.valueOf(rs.getBoolean("is_public")),
                properties);

        addProperty(NodeProperties.QUOTA_URI, String.valueOf(rs.getString("quota")),
                properties);

        addProperty("urn:async_trans", String.valueOf(rs.getBoolean("async_trans")),
                properties);

@@ -265,7 +268,8 @@ public class NodeDAO {
            String userId, List<String> userGroups) {

        String sql = "SELECT path,\n"
                + "NOT (n.async_trans OR n.sticky OR COALESCE(location_type = 'async', FALSE)) AS is_writable,\n"
                + "NOT (n.async_trans OR COALESCE(location_type = 'async', FALSE)) AS is_writable,\n"
                + "n.sticky AS is_sticky,\n"
                + "((SELECT COUNT(*) FROM (SELECT UNNEST(?) INTERSECT SELECT UNNEST(n.group_write)) AS allowed_groups ) = 0 AND\n"
                + "n.creator_id <> ?) AS is_permission_denied,\n"
                + "n.type = 'container' AS is_container,\n"
@@ -299,8 +303,9 @@ public class NodeDAO {
            Boolean isWritable = rs.getBoolean("is_writable");
            Boolean isBusy = rs.getBoolean("busy_state");
            Boolean isPermissionDenied = rs.getBoolean("is_permission_denied");
            Boolean isSticky = rs.getBoolean("is_sticky");

            ShortNodeDescriptor result = new ShortNodeDescriptor(nodePath, isContainer, isWritable, isBusy, isPermissionDenied);
            ShortNodeDescriptor result = new ShortNodeDescriptor(nodePath, isContainer, isWritable, isBusy, isPermissionDenied, isSticky);

            return Optional.of(result);
        });
@@ -495,7 +500,7 @@ public class NodeDAO {
                + "(node_id, parent_path, parent_relative_path, "
                + "name, os_name, tstamp_wrapper_dir, type, location_id, format, "
                + "async_trans, job_id, creator_id, group_read, "
                + "group_write, is_public, delta, content_type, content_encoding, "
                + "group_write, is_public, quota, content_type, content_encoding, "
                + "content_length, content_md5, created_on, last_modified, "
                + "accept_views, provide_views, protocols, sticky)\n";

@@ -507,7 +512,7 @@ public class NodeDAO {
                + "n.node_id, n.parent_path, n.parent_relative_path, "
                + "n.name, n.os_name, n.tstamp_wrapper_dir, n.type, n.location_id, n.format, "
                + "n.async_trans, n.job_id, n.creator_id, n.group_read, "
                + "n.group_write, n.is_public, n.delta, n.content_type, n.content_encoding, "
                + "n.group_write, n.is_public, n.quota, n.content_type, n.content_encoding, "
                + "n.content_length, n.content_md5, n.created_on, n.last_modified, "
                + "n.accept_views, n.provide_views, n.protocols, n.sticky\n";

@@ -729,13 +734,15 @@ public class NodeDAO {
        private final boolean writable;
        private final boolean busy;
        private final boolean permissionDenied;
        private final boolean sticky;

        public ShortNodeDescriptor(String nodeLtreePath, boolean container, boolean writable, boolean busy, boolean permissionDenied) {
        public ShortNodeDescriptor(String nodeLtreePath, boolean container, boolean writable, boolean busy, boolean permissionDenied, boolean sticky) {
            this.nodeLtreePath = nodeLtreePath;
            this.container = container;
            this.writable = writable;
            this.busy = busy;
            this.permissionDenied = permissionDenied;
            this.sticky = sticky;
        }

        public String getDestinationNodeLtreePath() {
@@ -758,6 +765,10 @@ public class NodeDAO {
            return permissionDenied;
        }
        
        public boolean isSticky() {
            return sticky;
        }

    }

    private class NodePaths {
+83 −0
Original line number Diff line number Diff line
@@ -7,23 +7,32 @@ package it.inaf.oats.vospace;

import it.inaf.oats.vospace.exception.NodeBusyException;
import it.inaf.oats.vospace.persistence.JobDAO;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class JobServiceTest {

    @Mock
@@ -35,6 +44,12 @@ public class JobServiceTest {
    @Mock
    private AsyncTransferService asyncTransfService;

    @Mock
    private HttpServletRequest servletRequest;
    
    @Mock
    private MoveService moveService;

    @InjectMocks
    private JobService jobService;

@@ -98,6 +113,68 @@ public class JobServiceTest {
        verify(jobDAO, times(1)).createJob(argThat(j -> ExecutionPhase.ERROR.equals(j.getPhase())));
    }

    @Test
    public void testStartJobSetQueuedPhaseForAsyncRecall() {

        Transfer asyncRecallTransfer = getTapeTransfer();

        JobSummary job = new JobSummary();
        setJobInfo(job, asyncRecallTransfer);

        when(uriService.getTransfer(any())).thenReturn(asyncRecallTransfer);

        when(asyncTransfService.startJob(any())).thenReturn(job);

        jobService.setJobPhase(job, "RUN");

        // Job will be executed by transfer service
        assertEquals(ExecutionPhase.QUEUED, job.getPhase());
    }

    @Test
    public void testStartJobSetExecutingPhaseForAsyncPullFromVoSpace() {

        Transfer httpTransfer = getHttpTransfer();

        JobSummary job = new JobSummary();
        setJobInfo(job, httpTransfer);

        when(uriService.getTransfer(any())).thenReturn(httpTransfer);

        jobService.setJobPhase(job, "RUN");

        // Completion will be set by file service
        assertEquals(ExecutionPhase.EXECUTING, job.getPhase());
    }

    @Test
    public void testStartJobMoveNode() {
        
        Transfer moveNode = new Transfer();
        moveNode.setDirection("vos://example.com!vospace/myfile");

        JobSummary job = new JobSummary();
        setJobInfo(job, moveNode);

        when(uriService.getTransfer(any())).thenReturn(moveNode);

        List<ExecutionPhase> phases = new ArrayList<>();
        doAnswer(invocation -> {
            JobSummary j = invocation.getArgument(0);
            phases.add(j.getPhase());
            return null;
        }).when(jobDAO).updateJob(any());

        jobService.setJobPhase(job, "RUN");

        verify(moveService, timeout(1000).times(1)).processMoveJob(any(), any());

        verify(jobDAO, times(3)).updateJob(any());
        assertEquals(ExecutionPhase.EXECUTING, phases.get(0));
        assertEquals(ExecutionPhase.EXECUTING, phases.get(1));
        assertEquals(ExecutionPhase.COMPLETED, phases.get(2));
    }

    private Transfer getHttpTransfer() {
        Transfer transfer = new Transfer();
        transfer.setDirection("pullFromVoSpace");
@@ -115,4 +192,10 @@ public class JobServiceTest {
        transfer.getProtocols().add(protocol);
        return transfer;
    }

    private void setJobInfo(JobSummary job, Transfer transfer) {
        JobSummary.JobInfo jobInfo = new JobSummary.JobInfo();
        jobInfo.getAny().add(transfer);
        job.setJobInfo(jobInfo);
    }
}
Loading