Commit b58a6eac authored by Sonia Zorba's avatar Sonia Zorba
Browse files

Set JobPhase 'QUEUED' to async recall jobs startup

parent 042ca6ad
Loading
Loading
Loading
Loading
+18 −4
Original line number Diff line number Diff line
@@ -10,10 +10,14 @@ import it.inaf.oats.vospace.exception.ErrorSummaryFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class JobService {

    private static final Logger LOG = LoggerFactory.getLogger(JobService.class);

    @Autowired
    private JobDAO jobDAO;

@@ -33,6 +37,8 @@ public class JobService {

    public void setJobPhase(JobSummary job, String phase) {

        LOG.trace("Job " + job.getJobId() + " phase set to " + phase);

        // TODO: check allowed job phase transitions
        switch (phase) {
            case "RUN":
@@ -48,11 +54,19 @@ public class JobService {
    private void startJob(JobSummary job) {

        try {
            job.setPhase(ExecutionPhase.EXECUTING);
            jobDAO.updateJob(job);

            Transfer transfer = uriService.getTransfer(job);

            ExecutionPhase phase;
            if (transfer.getProtocols().stream().anyMatch(p -> "ia2:async-recall".equals(p.getUri()))) {
                // Async recall from tape jobs are queued. They will be started by VOSpace transfer service
                phase = ExecutionPhase.QUEUED;
            } else {
                phase = ExecutionPhase.EXECUTING;
            }
            job.setPhase(phase);

            jobDAO.updateJob(job);

            switch (getJobType(transfer)) {
                case pullToVoSpace:
                    handlePullToVoSpace(job, transfer);
+113 −0
Original line number Diff line number Diff line
package it.inaf.oats.vospace;

import it.inaf.oats.vospace.exception.NodeBusyException;
import it.inaf.oats.vospace.persistence.JobDAO;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class JobServiceTest {

    @Mock
    private JobDAO jobDAO;

    @Mock
    private UriService uriService;

    @Mock
    private AsyncTransferService asyncTransfService;

    @InjectMocks
    private JobService jobService;

    @Test
    public void testStartJobDefault() {
        when(uriService.getTransfer(any())).thenReturn(getHttpTransfer());

        JobSummary job = new JobSummary();
        jobService.setJobPhase(job, "RUN");

        verify(jobDAO, times(1)).updateJob(argThat(j -> ExecutionPhase.EXECUTING.equals(j.getPhase())));
    }

    @Test
    public void testStartJobTape() {
        when(uriService.getTransfer(any())).thenReturn(getTapeTransfer());

        JobSummary job = new JobSummary();
        jobService.setJobPhase(job, "RUN");

        verify(jobDAO, times(1)).updateJob(argThat(j -> ExecutionPhase.QUEUED.equals(j.getPhase())));
    }

    @Test
    public void testStartJobVoSpaceError() {
        when(uriService.getTransfer(any())).thenReturn(getTapeTransfer());

        when(asyncTransfService.startJob(any())).thenThrow(new NodeBusyException("/foo"));

        JobSummary job = new JobSummary();
        jobService.setJobPhase(job, "RUN");

        verify(jobDAO, times(2)).updateJob(argThat(j -> ExecutionPhase.ERROR.equals(j.getPhase())));
    }

    @Test
    public void testStartJobUnexpectedError() {
        when(uriService.getTransfer(any())).thenReturn(getTapeTransfer());

        when(asyncTransfService.startJob(any())).thenThrow(new NullPointerException());

        JobSummary job = new JobSummary();
        jobService.setJobPhase(job, "RUN");

        verify(jobDAO, times(2)).updateJob(argThat(j -> ExecutionPhase.ERROR.equals(j.getPhase())));
    }

    @Test
    public void testSyncJobResultVoSpaceError() {
        when(uriService.getTransfer(any())).thenReturn(getHttpTransfer());
        doThrow(new NodeBusyException("/foo")).when(uriService).setSyncTransferEndpoints(any());
        jobService.createSyncJobResult(new JobSummary());
        verify(jobDAO, times(1)).createJob(argThat(j -> ExecutionPhase.ERROR.equals(j.getPhase())));
    }

    @Test
    public void testSyncJobResultUnexpectedError() {
        when(uriService.getTransfer(any())).thenReturn(getHttpTransfer());
        doThrow(new NullPointerException()).when(uriService).setSyncTransferEndpoints(any());
        jobService.createSyncJobResult(new JobSummary());
        verify(jobDAO, times(1)).createJob(argThat(j -> ExecutionPhase.ERROR.equals(j.getPhase())));
    }

    private Transfer getHttpTransfer() {
        Transfer transfer = new Transfer();
        transfer.setDirection("pullFromVoSpace");
        Protocol protocol = new Protocol();
        protocol.setUri("ivo://ivoa.net/vospace/core#httpget");
        transfer.getProtocols().add(protocol);
        return transfer;
    }

    private Transfer getTapeTransfer() {
        Transfer transfer = new Transfer();
        transfer.setDirection("pullToVoSpace");
        Protocol protocol = new Protocol();
        protocol.setUri("ia2:async-recall");
        transfer.getProtocols().add(protocol);
        return transfer;
    }
}