Loading src/main/java/it/inaf/ia2/transfer/controller/CopyController.java +4 −1 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ import it.inaf.oats.vospace.exception.InvalidArgumentException; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import it.inaf.ia2.transfer.auth.TokenPrincipal; @RestController public class CopyController extends AuthenticatedFileController { Loading @@ -42,13 +43,15 @@ public class CopyController extends AuthenticatedFileController { LOG.debug("copyFiles called from jobId {}", jobId); TokenPrincipal principal = getPrincipal(); // need to make a completable future start CompletableFuture.runAsync(() -> { handleFileJob(() -> { try { copyService.copyFiles(copyRequest.getSourceRootVosPath(), copyRequest.getDestinationRootVosPath(), copyRequest.getJobId(), getPrincipal()); principal); } finally { // TODO: cleanup code to remove unpopulated nodes in case // of failure? Loading src/main/java/it/inaf/ia2/transfer/persistence/FileDAO.java +20 −10 Original line number Diff line number Diff line Loading @@ -228,8 +228,8 @@ public class FileDAO { + "JOIN node p ON p.path @> n.path\n" + "LEFT JOIN location l ON l.location_id = n.location_id\n" + "LEFT JOIN storage s ON s.storage_id = l.storage_dest_id\n" + "WHERE (p.node_id = id_from_vos_path(?) AND n.job_id = ?)" + "\nORDER BY vos_path ASC"; + "WHERE (p.node_id = id_from_vos_path(?) AND n.job_id = ?)\n" + "ORDER BY vos_path ASC\n"; return jdbcTemplate.query(conn -> { PreparedStatement ps = conn.prepareStatement(sql); Loading @@ -248,16 +248,26 @@ public class FileDAO { public void setBranchLocationId(String rootVosPath, String jobId, int locationId) { // TODO: validate rootVosPath as a vos_path String sql = "UPDATE node n SET\n" + "location_id = ?\n" + "JOIN node p ON n.path <@ p.path\n" + "WHERE (path ~ ('*.' || id_from_vos_path(?))::lquery AND n.job_id = ?)"; String cte = "SELECT n.node_id AS id\n" + "FROM node n\n" + "JOIN node p ON p.path @> n.path\n" + "WHERE (p.node_id = id_from_vos_path(?) AND n.job_id = ?)\n"; String update = "UPDATE node\n" + " SET location_id = ?\n" + "FROM cte\n" + "WHERE node_id = cte.id\n"; String sql = "WITH cte AS (\n" + cte + ")\n" +update; jdbcTemplate.update(conn -> { PreparedStatement ps = conn.prepareStatement(sql); ps.setInt(1, locationId); ps.setString(2, rootVosPath); ps.setString(3, jobId); ps.setString(1, rootVosPath); ps.setString(2, jobId); ps.setInt(3, locationId); return ps; }); } Loading src/main/java/it/inaf/ia2/transfer/service/FileCopyService.java +12 −4 Original line number Diff line number Diff line Loading @@ -52,16 +52,22 @@ public class FileCopyService { public void copyFiles(String sourceRootVosPath, String destinationRootVosPath, String jobId, TokenPrincipal principal) { LOG.trace("copyFiles called for source {}, destination {}, from jobId \"{}\"", sourceRootVosPath, destinationRootVosPath, jobId); // We use jobId to identify nodes created by the REST part of CopyNode // We expect them to be locked List<FileInfo> sources = fileDAO.getBranchFileInfos(sourceRootVosPath, jobId); LOG.debug("found {} sources", sources.size()); if (sources.isEmpty()) { throw new NodeNotFoundException(sourceRootVosPath); } // Set location of destinations to this file service update location // before retrieving file infos fileDAO.setBranchLocationId(destinationRootVosPath, jobId, uploadLocationId); Loading @@ -69,6 +75,8 @@ public class FileCopyService { List<FileInfo> destinations = fileDAO.getBranchFileInfos(destinationRootVosPath, jobId); LOG.debug("found {} destinations", destinations.size()); if (destinations.isEmpty()) { throw new NodeNotFoundException(destinationRootVosPath); } Loading Loading @@ -97,6 +105,7 @@ public class FileCopyService { Map<Integer, String> portalLocationUrls = null; for (FileInfo destinationFileInfo : destinationFileInfos) { LOG.trace("Processing {} destination", destinationFileInfo.getVirtualPath()); // Cycle on files only if (!destinationFileInfo.isDirectory()) { // Calculate source file vos path Loading Loading @@ -169,7 +178,7 @@ public class FileCopyService { String url = baseUrl + "/" + sourceFileInfo.getVirtualName(); LOG.trace("Downloading file from " + url); LOG.trace("Downloading file from {}", url); restTemplate.execute(url, HttpMethod.GET, req -> { HttpHeaders headers = req.getHeaders(); Loading Loading @@ -198,8 +207,7 @@ public class FileCopyService { } File file = new File(sourceFileInfo.getOsPath()); LOG.trace("Copying file: " + file.getAbsolutePath() + " to: " + destinationFileInfo.getOsPath()); LOG.trace("Copying file: {} to {}",file.getAbsolutePath(), destinationFileInfo.getOsPath()); putFileService.copyLocalFile(sourceFileInfo, destinationFileInfo, remainingQuota); Loading src/main/java/it/inaf/ia2/transfer/service/PutFileService.java +6 −4 Original line number Diff line number Diff line Loading @@ -123,15 +123,17 @@ public class PutFileService { // TODO: discuss if mismatches lead to taking actions if (sourceFileInfo != null) { if (!Objects.equals(sourceFileInfo.getContentLength(), fileSize)) { LOG.warn("Destination file size mismatch with source"); LOG.warn("Destination file {} size mismatch with source", destinationFile.toPath().toString()); } if (!sourceFileInfo.getContentType().equals(destinationFileInfo.getContentType())) { LOG.warn("Destination file content type mismatch with source"); LOG.warn("Destination file {} content type mismatch with source {} {}", destinationFile.toPath().toString(), destinationFileInfo.getContentType(), sourceFileInfo.getContentType()); } if (!sourceFileInfo.getContentMd5().equals(destinationFileInfo.getContentMd5())) { LOG.warn("Destination file md5 mismatch with source"); if (!sourceFileInfo.getContentMd5().equals(md5Checksum)) { LOG.warn("Destination file {} md5 mismatch with source {} {}", destinationFile.toPath().toString(), destinationFileInfo.getContentMd5(), sourceFileInfo.getContentMd5() ); } } Loading src/test/java/it/inaf/ia2/transfer/persistence/FileDAOTest.java +21 −1 Original line number Diff line number Diff line Loading @@ -42,6 +42,26 @@ public class FileDAOTest { ReflectionTestUtils.setField(dao, "uploadLocationId", uploadLocationId); } @Test public void testGetBranchFileInfo() { List<FileInfo> fi = dao.getBranchFileInfos("/test100", "pippo"); assertEquals(3, fi.size()); List<FileInfo> fi2 = dao.getBranchFileInfos("/test100/test1001.txt", "pippo"); assertEquals(1, fi2.size()); } @Test public void testSetBranchLocationId() { dao.setBranchLocationId("/test100", "pippo", 3); List<FileInfo> fi = dao.getBranchFileInfos("/test100", "pippo"); assertEquals(3, fi.size()); for(FileInfo f : fi) { assertEquals(3, f.getLocationId()); } } @Test public void testGetFileInfo() { Loading Loading
src/main/java/it/inaf/ia2/transfer/controller/CopyController.java +4 −1 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ import it.inaf.oats.vospace.exception.InvalidArgumentException; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import it.inaf.ia2.transfer.auth.TokenPrincipal; @RestController public class CopyController extends AuthenticatedFileController { Loading @@ -42,13 +43,15 @@ public class CopyController extends AuthenticatedFileController { LOG.debug("copyFiles called from jobId {}", jobId); TokenPrincipal principal = getPrincipal(); // need to make a completable future start CompletableFuture.runAsync(() -> { handleFileJob(() -> { try { copyService.copyFiles(copyRequest.getSourceRootVosPath(), copyRequest.getDestinationRootVosPath(), copyRequest.getJobId(), getPrincipal()); principal); } finally { // TODO: cleanup code to remove unpopulated nodes in case // of failure? Loading
src/main/java/it/inaf/ia2/transfer/persistence/FileDAO.java +20 −10 Original line number Diff line number Diff line Loading @@ -228,8 +228,8 @@ public class FileDAO { + "JOIN node p ON p.path @> n.path\n" + "LEFT JOIN location l ON l.location_id = n.location_id\n" + "LEFT JOIN storage s ON s.storage_id = l.storage_dest_id\n" + "WHERE (p.node_id = id_from_vos_path(?) AND n.job_id = ?)" + "\nORDER BY vos_path ASC"; + "WHERE (p.node_id = id_from_vos_path(?) AND n.job_id = ?)\n" + "ORDER BY vos_path ASC\n"; return jdbcTemplate.query(conn -> { PreparedStatement ps = conn.prepareStatement(sql); Loading @@ -248,16 +248,26 @@ public class FileDAO { public void setBranchLocationId(String rootVosPath, String jobId, int locationId) { // TODO: validate rootVosPath as a vos_path String sql = "UPDATE node n SET\n" + "location_id = ?\n" + "JOIN node p ON n.path <@ p.path\n" + "WHERE (path ~ ('*.' || id_from_vos_path(?))::lquery AND n.job_id = ?)"; String cte = "SELECT n.node_id AS id\n" + "FROM node n\n" + "JOIN node p ON p.path @> n.path\n" + "WHERE (p.node_id = id_from_vos_path(?) AND n.job_id = ?)\n"; String update = "UPDATE node\n" + " SET location_id = ?\n" + "FROM cte\n" + "WHERE node_id = cte.id\n"; String sql = "WITH cte AS (\n" + cte + ")\n" +update; jdbcTemplate.update(conn -> { PreparedStatement ps = conn.prepareStatement(sql); ps.setInt(1, locationId); ps.setString(2, rootVosPath); ps.setString(3, jobId); ps.setString(1, rootVosPath); ps.setString(2, jobId); ps.setInt(3, locationId); return ps; }); } Loading
src/main/java/it/inaf/ia2/transfer/service/FileCopyService.java +12 −4 Original line number Diff line number Diff line Loading @@ -52,16 +52,22 @@ public class FileCopyService { public void copyFiles(String sourceRootVosPath, String destinationRootVosPath, String jobId, TokenPrincipal principal) { LOG.trace("copyFiles called for source {}, destination {}, from jobId \"{}\"", sourceRootVosPath, destinationRootVosPath, jobId); // We use jobId to identify nodes created by the REST part of CopyNode // We expect them to be locked List<FileInfo> sources = fileDAO.getBranchFileInfos(sourceRootVosPath, jobId); LOG.debug("found {} sources", sources.size()); if (sources.isEmpty()) { throw new NodeNotFoundException(sourceRootVosPath); } // Set location of destinations to this file service update location // before retrieving file infos fileDAO.setBranchLocationId(destinationRootVosPath, jobId, uploadLocationId); Loading @@ -69,6 +75,8 @@ public class FileCopyService { List<FileInfo> destinations = fileDAO.getBranchFileInfos(destinationRootVosPath, jobId); LOG.debug("found {} destinations", destinations.size()); if (destinations.isEmpty()) { throw new NodeNotFoundException(destinationRootVosPath); } Loading Loading @@ -97,6 +105,7 @@ public class FileCopyService { Map<Integer, String> portalLocationUrls = null; for (FileInfo destinationFileInfo : destinationFileInfos) { LOG.trace("Processing {} destination", destinationFileInfo.getVirtualPath()); // Cycle on files only if (!destinationFileInfo.isDirectory()) { // Calculate source file vos path Loading Loading @@ -169,7 +178,7 @@ public class FileCopyService { String url = baseUrl + "/" + sourceFileInfo.getVirtualName(); LOG.trace("Downloading file from " + url); LOG.trace("Downloading file from {}", url); restTemplate.execute(url, HttpMethod.GET, req -> { HttpHeaders headers = req.getHeaders(); Loading Loading @@ -198,8 +207,7 @@ public class FileCopyService { } File file = new File(sourceFileInfo.getOsPath()); LOG.trace("Copying file: " + file.getAbsolutePath() + " to: " + destinationFileInfo.getOsPath()); LOG.trace("Copying file: {} to {}",file.getAbsolutePath(), destinationFileInfo.getOsPath()); putFileService.copyLocalFile(sourceFileInfo, destinationFileInfo, remainingQuota); Loading
src/main/java/it/inaf/ia2/transfer/service/PutFileService.java +6 −4 Original line number Diff line number Diff line Loading @@ -123,15 +123,17 @@ public class PutFileService { // TODO: discuss if mismatches lead to taking actions if (sourceFileInfo != null) { if (!Objects.equals(sourceFileInfo.getContentLength(), fileSize)) { LOG.warn("Destination file size mismatch with source"); LOG.warn("Destination file {} size mismatch with source", destinationFile.toPath().toString()); } if (!sourceFileInfo.getContentType().equals(destinationFileInfo.getContentType())) { LOG.warn("Destination file content type mismatch with source"); LOG.warn("Destination file {} content type mismatch with source {} {}", destinationFile.toPath().toString(), destinationFileInfo.getContentType(), sourceFileInfo.getContentType()); } if (!sourceFileInfo.getContentMd5().equals(destinationFileInfo.getContentMd5())) { LOG.warn("Destination file md5 mismatch with source"); if (!sourceFileInfo.getContentMd5().equals(md5Checksum)) { LOG.warn("Destination file {} md5 mismatch with source {} {}", destinationFile.toPath().toString(), destinationFileInfo.getContentMd5(), sourceFileInfo.getContentMd5() ); } } Loading
src/test/java/it/inaf/ia2/transfer/persistence/FileDAOTest.java +21 −1 Original line number Diff line number Diff line Loading @@ -42,6 +42,26 @@ public class FileDAOTest { ReflectionTestUtils.setField(dao, "uploadLocationId", uploadLocationId); } @Test public void testGetBranchFileInfo() { List<FileInfo> fi = dao.getBranchFileInfos("/test100", "pippo"); assertEquals(3, fi.size()); List<FileInfo> fi2 = dao.getBranchFileInfos("/test100/test1001.txt", "pippo"); assertEquals(1, fi2.size()); } @Test public void testSetBranchLocationId() { dao.setBranchLocationId("/test100", "pippo", 3); List<FileInfo> fi = dao.getBranchFileInfos("/test100", "pippo"); assertEquals(3, fi.size()); for(FileInfo f : fi) { assertEquals(3, f.getLocationId()); } } @Test public void testGetFileInfo() { Loading