Commit 32847848 authored by Sonia Zorba's avatar Sonia Zorba
Browse files

Fixed job phase handling

parent 3025be05
Loading
Loading
Loading
Loading
+46 −23
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@
 */
package it.inaf.oats.vospace;

import it.inaf.ia2.aa.data.User;
import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.persistence.JobDAO;
import net.ivoa.xml.uws.v1.ExecutionPhase;
@@ -16,6 +17,9 @@ import it.inaf.oats.vospace.exception.InvalidArgumentException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -36,6 +40,9 @@ public class JobService {
    @Autowired
    private AsyncTransferService asyncTransfService;

    @Autowired
    private HttpServletRequest servletRequest;

    public enum JobDirection {
        pullToVoSpace,
        pullFromVoSpace,
@@ -59,10 +66,8 @@ public class JobService {
                    } else {
                        return JobDirection.moveNode;
                    }

            }
        }

    }

    public void setJobPhase(JobSummary job, String phase) {
@@ -81,9 +86,9 @@ public class JobService {
        }
    }

    private void startJob(JobSummary job) {
    private void startJob(JobSummary jobSummary) {

        try {
        handleJobErrors(jobSummary, job -> {
            Transfer transfer = uriService.getTransfer(job);

            ExecutionPhase phase;
@@ -106,24 +111,17 @@ public class JobService {
                    handleVoSpaceUrlsListResult(job, transfer);
                    break;
                case moveNode:
                    handleMoveNode(transfer);
                    handleMoveNode(job, transfer);
                    break;
                default:
                    throw new UnsupportedOperationException("Not implemented yet");
            }

            job.setPhase(ExecutionPhase.COMPLETED);
            
        } catch (VoSpaceErrorSummarizableException e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));            
        } catch (Exception e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
                    new InternalFaultException(e)));            
        } finally {
            jobDAO.updateJob(job);
        }
            // Note: ExecutionPhase can't be set to COMPLETED here because all
            // the previous job are asynchronous. Each job has to set its
            // completion independently. Only jobs started from the /synctrans
            // endpoints are completed immediately (see createSyncJobResult() method)
        });
    }

    private void handlePullToVoSpace(JobSummary job, Transfer transfer) {
@@ -141,6 +139,9 @@ public class JobService {
                    String contentUri = protocol.getEndpoint();
                    uriService.setNodeRemoteLocation(nodeUri, contentUri);
                    uriService.setTransferJobResult(job, transfer);
                    // Special case: import of a node from a portal file server
                    // doesn't imply file transfer, so it can be set to completed
                    job.setPhase(ExecutionPhase.COMPLETED);
                    return;
                default:
                    throw new InternalFaultException("Unsupported pullToVoSpace protocol: " + protocol.getUri());
@@ -152,9 +153,31 @@ public class JobService {
        uriService.setTransferJobResult(job, transfer);
    }

    private void handleMoveNode(Transfer transfer)
    {
        moveService.processMoveJob(transfer);
    private void handleMoveNode(JobSummary jobSummary, Transfer transfer) {
        // User data must be extracted before starting the new thread
        // to avoid the "No thread-bound request found" exception
        User user = (User) servletRequest.getUserPrincipal();
        CompletableFuture.runAsync(() -> {
            handleJobErrors(jobSummary, job -> {
                moveService.processMoveJob(transfer, user);
                job.setPhase(ExecutionPhase.COMPLETED);
            });
        });
    }

    private void handleJobErrors(JobSummary job, Consumer<JobSummary> jobConsumer) {
        try {
            jobConsumer.accept(job);
        } catch (VoSpaceErrorSummarizableException e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e));
        } catch (Exception e) {
            job.setPhase(ExecutionPhase.ERROR);
            job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(
                    new InternalFaultException(e)));
        } finally {
            jobDAO.updateJob(job);
        }
    }

    private JobDirection getJobDirection(Transfer transfer) {
+6 −8
Original line number Diff line number Diff line
@@ -15,7 +15,6 @@ import it.inaf.oats.vospace.exception.PermissionDeniedException;
import it.inaf.oats.vospace.persistence.NodeDAO;
import it.inaf.oats.vospace.persistence.NodeDAO.ShortNodeDescriptor;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import net.ivoa.xml.vospace.v2.Transfer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -35,11 +34,13 @@ public class MoveService {
    @Value("${vospace-authority}")
    private String authority;
    
    @Autowired
    private HttpServletRequest servletRequest;
    
    /**
     * Perform modeNode operation. User is passed as parameter because this method
     * is run in a separate thread and original HttpServletRequest is not available
     * anymore ("No thread-bound request found" would happen).
     */
    @Transactional(rollbackFor = { Exception.class }, isolation = Isolation.REPEATABLE_READ)
    public void processMoveJob(Transfer transfer) {
    public void processMoveJob(Transfer transfer, User user) {

        if (transfer.getTarget().size() != 1) {
            throw new InvalidArgumentException("Invalid target size for moveNode: " + transfer.getTarget().size());
@@ -51,9 +52,6 @@ public class MoveService {
        // Get Destination Vos Path (it's in transfer direction)
        String destinationPath = URIUtils.returnVosPathFromNodeURI(transfer.getDirection(), authority);

        // Extract User permissions from servlet request
        User user = (User) servletRequest.getUserPrincipal();

        // Generic common validation for move process job paths
        this.validatePath(sourcePath);
        this.validatePath(destinationPath);
+83 −0
Original line number Diff line number Diff line
@@ -7,23 +7,32 @@ package it.inaf.oats.vospace;

import it.inaf.oats.vospace.exception.NodeBusyException;
import it.inaf.oats.vospace.persistence.JobDAO;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class JobServiceTest {

    @Mock
@@ -35,6 +44,12 @@ public class JobServiceTest {
    @Mock
    private AsyncTransferService asyncTransfService;

    @Mock
    private HttpServletRequest servletRequest;
    
    @Mock
    private MoveService moveService;

    @InjectMocks
    private JobService jobService;

@@ -98,6 +113,68 @@ public class JobServiceTest {
        verify(jobDAO, times(1)).createJob(argThat(j -> ExecutionPhase.ERROR.equals(j.getPhase())));
    }

    @Test
    public void testStartJobSetQueuedPhaseForAsyncRecall() {

        Transfer asyncRecallTransfer = getTapeTransfer();

        JobSummary job = new JobSummary();
        setJobInfo(job, asyncRecallTransfer);

        when(uriService.getTransfer(any())).thenReturn(asyncRecallTransfer);

        when(asyncTransfService.startJob(any())).thenReturn(job);

        jobService.setJobPhase(job, "RUN");

        // Job will be executed by transfer service
        assertEquals(ExecutionPhase.QUEUED, job.getPhase());
    }

    @Test
    public void testStartJobSetExecutingPhaseForAsyncPullFromVoSpace() {

        Transfer httpTransfer = getHttpTransfer();

        JobSummary job = new JobSummary();
        setJobInfo(job, httpTransfer);

        when(uriService.getTransfer(any())).thenReturn(httpTransfer);

        jobService.setJobPhase(job, "RUN");

        // Completion will be set by file service
        assertEquals(ExecutionPhase.EXECUTING, job.getPhase());
    }

    @Test
    public void testStartJobMoveNode() {
        
        Transfer moveNode = new Transfer();
        moveNode.setDirection("vos://example.com!vospace/myfile");

        JobSummary job = new JobSummary();
        setJobInfo(job, moveNode);

        when(uriService.getTransfer(any())).thenReturn(moveNode);

        List<ExecutionPhase> phases = new ArrayList<>();
        doAnswer(invocation -> {
            JobSummary j = invocation.getArgument(0);
            phases.add(j.getPhase());
            return null;
        }).when(jobDAO).updateJob(any());

        jobService.setJobPhase(job, "RUN");

        verify(moveService, timeout(1000).times(1)).processMoveJob(any(), any());

        verify(jobDAO, times(3)).updateJob(any());
        assertEquals(ExecutionPhase.EXECUTING, phases.get(0));
        assertEquals(ExecutionPhase.EXECUTING, phases.get(1));
        assertEquals(ExecutionPhase.COMPLETED, phases.get(2));
    }

    private Transfer getHttpTransfer() {
        Transfer transfer = new Transfer();
        transfer.setDirection("pullFromVoSpace");
@@ -115,4 +192,10 @@ public class JobServiceTest {
        transfer.getProtocols().add(protocol);
        return transfer;
    }

    private void setJobInfo(JobSummary job, Transfer transfer) {
        JobSummary.JobInfo jobInfo = new JobSummary.JobInfo();
        jobInfo.getAny().add(transfer);
        job.setJobInfo(jobInfo);
    }
}
+67 −95
Original line number Diff line number Diff line
@@ -14,7 +14,6 @@ import it.inaf.oats.vospace.persistence.NodeDAO;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import net.ivoa.xml.vospace.v2.Transfer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -29,16 +28,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest
@AutoConfigureMockMvc
@ContextConfiguration(classes = {DataSourceConfigSingleton.class, MoveServiceTest.TestConfig.class})
@ContextConfiguration(classes = {DataSourceConfigSingleton.class})
@TestPropertySource(locations = "classpath:test.properties", properties = {"vospace-authority=example.com!vospace", "file-service-url=http://file-service"})
@TestMethodOrder(OrderAnnotation.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@@ -53,36 +49,17 @@ public class MoveServiceTest {
    @Autowired
    private NodeDAO nodeDao;

    @Autowired
    private HttpServletRequest servletRequest;
       
    @TestConfiguration
    public static class TestConfig {

        /**
         * Necessary because MockBean doesn't work with HttpServletRequest.
         */
        @Bean
        @Primary
        public HttpServletRequest servletRequest() {
            HttpServletRequest request = mock(HttpServletRequest.class);
            User user = new User().setUserId("anonymous");
            when(request.getUserPrincipal()).thenReturn(user);
            return request;
        }
    }
    
    @Test
    @Order(1)
    public void moveRootTest() {

        assertThrows(IllegalArgumentException.class, () -> {
            moveService.processMoveJob(getTransfer("/", "/pippo"));
            moveService.processMoveJob(getTransfer("/", "/pippo"), getAnonymousUser());
        }
        );

        assertThrows(IllegalArgumentException.class, () -> {
            moveService.processMoveJob(getTransfer("/pippo", "/"));
            moveService.processMoveJob(getTransfer("/pippo", "/"), getAnonymousUser());
        }
        );

@@ -93,10 +70,9 @@ public class MoveServiceTest {
    public void testMoveToSubnodeOfItself() {
        User user = mock(User.class);
        when(user.getName()).thenReturn("user3");
        when(servletRequest.getUserPrincipal()).thenReturn(user);

        assertThrows(IllegalArgumentException.class, () -> {
            moveService.processMoveJob(getTransfer("/test3/m1", "/test3/m1/m2"));
            moveService.processMoveJob(getTransfer("/test3/m1", "/test3/m1/m2"), user);
        }
        );
    }
@@ -105,7 +81,7 @@ public class MoveServiceTest {
    @Order(3)
    public void testNonExistingSourceNode() {
        assertThrows(NodeNotFoundException.class, () -> {
            moveService.processMoveJob(getTransfer("/pippo", "/test2"));
            moveService.processMoveJob(getTransfer("/pippo", "/test2"), getAnonymousUser());
        }
        );
    }
@@ -115,10 +91,9 @@ public class MoveServiceTest {
    public void testMoveDeniedOnBusySource() {
        User user = mock(User.class);
        when(user.getName()).thenReturn("user3");
        when(servletRequest.getUserPrincipal()).thenReturn(user);

        assertThrows(NodeBusyException.class, () -> {
            moveService.processMoveJob(getTransfer("/test3/mbusy", "/test3/m1"));
            moveService.processMoveJob(getTransfer("/test3/mbusy", "/test3/m1"), user);
        }
        );
    }
@@ -128,10 +103,9 @@ public class MoveServiceTest {
    public void testPermissionDeniedOnSource() {
        User user = mock(User.class);
        when(user.getName()).thenReturn("user1");
        when(servletRequest.getUserPrincipal()).thenReturn(user);

        assertThrows(PermissionDeniedException.class, () -> {
            moveService.processMoveJob(getTransfer("/test3/m1", "/test4"));
            moveService.processMoveJob(getTransfer("/test3/m1", "/test4"), user);
        }
        );
    }
@@ -141,10 +115,9 @@ public class MoveServiceTest {
    public void testDontMoveIfStickySource() {
        User user = mock(User.class);
        when(user.getName()).thenReturn("user3");
        when(servletRequest.getUserPrincipal()).thenReturn(user);

        assertThrows(PermissionDeniedException.class, () -> {
            moveService.processMoveJob(getTransfer("/test3/mstick", "/test4"));
            moveService.processMoveJob(getTransfer("/test3/mstick", "/test4"), user);
        }
        );
    }
@@ -155,10 +128,9 @@ public class MoveServiceTest {
        User user = mock(User.class);
        when(user.getName()).thenReturn("user1");
        when(user.getGroups()).thenReturn(List.of("group1"));
        when(servletRequest.getUserPrincipal()).thenReturn(user);

        assertThrows(PermissionDeniedException.class, () -> {
            moveService.processMoveJob(getTransfer("/test3/group1", "/test3/m1/m2"));
            moveService.processMoveJob(getTransfer("/test3/group1", "/test3/m1/m2"), user);
        }
        );
    }
@@ -168,10 +140,9 @@ public class MoveServiceTest {
    public void testDestinationExistsAndIsBusy() {
        User user = mock(User.class);
        when(user.getName()).thenReturn("user3");
        when(servletRequest.getUserPrincipal()).thenReturn(user);        

        assertThrows(NodeBusyException.class, () -> {
            moveService.processMoveJob(getTransfer("/test3/m1", "/test3/mbusy"));
            moveService.processMoveJob(getTransfer("/test3/m1", "/test3/mbusy"), user);
        }
        );
    }
@@ -181,14 +152,13 @@ public class MoveServiceTest {
    public void testRenameNode() {
        User user = mock(User.class);
        when(user.getName()).thenReturn("user3");
        when(servletRequest.getUserPrincipal()).thenReturn(user);

        Optional<Long> sourceId = nodeDao.getNodeId("/test3/m1");
        assertTrue(sourceId.isPresent());
        Optional<Long> childId = nodeDao.getNodeId("/test3/m1/m2");
        assertTrue(childId.isPresent());
        // Rename
        moveService.processMoveJob(getTransfer("/test3/m1", "/test3/m1ren"));
        moveService.processMoveJob(getTransfer("/test3/m1", "/test3/m1ren"), user);

        Optional<Long> checkSourceId = nodeDao.getNodeId("/test3/m1");
        assertTrue(checkSourceId.isEmpty());
@@ -208,7 +178,6 @@ public class MoveServiceTest {
    public void testMoveToExistingDestination() {
        User user = mock(User.class);
        when(user.getName()).thenReturn("user3");
        when(servletRequest.getUserPrincipal()).thenReturn(user);

        // Preliminary checks for assumptions
        Optional<Long> sourceId = nodeDao.getNodeId("/test3/m1");
@@ -223,7 +192,7 @@ public class MoveServiceTest {
        assertTrue(destId.isPresent());

        // move
        moveService.processMoveJob(getTransfer("/test3/m1", "/test4"));
        moveService.processMoveJob(getTransfer("/test3/m1", "/test4"), user);

        // source has been moved
        Optional<Long> oldSourceId = nodeDao.getNodeId("/test3/m1");
@@ -248,4 +217,7 @@ public class MoveServiceTest {
        return transfer;
    }

    private User getAnonymousUser() {
        return new User().setUserId("anonymous");
    }
}
+13 −0
Original line number Diff line number Diff line
@@ -141,6 +141,8 @@ public class TransferControllerTest {
        testPullToVoSpace("/mynode", getResourceFileContent("pullToVoSpace-tape.xml"));

        verify(asyncTransfService, times(1)).startJob(any());
        
        verify(jobDao, times(2)).updateJob(argThat(j -> ExecutionPhase.QUEUED == j.getPhase()));
    }

    @Test
@@ -159,6 +161,17 @@ public class TransferControllerTest {
        }));
    }

    @Test
    public void testPushToVoSpace() throws Exception {

        when(nodeDao.getNodeOsName(eq("/uploadedfile"))).thenReturn("file.fits");

        testPullToVoSpace("/uploadedfile", getResourceFileContent("pushToVoSpace.xml"));
        
        // job completion will be set by file service
        verify(jobDao, times(2)).updateJob(argThat(j -> ExecutionPhase.EXECUTING == j.getPhase()));
    }

    private void testPullToVoSpace(String path, String requestBody) throws Exception {

        Node node = mockPublicDataNode();
Loading