Commit 5d8899dc authored by Sonia Zorba's avatar Sonia Zorba
Browse files

Merge branch 'copyNode'

parents 4ee3cf44 a850e3d0
Pipeline #2208 passed with stages
in 2 minutes and 23 seconds
/*
* This file is part of vospace-rest
* Copyright (C) 2021 Istituto Nazionale di Astrofisica
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package it.inaf.oats.vospace;
import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.exception.NodeBusyException;
import it.inaf.oats.vospace.exception.PermissionDeniedException;
import it.inaf.oats.vospace.persistence.NodeDAO;
import it.inaf.oats.vospace.persistence.NodeDAO.ShortNodeDescriptor;
import javax.servlet.http.HttpServletRequest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
public abstract class AbstractNodeService {
@Autowired
protected NodeDAO nodeDao;
@Value("${vospace-authority}")
protected String authority;
protected void validatePath(String path) {
if (path.equals("/")) {
throw new IllegalArgumentException("Cannot move root node or to root node");
}
}
protected void validateDestinationContainer(ShortNodeDescriptor snd, String destinationVosPath) {
if (snd.isBusy()) {
throw new NodeBusyException(destinationVosPath);
}
if (snd.isPermissionDenied()) {
throw new PermissionDeniedException(destinationVosPath);
}
if (!snd.isWritable()) {
throw new InternalFaultException("Destination is not writable: " + destinationVosPath);
}
if (!snd.isContainer()) {
throw new InternalFaultException("Existing destination is not a container: " + destinationVosPath);
}
}
}
/*
* This file is part of vospace-rest
* Copyright (C) 2021 Istituto Nazionale di Astrofisica
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package it.inaf.oats.vospace;
import it.inaf.ia2.aa.data.User;
import it.inaf.oats.vospace.datamodel.NodeUtils;
import it.inaf.oats.vospace.exception.NodeBusyException;
import it.inaf.oats.vospace.exception.NodeNotFoundException;
import it.inaf.oats.vospace.exception.PermissionDeniedException;
import it.inaf.oats.vospace.persistence.NodeDAO.ShortNodeDescriptor;
import java.util.List;
import java.util.Optional;
import net.ivoa.xml.vospace.v2.Transfer;
import org.springframework.dao.CannotSerializeTransactionException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
@Service
@EnableTransactionManagement
public class CopyService extends AbstractNodeService {
@Transactional(rollbackFor = {Exception.class}, isolation = Isolation.REPEATABLE_READ)
public List<String> processCopyNodes(Transfer transfer, String jobId, User user) {
// Get Source Vos Path
String sourcePath = URIUtils.returnVosPathFromNodeURI(transfer.getTarget(), authority);
// Get Destination Vos Path (it's in transfer direction)
String destinationPath = URIUtils.returnVosPathFromNodeURI(transfer.getDirection(), authority);
// Destination source to be returned, null if no copy was performed
String destinationCopyRoot = null;
this.validatePath(sourcePath);
this.validatePath(destinationPath);
if (sourcePath.equals(destinationPath)) {
throw new IllegalArgumentException("Cannot copy node to itself");
}
// Check if destination is subpath of source
// Linux-like: "cannot copy to a subdirectory of itself"
if (destinationPath.startsWith(sourcePath + "/")) {
throw new IllegalArgumentException("Cannot copy node to a subdirectory of its own path");
}
// Check if destination equals parent path of source
if (NodeUtils.getParentPath(sourcePath).equals(destinationPath)) {
throw new IllegalArgumentException("Cannot duplicate node at same path without renaming it");
}
try {
// check source branch for read and lock it
this.checkBranchForReadAndLock(sourcePath, jobId, user);
// Check destination
Optional<ShortNodeDescriptor> destShortNodeDescriptor
= nodeDao.getShortNodeDescriptor(destinationPath, user.getName(), user.getGroups());
if (destShortNodeDescriptor.isPresent()) {
this.validateDestinationContainer(destShortNodeDescriptor.get(), destinationPath);
destinationCopyRoot = destinationPath + "/" + NodeUtils.getNodeName(sourcePath);
} else {
// Check if parent exists
String destinationParentPath = NodeUtils.getParentPath(destinationPath);
Optional<ShortNodeDescriptor> destShortNodeDescriptorParent
= nodeDao.getShortNodeDescriptor(destinationParentPath, user.getName(), user.getGroups());
if (destShortNodeDescriptorParent.isPresent()) {
this.validateDestinationContainer(destShortNodeDescriptorParent.get(), destinationParentPath);
destinationCopyRoot = destinationPath;
} else {
throw new UnsupportedOperationException("Creation of destination upon copy not supported");
}
}
nodeDao.copyBranch(
sourcePath,
destinationCopyRoot);
} catch (CannotSerializeTransactionException ex) {
// Concurrent transactions attempted to modify this set of nodes
throw new NodeBusyException(sourcePath);
}
return List.of(sourcePath, destinationCopyRoot);
}
private void checkBranchForReadAndLock(String sourcePath, String jobId, User user) {
// Get source node
Optional<Long> sourceIdOpt = nodeDao.getNodeId(sourcePath);
long sourceId = sourceIdOpt.orElseThrow(() -> new NodeNotFoundException(sourcePath));
if (nodeDao.isBranchBusy(sourceId)) {
throw new NodeBusyException(sourcePath);
}
if (!nodeDao.isBranchReadable(sourceId, user.getName(), user.getGroups())) {
throw new PermissionDeniedException(sourcePath);
}
nodeDao.setBranchJobId(sourceId, jobId);
}
}
......@@ -79,13 +79,72 @@ public class FileServiceClient {
headers.setBearerAuth(token);
}
headers.setContentType(MediaType.APPLICATION_JSON);
try ( OutputStream os = req.getBody()) {
try (OutputStream os = req.getBody()) {
MAPPER.writeValue(os, archiveRequest);
}
}, res -> {
return res.getHeaders().getLocation().toString();
}, new Object[]{});
}
public void startFileCopyJob(String sourceVosPath,
String destiantionVosPath, String jobId, User user) {
CopyRequest copyRequest = new CopyRequest();
copyRequest.setJobId(jobId);
copyRequest.setSourceRootVosPath(sourceVosPath);
copyRequest.setDestinationRootVosPath(destiantionVosPath);
String url = fileServiceUrl + "/copy";
String token = user.getAccessToken();
restTemplate.execute(url, HttpMethod.POST, req -> {
HttpHeaders headers = req.getHeaders();
if (token != null) {
headers.setBearerAuth(token);
}
headers.setContentType(MediaType.APPLICATION_JSON);
try (OutputStream os = req.getBody()) {
MAPPER.writeValue(os, copyRequest);
}
}, res -> {
return null;
}, new Object[]{});
}
public static class CopyRequest {
private String jobId;
private String sourceRootVosPath;
private String destinationRootVosPath;
public String getJobId() {
return jobId;
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
public String getSourceRootVosPath() {
return sourceRootVosPath;
}
public void setSourceRootVosPath(String sourceRootVosPath) {
this.sourceRootVosPath = sourceRootVosPath;
}
public String getDestinationRootVosPath() {
return destinationRootVosPath;
}
public void setDestinationRootVosPath(String destinationRootVosPath) {
this.destinationRootVosPath = destinationRootVosPath;
}
}
public static class ArchiveRequest {
......
......@@ -18,6 +18,8 @@ import it.inaf.oats.vospace.exception.InvalidArgumentException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
import it.inaf.oats.vospace.persistence.NodeDAO;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
......@@ -40,11 +42,20 @@ public class JobService {
@Autowired
private MoveService moveService;
@Autowired
private CopyService copyService;
@Autowired
private AsyncTransferService asyncTransfService;
@Autowired
private HttpServletRequest servletRequest;
@Autowired
private FileServiceClient fileServiceClient;
@Autowired
private NodeDAO nodeDao;
public enum JobDirection {
pullToVoSpace,
......@@ -119,6 +130,9 @@ public class JobService {
case moveNode:
handleMoveNode(job, transfer);
break;
case copyNode:
handleCopyNode(job, transfer);
break;
default:
throw new UnsupportedOperationException("Not implemented yet");
}
......@@ -177,8 +191,33 @@ public class JobService {
});
}
private void handleCopyNode(JobSummary jobSummary, Transfer transfer) {
// User data must be extracted before starting the new thread
// to avoid the "No thread-bound request found" exception
User user = (User) servletRequest.getUserPrincipal();
CompletableFuture.runAsync(() -> {
handleJobErrors(jobSummary, job -> {
String jobId = jobSummary.getJobId();
// Index 0: source 1: destination
List<String> sourceAndDestination = copyService.processCopyNodes(transfer, jobId, user);
// Call file service and command copy
try{
fileServiceClient.startFileCopyJob(sourceAndDestination.get(0), sourceAndDestination.get(1), jobId, user);
} catch (Exception e) {
// We decided not to purge metadata in case of failure
// just release busy nodes setting job_id = null
nodeDao.releaseBusyNodesByJobId(jobId);
throw e;
}
return null;
});
});
}
private void handleJobErrors(JobSummary job, Function<JobSummary, Transfer> jobConsumer) {
Transfer negotiatedTransfer = null;
Transfer negotiatedTransfer = null;
try {
negotiatedTransfer = jobConsumer.apply(job);
} catch (VoSpaceErrorSummarizableException e) {
......
......@@ -11,12 +11,9 @@ import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.exception.NodeBusyException;
import it.inaf.oats.vospace.exception.NodeNotFoundException;
import it.inaf.oats.vospace.exception.PermissionDeniedException;
import it.inaf.oats.vospace.persistence.NodeDAO;
import it.inaf.oats.vospace.persistence.NodeDAO.ShortNodeDescriptor;
import java.util.Optional;
import net.ivoa.xml.vospace.v2.Transfer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.CannotSerializeTransactionException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.EnableTransactionManagement;
......@@ -25,13 +22,7 @@ import org.springframework.transaction.annotation.Transactional;
@Service
@EnableTransactionManagement
public class MoveService {
@Autowired
private NodeDAO nodeDao;
@Value("${vospace-authority}")
private String authority;
public class MoveService extends AbstractNodeService {
/**
* Perform modeNode operation. User is passed as parameter because this method
......@@ -52,13 +43,18 @@ public class MoveService {
this.validatePath(destinationPath);
if (sourcePath.equals(destinationPath)) {
return;
throw new IllegalArgumentException("Cannot move node to itself");
}
// Check if destination is subpath of source
// Linux-like: "cannot move to a subdirectory of itself"
if(destinationPath.startsWith(sourcePath+"/")) {
throw new IllegalArgumentException("Cannot move node to a subdirectory of its own path");
}
// Check if destination equals parent path of source
if(NodeUtils.getParentPath(sourcePath).equals(destinationPath)){
return;
}
try {
......@@ -110,13 +106,6 @@ public class MoveService {
// Concurrent transactions attempted to modify this set of nodes
throw new NodeBusyException(sourcePath);
}
}
private void validatePath(String path) {
if (path.equals("/")) {
throw new IllegalArgumentException("Cannot move root node or to root node");
}
}
}
......@@ -135,6 +135,25 @@ public class NodeDAO {
return Optional.of(node);
}
public List<String> listNodeChildren(String path) {
String sql = "SELECT n.name\n"
+ "FROM node n\n"
+ "WHERE n.path ~ ('*.' || id_from_vos_path(?) || '.*{1}')::lquery\n"
+ "ORDER BY n.path";
List<String> childrenNames = jdbcTemplate.query(conn -> {
PreparedStatement ps = conn.prepareStatement(sql);
int i = 0;
ps.setString(++i, path);
return ps;
}, (row, index) -> {
return row.getString("name");
});
return childrenNames;
}
public Node setNode(Node newNode) {
return setNode(newNode, false);
}
......@@ -309,6 +328,61 @@ public class NodeDAO {
});
}
public void copyBranch(String sourceVosPath, String destVosPath) {
String destVosParentPath = NodeUtils.getParentPath(destVosPath);
String destName = NodeUtils.getNodeName(destVosPath);
String parentInsert = "INSERT INTO node (node_id, parent_path, parent_relative_path, name, type, location_id, creator_id, group_write, group_read, is_public,\n"
+ "job_id, tstamp_wrapper_dir, format, async_trans, sticky, accept_views, provide_views, protocols)\n";
String ctePathPrefix = "SELECT CASE WHEN path::varchar = '' THEN '' ELSE (path::varchar || '.') END AS prefix\n"
+ "FROM node WHERE node_id = id_from_vos_path(?)";
String cteCopiedNodes = "SELECT nextval('node_node_id_seq') AS new_node_id,\n"
+ "((SELECT prefix FROM path_prefix) || currval('node_node_id_seq'))::ltree AS new_path,\n"
+ "path, relative_path, parent_path, parent_relative_path, ? AS name,\n"
+ "type, location_id, creator_id, group_write, group_read, is_public,\n"
+ "job_id, tstamp_wrapper_dir, format, async_trans, sticky, accept_views, provide_views, protocols\n"
+ "FROM node WHERE node_id = id_from_vos_path(?)\n"
+ "UNION ALL\n"
+ "SELECT nextval('node_node_id_seq') AS new_node_id,\n"
+ "(p.new_path::varchar || '.' || currval('node_node_id_seq'))::ltree,\n"
+ "n.path, n.relative_path, n.parent_path, n.parent_relative_path, n.name,\n"
+ "n.type, n.location_id, n.creator_id, n.group_write, n.group_read, n.is_public,\n"
+ "n.job_id, n.tstamp_wrapper_dir, n.format, n.async_trans, n.sticky, n.accept_views, n.provide_views, n.protocols\n"
+ "FROM node n\n"
+ "JOIN copied_nodes p ON p.path = n.parent_path";
String cteCopiedNodesPaths = "SELECT subpath(new_path, 0, nlevel(new_path) - 1) AS new_parent_path,\n"
+ "nlevel(parent_path) - nlevel(parent_relative_path) AS rel_offset, * FROM copied_nodes";
String parentSelect = "SELECT\n"
+ "new_node_id, new_parent_path,\n"
+ "CASE WHEN nlevel(new_parent_path) = rel_offset THEN ''::ltree ELSE subpath(new_parent_path, rel_offset) END new_parent_relative_path,\n"
+ "name, type, location_id, creator_id, group_write, group_read, is_public,\n"
+ "job_id, tstamp_wrapper_dir, format, async_trans, sticky, accept_views, provide_views, protocols\n"
+ "FROM copied_nodes_paths\n";
String sql = parentInsert
+ "WITH RECURSIVE path_prefix AS ("
+ ctePathPrefix + "),\n"
+ "copied_nodes AS ("
+ cteCopiedNodes + "),\n"
+ "copied_nodes_paths AS ("
+ cteCopiedNodesPaths + ")\n"
+ parentSelect;
jdbcTemplate.update(conn -> {
PreparedStatement ps = conn.prepareStatement(sql);
ps.setString(1, destVosParentPath);
ps.setString(2, destName);
ps.setString(3, sourceVosPath);
return ps;
});
}
public boolean isBranchBusy(long parentNodeId) {
String sql = "SELECT COUNT(c.node_id) > 0 "
......@@ -319,6 +393,28 @@ public class NodeDAO {
return jdbcTemplate.queryForObject(sql, new Object[]{parentNodeId}, new int[]{Types.BIGINT}, Boolean.class);
}
public void setBranchJobId(Long rootNodeId, String jobId) {
String sql = "UPDATE node SET job_id = ?\n"
+ "WHERE path ~ ('*.' || ? || '.*')::lquery";
jdbcTemplate.update(conn -> {
PreparedStatement ps = conn.prepareStatement(sql);
ps.setString(1, jobId);
ps.setLong(2, rootNodeId);
return ps;
});
}
public void releaseBusyNodesByJobId(String jobId) {
String sql = "UPDATE node SET job_id = NULL WHERE job_id = ?";
jdbcTemplate.update(conn -> {
PreparedStatement ps = conn.prepareStatement(sql);
ps.setString(1, jobId);
return ps;
});
}
public boolean isBranchWritable(long parentNodeId, String userId, List<String> userGroups) {
String sql = "SELECT COUNT(c.node_id) = 0 "
......@@ -350,6 +446,36 @@ public class NodeDAO {
});
}
public boolean isBranchReadable(long parentNodeId, String userId, List<String> userGroups) {
String sql = "SELECT COUNT(c.node_id) = 0 "
+ "FROM node n "
+ "JOIN node c ON c.path <@ n.path "
+ "WHERE n.node_id = ? AND "
+ "NOT COALESCE(c.is_public, FALSE) "
+ "AND (SELECT COUNT(*) FROM (SELECT UNNEST(?) INTERSECT SELECT UNNEST(c.group_read)) AS allowed_groups) = 0 "
+ "AND c.creator_id <> ?";
return jdbcTemplate.query(sql, ps -> {
ps.setLong(1, parentNodeId);
String[] groups;
if (userGroups == null) {
groups = new String[0];
} else {
groups = userGroups.toArray(String[]::new);
}
ps.setArray(2, ps.getConnection().createArrayOf("varchar", groups));
ps.setString(3, userId);
}, row -> {
if (!row.next()) {
throw new IllegalStateException("Expected one result");
}
return row.getBoolean(1);
});
}
public void deleteNode(String path) {
int nodesWithPath = countNodesWithPath(path);
if (nodesWithPath == 0) {
......
/*
* This file is part of vospace-rest
* Copyright (C) 2021 Istituto Nazionale di Astrofisica
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package it.inaf.oats.vospace;
import it.inaf.ia2.aa.data.User;
import it.inaf.oats.vospace.exception.NodeBusyException;
import it.inaf.oats.vospace.exception.NodeNotFoundException;
import it.inaf.oats.vospace.exception.PermissionDeniedException;
import it.inaf.oats.vospace.persistence.DataSourceConfigSingleton;
import it.inaf.oats.vospace.persistence.NodeDAO;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import net.ivoa.xml.vospace.v2.Transfer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.ContextConfiguration;
@SpringBootTest
@AutoConfigureMockMvc
@ContextConfiguration(classes = DataSourceConfigSingleton.class)
@TestPropertySource(locations = "classpath:test.properties", properties = {"vospace-authority=example.com!vospace", "file-service-url=http://file-service"})
@TestMethodOrder(OrderAnnotation.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class CopyServiceTest {
@Value("${vospace-authority}")
private String authority;
@Autowired
private CopyService copyService;
@Autowired
private NodeDAO nodeDao;
@Test
@Order(1)
public void copyRootTest() {
assertThrows(IllegalArgumentException.class, () -> {
copyService.processCopyNodes(getTransfer("/", "/pippo"), "job_pippo", getAnonymousUser());
}
);