Loading src/main/java/it/inaf/oats/vospace/JobService.java +11 −4 Original line number Original line Diff line number Diff line Loading @@ -93,7 +93,7 @@ public class JobService { Transfer transfer = uriService.getTransfer(job); Transfer transfer = uriService.getTransfer(job); ExecutionPhase phase; ExecutionPhase phase; if (transfer.getProtocols().stream().anyMatch(p -> "ia2:async-recall".equals(p.getUri()))) { if (isAsyncRecall(transfer)) { // Async recall from tape jobs are queued. They will be started by VOSpace transfer service // Async recall from tape jobs are queued. They will be started by VOSpace transfer service phase = ExecutionPhase.QUEUED; phase = ExecutionPhase.QUEUED; } else { } else { Loading Loading @@ -132,11 +132,13 @@ public class JobService { private Transfer handlePullToVoSpace(JobSummary job, Transfer transfer) { private Transfer handlePullToVoSpace(JobSummary job, Transfer transfer) { for (Protocol protocol : transfer.getProtocols()) { if (isAsyncRecall(transfer)) { switch (protocol.getUri()) { case "ia2:async-recall": asyncTransfService.startJob(job); asyncTransfService.startJob(job); return transfer; return transfer; } for (Protocol protocol : transfer.getProtocols()) { switch (protocol.getUri()) { case "ivo://ivoa.net/vospace/core#httpget": case "ivo://ivoa.net/vospace/core#httpget": if (transfer.getTarget().size() != 1) { if (transfer.getTarget().size() != 1) { throw new InvalidArgumentException("Invalid target size for pullToVoSpace: " + transfer.getTarget().size()); throw new InvalidArgumentException("Invalid target size for pullToVoSpace: " + transfer.getTarget().size()); Loading @@ -158,6 +160,11 @@ public class JobService { throw new InvalidArgumentException("Transfer contains no protocols"); throw new InvalidArgumentException("Transfer contains no protocols"); } } private boolean isAsyncRecall(Transfer transfer) { return transfer.getView() != null && "ivo://ia2.inaf.it/vospace/views#async-recall".equals(transfer.getView().getUri()); } private void handleMoveNode(JobSummary jobSummary, Transfer transfer) { private void handleMoveNode(JobSummary jobSummary, Transfer transfer) { // User data must be extracted before starting the new thread // User data must be extracted before starting the new thread // to avoid the "No thread-bound request found" exception // to avoid the "No thread-bound request found" exception Loading src/test/java/it/inaf/oats/vospace/AsyncTransferServiceTest.java +22 −7 Original line number Original line Diff line number Diff line Loading @@ -5,12 +5,14 @@ */ */ package it.inaf.oats.vospace; package it.inaf.oats.vospace; import it.inaf.oats.vospace.datamodel.Views; import java.util.ArrayList; import java.util.ArrayList; import java.util.Arrays; import java.util.Arrays; import java.util.List; import java.util.List; import net.ivoa.xml.uws.v1.JobSummary; import net.ivoa.xml.uws.v1.JobSummary; import net.ivoa.xml.vospace.v2.Protocol; import net.ivoa.xml.vospace.v2.Param; import net.ivoa.xml.vospace.v2.Transfer; import net.ivoa.xml.vospace.v2.Transfer; import net.ivoa.xml.vospace.v2.View; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test; Loading @@ -26,7 +28,7 @@ import redis.clients.jedis.Jedis; @ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class) public class AsyncTransferServiceTest { public class AsyncTransferServiceTest { private static final String JSON_JOB = "{\"jobId\":\"job_id\",\"runId\":null,\"ownerId\":null,\"phase\":null,\"quote\":null,\"creationTime\":null,\"startTime\":null,\"endTime\":null,\"executionDuration\":0,\"destruction\":null,\"parameters\":null,\"results\":[],\"errorSummary\":null,\"jobInfo\":{\"transfer\":{\"target\":[\"vos://example.com!vospace/my-node\"],\"direction\":\"pullToVoSpace\",\"view\":null,\"protocols\":[],\"keepBytes\":null,\"version\":null,\"param\":[]}},\"version\":null}\n"; private static final String JSON_JOB = "{\"jobId\":\"job_id\",\"runId\":null,\"ownerId\":null,\"phase\":null,\"quote\":null,\"creationTime\":null,\"startTime\":null,\"endTime\":null,\"executionDuration\":0,\"destruction\":null,\"parameters\":null,\"results\":[],\"errorSummary\":null,\"jobInfo\":{\"transfer\":{\"target\":[\"vos://example.com!vospace/my-node\"],\"direction\":\"pullToVoSpace\",\"view\":{\"param\":[{\"value\":\"file1.txt\",\"uri\":\"ivo://ia2.inaf.it/vospace/views#async-recall/include\"},{\"value\":\"file2.txt\",\"uri\":\"ivo://ia2.inaf.it/vospace/views#async-recall/include\"}],\"uri\":\"ivo://ia2.inaf.it/vospace/views#async-recall\",\"original\":true},\"protocols\":[],\"keepBytes\":false,\"version\":null,\"param\":[]}},\"version\":null}"; @Test @Test public void testRedisRpc() { public void testRedisRpc() { Loading Loading @@ -61,11 +63,24 @@ public class AsyncTransferServiceTest { private JobSummary getFakeJob() { private JobSummary getFakeJob() { Transfer transfer = new Transfer(); Transfer transfer = new Transfer(); transfer.setDirection("pullToVoSpace"); transfer.setDirection("pullToVoSpace"); Protocol protocol = new Protocol(); protocol.setUri("ia2:async-recall"); transfer.getProtocols().add(protocol); transfer.setTarget(Arrays.asList("vos://example.com!vospace/my-node")); transfer.setTarget(Arrays.asList("vos://example.com!vospace/my-node")); View view = new View(); view.setUri(Views.ASYNC_RECALL_VIEW_URI); Param p1 = new Param(); p1.setUri(Views.ASYNC_RECALL_VIEW_URI + "/include"); p1.setValue("file1.txt"); view.getParam().add(p1); Param p2 = new Param(); p2.setUri(Views.ASYNC_RECALL_VIEW_URI + "/include"); p2.setValue("file2.txt"); view.getParam().add(p2); transfer.setView(view); JobSummary job = new JobSummary(); JobSummary job = new JobSummary(); job.setJobId("job_id"); job.setJobId("job_id"); Loading src/test/java/it/inaf/oats/vospace/JobServiceTest.java +5 −3 Original line number Original line Diff line number Diff line Loading @@ -5,6 +5,7 @@ */ */ package it.inaf.oats.vospace; package it.inaf.oats.vospace; import it.inaf.oats.vospace.datamodel.Views; import it.inaf.oats.vospace.exception.NodeBusyException; import it.inaf.oats.vospace.exception.NodeBusyException; import it.inaf.oats.vospace.persistence.JobDAO; import it.inaf.oats.vospace.persistence.JobDAO; import java.util.ArrayList; import java.util.ArrayList; Loading @@ -15,6 +16,7 @@ import net.ivoa.xml.uws.v1.ExecutionPhase; import net.ivoa.xml.uws.v1.JobSummary; import net.ivoa.xml.uws.v1.JobSummary; import net.ivoa.xml.vospace.v2.Protocol; import net.ivoa.xml.vospace.v2.Protocol; import net.ivoa.xml.vospace.v2.Transfer; import net.ivoa.xml.vospace.v2.Transfer; import net.ivoa.xml.vospace.v2.View; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue; Loading Loading @@ -229,9 +231,9 @@ public class JobServiceTest { private Transfer getTapeTransfer() { private Transfer getTapeTransfer() { Transfer transfer = new Transfer(); Transfer transfer = new Transfer(); transfer.setDirection("pullToVoSpace"); transfer.setDirection("pullToVoSpace"); Protocol protocol = new Protocol(); View view = new View(); protocol.setUri("ia2:async-recall"); view.setUri(Views.ASYNC_RECALL_VIEW_URI); transfer.getProtocols().add(protocol); transfer.setView(view); return transfer; return transfer; } } Loading src/test/resources/pullToVoSpace-tape.xml +5 −1 Original line number Original line Diff line number Diff line <vos:transfer xmlns:vos="http://www.ivoa.net/xml/VOSpace/v2.0" version="2.1"> <vos:transfer xmlns:vos="http://www.ivoa.net/xml/VOSpace/v2.0" version="2.1"> <vos:target>vos://example.com!vospace/mynode</vos:target> <vos:target>vos://example.com!vospace/mynode</vos:target> <vos:direction>pullToVoSpace</vos:direction> <vos:direction>pullToVoSpace</vos:direction> <vos:protocol uri="ia2:async-recall" /> <vos:view uri="ivo://ia2.inaf.it/vospace/views#async-recall"> <vos:param uri="ivo://ia2.inaf.it/vospace/views#async-recall/include">test1.txt</vos:param> <vos:param uri="ivo://ia2.inaf.it/vospace/views#async-recall/include">test2.txt</vos:param> <vos:param uri="ivo://ia2.inaf.it/vospace/views#async-recall/include">test3.txt</vos:param> </vos:view> </vos:transfer> </vos:transfer> No newline at end of file Loading
src/main/java/it/inaf/oats/vospace/JobService.java +11 −4 Original line number Original line Diff line number Diff line Loading @@ -93,7 +93,7 @@ public class JobService { Transfer transfer = uriService.getTransfer(job); Transfer transfer = uriService.getTransfer(job); ExecutionPhase phase; ExecutionPhase phase; if (transfer.getProtocols().stream().anyMatch(p -> "ia2:async-recall".equals(p.getUri()))) { if (isAsyncRecall(transfer)) { // Async recall from tape jobs are queued. They will be started by VOSpace transfer service // Async recall from tape jobs are queued. They will be started by VOSpace transfer service phase = ExecutionPhase.QUEUED; phase = ExecutionPhase.QUEUED; } else { } else { Loading Loading @@ -132,11 +132,13 @@ public class JobService { private Transfer handlePullToVoSpace(JobSummary job, Transfer transfer) { private Transfer handlePullToVoSpace(JobSummary job, Transfer transfer) { for (Protocol protocol : transfer.getProtocols()) { if (isAsyncRecall(transfer)) { switch (protocol.getUri()) { case "ia2:async-recall": asyncTransfService.startJob(job); asyncTransfService.startJob(job); return transfer; return transfer; } for (Protocol protocol : transfer.getProtocols()) { switch (protocol.getUri()) { case "ivo://ivoa.net/vospace/core#httpget": case "ivo://ivoa.net/vospace/core#httpget": if (transfer.getTarget().size() != 1) { if (transfer.getTarget().size() != 1) { throw new InvalidArgumentException("Invalid target size for pullToVoSpace: " + transfer.getTarget().size()); throw new InvalidArgumentException("Invalid target size for pullToVoSpace: " + transfer.getTarget().size()); Loading @@ -158,6 +160,11 @@ public class JobService { throw new InvalidArgumentException("Transfer contains no protocols"); throw new InvalidArgumentException("Transfer contains no protocols"); } } private boolean isAsyncRecall(Transfer transfer) { return transfer.getView() != null && "ivo://ia2.inaf.it/vospace/views#async-recall".equals(transfer.getView().getUri()); } private void handleMoveNode(JobSummary jobSummary, Transfer transfer) { private void handleMoveNode(JobSummary jobSummary, Transfer transfer) { // User data must be extracted before starting the new thread // User data must be extracted before starting the new thread // to avoid the "No thread-bound request found" exception // to avoid the "No thread-bound request found" exception Loading
src/test/java/it/inaf/oats/vospace/AsyncTransferServiceTest.java +22 −7 Original line number Original line Diff line number Diff line Loading @@ -5,12 +5,14 @@ */ */ package it.inaf.oats.vospace; package it.inaf.oats.vospace; import it.inaf.oats.vospace.datamodel.Views; import java.util.ArrayList; import java.util.ArrayList; import java.util.Arrays; import java.util.Arrays; import java.util.List; import java.util.List; import net.ivoa.xml.uws.v1.JobSummary; import net.ivoa.xml.uws.v1.JobSummary; import net.ivoa.xml.vospace.v2.Protocol; import net.ivoa.xml.vospace.v2.Param; import net.ivoa.xml.vospace.v2.Transfer; import net.ivoa.xml.vospace.v2.Transfer; import net.ivoa.xml.vospace.v2.View; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test; Loading @@ -26,7 +28,7 @@ import redis.clients.jedis.Jedis; @ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class) public class AsyncTransferServiceTest { public class AsyncTransferServiceTest { private static final String JSON_JOB = "{\"jobId\":\"job_id\",\"runId\":null,\"ownerId\":null,\"phase\":null,\"quote\":null,\"creationTime\":null,\"startTime\":null,\"endTime\":null,\"executionDuration\":0,\"destruction\":null,\"parameters\":null,\"results\":[],\"errorSummary\":null,\"jobInfo\":{\"transfer\":{\"target\":[\"vos://example.com!vospace/my-node\"],\"direction\":\"pullToVoSpace\",\"view\":null,\"protocols\":[],\"keepBytes\":null,\"version\":null,\"param\":[]}},\"version\":null}\n"; private static final String JSON_JOB = "{\"jobId\":\"job_id\",\"runId\":null,\"ownerId\":null,\"phase\":null,\"quote\":null,\"creationTime\":null,\"startTime\":null,\"endTime\":null,\"executionDuration\":0,\"destruction\":null,\"parameters\":null,\"results\":[],\"errorSummary\":null,\"jobInfo\":{\"transfer\":{\"target\":[\"vos://example.com!vospace/my-node\"],\"direction\":\"pullToVoSpace\",\"view\":{\"param\":[{\"value\":\"file1.txt\",\"uri\":\"ivo://ia2.inaf.it/vospace/views#async-recall/include\"},{\"value\":\"file2.txt\",\"uri\":\"ivo://ia2.inaf.it/vospace/views#async-recall/include\"}],\"uri\":\"ivo://ia2.inaf.it/vospace/views#async-recall\",\"original\":true},\"protocols\":[],\"keepBytes\":false,\"version\":null,\"param\":[]}},\"version\":null}"; @Test @Test public void testRedisRpc() { public void testRedisRpc() { Loading Loading @@ -61,11 +63,24 @@ public class AsyncTransferServiceTest { private JobSummary getFakeJob() { private JobSummary getFakeJob() { Transfer transfer = new Transfer(); Transfer transfer = new Transfer(); transfer.setDirection("pullToVoSpace"); transfer.setDirection("pullToVoSpace"); Protocol protocol = new Protocol(); protocol.setUri("ia2:async-recall"); transfer.getProtocols().add(protocol); transfer.setTarget(Arrays.asList("vos://example.com!vospace/my-node")); transfer.setTarget(Arrays.asList("vos://example.com!vospace/my-node")); View view = new View(); view.setUri(Views.ASYNC_RECALL_VIEW_URI); Param p1 = new Param(); p1.setUri(Views.ASYNC_RECALL_VIEW_URI + "/include"); p1.setValue("file1.txt"); view.getParam().add(p1); Param p2 = new Param(); p2.setUri(Views.ASYNC_RECALL_VIEW_URI + "/include"); p2.setValue("file2.txt"); view.getParam().add(p2); transfer.setView(view); JobSummary job = new JobSummary(); JobSummary job = new JobSummary(); job.setJobId("job_id"); job.setJobId("job_id"); Loading
src/test/java/it/inaf/oats/vospace/JobServiceTest.java +5 −3 Original line number Original line Diff line number Diff line Loading @@ -5,6 +5,7 @@ */ */ package it.inaf.oats.vospace; package it.inaf.oats.vospace; import it.inaf.oats.vospace.datamodel.Views; import it.inaf.oats.vospace.exception.NodeBusyException; import it.inaf.oats.vospace.exception.NodeBusyException; import it.inaf.oats.vospace.persistence.JobDAO; import it.inaf.oats.vospace.persistence.JobDAO; import java.util.ArrayList; import java.util.ArrayList; Loading @@ -15,6 +16,7 @@ import net.ivoa.xml.uws.v1.ExecutionPhase; import net.ivoa.xml.uws.v1.JobSummary; import net.ivoa.xml.uws.v1.JobSummary; import net.ivoa.xml.vospace.v2.Protocol; import net.ivoa.xml.vospace.v2.Protocol; import net.ivoa.xml.vospace.v2.Transfer; import net.ivoa.xml.vospace.v2.Transfer; import net.ivoa.xml.vospace.v2.View; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue; Loading Loading @@ -229,9 +231,9 @@ public class JobServiceTest { private Transfer getTapeTransfer() { private Transfer getTapeTransfer() { Transfer transfer = new Transfer(); Transfer transfer = new Transfer(); transfer.setDirection("pullToVoSpace"); transfer.setDirection("pullToVoSpace"); Protocol protocol = new Protocol(); View view = new View(); protocol.setUri("ia2:async-recall"); view.setUri(Views.ASYNC_RECALL_VIEW_URI); transfer.getProtocols().add(protocol); transfer.setView(view); return transfer; return transfer; } } Loading
src/test/resources/pullToVoSpace-tape.xml +5 −1 Original line number Original line Diff line number Diff line <vos:transfer xmlns:vos="http://www.ivoa.net/xml/VOSpace/v2.0" version="2.1"> <vos:transfer xmlns:vos="http://www.ivoa.net/xml/VOSpace/v2.0" version="2.1"> <vos:target>vos://example.com!vospace/mynode</vos:target> <vos:target>vos://example.com!vospace/mynode</vos:target> <vos:direction>pullToVoSpace</vos:direction> <vos:direction>pullToVoSpace</vos:direction> <vos:protocol uri="ia2:async-recall" /> <vos:view uri="ivo://ia2.inaf.it/vospace/views#async-recall"> <vos:param uri="ivo://ia2.inaf.it/vospace/views#async-recall/include">test1.txt</vos:param> <vos:param uri="ivo://ia2.inaf.it/vospace/views#async-recall/include">test2.txt</vos:param> <vos:param uri="ivo://ia2.inaf.it/vospace/views#async-recall/include">test3.txt</vos:param> </vos:view> </vos:transfer> </vos:transfer> No newline at end of file