Commit 29c7a017 authored by Nicola Fulvio Calabria's avatar Nicola Fulvio Calabria
Browse files

Most of file copy implementation written

parent d2c59ebd
Loading
Loading
Loading
Loading
+13 −3
Original line number Diff line number Diff line
@@ -11,20 +11,30 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Autowired;
import it.inaf.ia2.transfer.service.CopyService;
import it.inaf.ia2.transfer.service.FileCopyService;
import java.util.concurrent.CompletableFuture;

@RestController
public class CopyController extends AuthenticatedFileController {

    @Autowired
    private CopyService copyService;
    private FileCopyService copyService;

    @PostMapping(value = "/copy", consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<?> copyFiles(@RequestBody CopyRequest copyRequest) {
               
        // need to make a completable future start
        CompletableFuture.runAsync(() -> {
            handleFileJob(() -> copyService.copyFiles(copyRequest.getSourceRootVosPath(), 
                copyRequest.getDestinationRootVosPath(), copyRequest.getJobId(), 
                getPrincipal()), copyRequest.jobId);
        });     

        return ResponseEntity.ok(
                copyRequest.getJobId() + " copy task accepted by File Service"
        );        
        
        
    }

    @Override
+51 −0
Original line number Diff line number Diff line
@@ -192,6 +192,57 @@ public class FileDAO {
        });
    }
    
    // TODO: same problem as get archive file infos   
    public List<FileInfo> getBranchFileInfos(String rootVosPath, String jobId) {

        // TODO: validate rootVosPath as a vos_path

        String sql = "SELECT n.node_id, n.is_public, n.group_read, n.group_write, n.creator_id, n.async_trans,\n"
                + "n.content_type, n.content_encoding, n.content_length, n.content_md5,\n"
                + "n.accept_views, n.provide_views, l.location_type, n.path <> n.relative_path AS virtual_parent,\n"
                + "(SELECT user_name FROM users WHERE user_id = n.creator_id) AS username,\n"
                + "base_path, get_os_path(n.node_id) AS os_path, get_vos_path(n.node_id) AS vos_path,\n"
                + "n.type = 'container' AS is_directory, n.name, n.location_id, n.job_id\n"
                + "FROM node n\n"
                + "JOIN node p ON p.path @> n.path\n"
                + "LEFT JOIN location l ON l.location_id = n.location_id\n"
                + "LEFT JOIN storage s ON s.storage_id = l.storage_dest_id\n"
                + "WHERE (p.node_id  = id_from_vos_path(?) AND n.job_id = ?)"
                + "\nORDER BY vos_path ASC";

        return jdbcTemplate.query(conn -> {
            PreparedStatement ps = conn.prepareStatement(sql);
            ps.setString(1, rootVosPath);
            ps.setString(2, jobId);
            return ps;
        }, rs -> {
            List<FileInfo> fileInfos = new ArrayList<>();
            while (rs.next()) {
                fileInfos.add(getFileInfo(rs));
            }
            return fileInfos;
        });
    }
    
    public void setBranchLocationId(String rootVosPath, String jobId, int locationId) {

        // TODO: validate rootVosPath as a vos_path

        String sql = "UPDATE node n SET\n"
                + "location_id = ?\n"                
                + "JOIN node p ON n.path <@ p.path\n"                
                + "WHERE (path ~ ('*.' || id_from_vos_path(?))::lquery AND n.job_id = ?)";

        jdbcTemplate.update(conn -> {
            PreparedStatement ps = conn.prepareStatement(sql);
            ps.setInt(1, locationId);
            ps.setString(2, rootVosPath);
            ps.setString(3, jobId);
            return ps;
        });
    }
    

    private FileInfo getFileInfo(ResultSet rs) throws SQLException {
        FileInfo fi = new FileInfo();
        fi.setNodeId(rs.getInt("node_id"));
+0 −153
Original line number Diff line number Diff line
/*
 * This file is part of vospace-file-service
 * Copyright (C) 2021 Istituto Nazionale di Astrofisica
 * SPDX-License-Identifier: GPL-3.0-or-later
 */
package it.inaf.ia2.transfer.service;

import it.inaf.ia2.transfer.auth.TokenPrincipal;
import it.inaf.ia2.transfer.exception.InsufficientStorageException;
import it.inaf.ia2.transfer.exception.JobException;
import it.inaf.ia2.transfer.exception.JobException.Type;
import it.inaf.ia2.transfer.persistence.FileDAO;
import it.inaf.ia2.transfer.persistence.JobDAO;
import it.inaf.ia2.transfer.persistence.LocationDAO;
import it.inaf.ia2.transfer.persistence.model.FileInfo;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.security.Principal;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import org.kamranzafar.jtar.TarEntry;
import org.kamranzafar.jtar.TarOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Service;
import org.springframework.util.FileSystemUtils;
import org.springframework.util.unit.DataSize;
import org.springframework.web.client.RestTemplate;

@Service
public class CopyService {

    private static final Logger LOG = LoggerFactory.getLogger(CopyService.class);

    @Autowired
    private FileDAO fileDAO;

    @Autowired
    private LocationDAO locationDAO;

    @Autowired
    private JobDAO jobDAO;

    @Autowired
    private AuthorizationService authorizationService;

    @Autowired
    private RestTemplate restTemplate;

    @Value("${upload_location_id}")
    private int uploadLocationId;

    // Maximum size of the working directory for each registered user
    @Value("${generated.dir.max-size}")
    private DataSize generatedDirMaxSize;
    
    
    
    
    public void copyFiles(String sourceRootVosPath, 
            String destinationRootVosPath, String jobId) {
        // We use jobId to identify nodes created by the REST part of CopyNode
        // We expect them to be locked
        
                
        
        
    }

    

    private String getCommonParent(List<String> vosPaths) {
        String commonParent = null;
        for (String vosPath : vosPaths) {
            if (commonParent == null) {
                commonParent = vosPath;
            } else {
                StringBuilder newCommonParent = new StringBuilder();
                boolean same = true;
                for (int i = 0; same && i < Math.min(commonParent.length(), vosPath.length()); i++) {
                    if (commonParent.charAt(i) == vosPath.charAt(i)) {
                        newCommonParent.append(commonParent.charAt(i));
                    } else {
                        same = false;
                    }
                }
                commonParent = newCommonParent.toString();
            }
        }
        return commonParent;
    }

    private <O extends OutputStream, E> void downloadFileIntoArchive(FileInfo fileInfo, String relPath, TokenPrincipal tokenPrincipal, ArchiveHandler<O, E> handler, String baseUrl) {

        if (baseUrl == null) {
            LOG.error("Location URL not found for location " + fileInfo.getLocationId());
            throw new JobException(Type.FATAL, "Internal Fault")
                    .setErrorDetail("InternalFault: Unable to retrieve location of file " + fileInfo.getVirtualPath());
        }

        String url = baseUrl + "/" + fileInfo.getVirtualName();

        LOG.trace("Downloading file from " + url);

        restTemplate.execute(url, HttpMethod.GET, req -> {
            HttpHeaders headers = req.getHeaders();
            if (tokenPrincipal.getToken() != null) {
                headers.setBearerAuth(tokenPrincipal.getToken());
            }
        }, res -> {
            File tmpFile = Files.createTempFile("download", null).toFile();
            try ( FileOutputStream os = new FileOutputStream(tmpFile)) {
                res.getBody().transferTo(os);
                handler.putNextEntry(tmpFile, relPath);
                try ( FileInputStream is = new FileInputStream(tmpFile)) {
                    is.transferTo(handler.getOutputStream());
                }
            } finally {
                tmpFile.delete();
            }
            return null;
        }, new Object[]{});
    }

    private <O extends OutputStream, E> void writeFileIntoArchive(FileInfo fileInfo, String relPath, TokenPrincipal tokenPrincipal, ArchiveHandler<O, E> handler) throws IOException {
        if (!authorizationService.isDownloadable(fileInfo, tokenPrincipal)) {
            throw new JobException(Type.FATAL, "Permission Denied")
                    .setErrorDetail("PermissionDenied: " + fileInfo.getVirtualPath());
        }

        File file = new File(fileInfo.getOsPath());
        LOG.trace("Adding file " + file.getAbsolutePath() + " to tar archive");

        try ( InputStream is = new FileInputStream(file)) {
            handler.putNextEntry(file, relPath);
            is.transferTo(handler.getOutputStream());
        }
    }
}
+223 −0
Original line number Diff line number Diff line
/*
 * This file is part of vospace-file-service
 * Copyright (C) 2021 Istituto Nazionale di Astrofisica
 * SPDX-License-Identifier: GPL-3.0-or-later
 */
package it.inaf.ia2.transfer.service;

import it.inaf.ia2.transfer.auth.TokenPrincipal;
import it.inaf.ia2.transfer.exception.JobException;
import it.inaf.ia2.transfer.exception.JobException.Type;
import it.inaf.ia2.transfer.persistence.FileDAO;
import it.inaf.ia2.transfer.persistence.LocationDAO;
import it.inaf.ia2.transfer.persistence.model.FileInfo;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class FileCopyService {

    private static final Logger LOG = LoggerFactory.getLogger(FileCopyService.class);

    @Autowired
    private FileDAO fileDAO;

    @Autowired
    private LocationDAO locationDAO;

    @Autowired
    private AuthorizationService authorizationService;

    @Autowired
    private RestTemplate restTemplate;

    @Value("${upload_location_id}")
    private int uploadLocationId;

    public void copyFiles(String sourceRootVosPath,
            String destinationRootVosPath, String jobId, TokenPrincipal principal) {
        // We use jobId to identify nodes created by the REST part of CopyNode
        // We expect them to be locked

        List<FileInfo> sources
                = fileDAO.getBranchFileInfos(sourceRootVosPath, jobId);
        // Set location of destinations to this file service update location 
        // before retrieving file infos

        fileDAO.setBranchLocationId(destinationRootVosPath, jobId, uploadLocationId);
        List<FileInfo> destinations
                = fileDAO.getBranchFileInfos(destinationRootVosPath, jobId);

        if (sources.size() != destinations.size()) {
            throw new IllegalStateException("Sources and destinations list have different sizes");
        }

        // Create destination directories on disk
        this.makeDirectoryStructure(destinations);

        this.fillDestinations(destinations,
                sources,
                sourceRootVosPath,
                destinationRootVosPath,
                principal);

    }

    private void makeDirectory(FileInfo containerFileInfo) {
        File file = new File(containerFileInfo.getOsPath());

        if (!file.exists()) {
            if (!file.mkdirs()) {
                throw new IllegalStateException("Unable to create directory " + containerFileInfo.getOsPath());
            }
        }

    }

    private void makeDirectoryStructure(List<FileInfo> destinationFileInfos) {
        for (FileInfo f : destinationFileInfos) {
            if (f.isDirectory()) {
                this.makeDirectory(f);
            }
        }
    }

    private void fillDestinations(List<FileInfo> destinationFileInfos,
            List<FileInfo> sourcesFileInfos,
            String sourceRootVosPath,
            String destinationRootVosPath,
            TokenPrincipal principal) {

        // it will be initialized only when necessary
        Map<Integer, String> portalLocationUrls = null;

        for (FileInfo f : destinationFileInfos) {
            if (!f.isDirectory()) {
                // Calculate source file vos path
                String correspondingSourceVosPath
                        = this.getCorrespondingSourceVosPath(
                                sourceRootVosPath,
                                destinationRootVosPath,
                                f.getVirtualPath());

                Optional<FileInfo> sourceOpt = this.findFileInfoByVosPath(sourcesFileInfos,
                        correspondingSourceVosPath);

                FileInfo source = sourceOpt
                        .orElseThrow(() -> new IllegalStateException("Can't find file info for: "
                        + correspondingSourceVosPath + " in source files list"));

                if (source.getLocationId() != null && source.getLocationId() != uploadLocationId) {
                    // remote file
                    if (portalLocationUrls == null) {
                        portalLocationUrls = locationDAO.getPortalLocationUrls();
                    }
                    String url = portalLocationUrls.get(source.getLocationId());
                    // download file to destination disk path
                    this.downloadFileToDisk(source, f, principal, url);

                } else {
                    // local file
                    // copy file to destination disk path
                    this.copyLocalFile(source, f, principal);

                }

            }
        }

    }

    private String getCorrespondingSourceVosPath(String sourceRootVosPath,
            String destinationRootVosPath,
            String destinationVosPath) {
        return sourceRootVosPath
                + destinationVosPath.substring(destinationRootVosPath.length());
    }

    private Optional<FileInfo> findFileInfoByVosPath(List<FileInfo> list, String vosPath) {

        return list.stream().filter(i -> i.getVirtualPath().equals(vosPath)).findFirst();

    }

    private void downloadFileToDisk(FileInfo sourceFile,
            FileInfo destinationFile, TokenPrincipal tokenPrincipal, String baseUrl) {

        if (baseUrl == null) {
            LOG.error("Location URL not found for location " + sourceFile.getLocationId());
            throw new JobException(Type.FATAL, "Internal Fault")
                    .setErrorDetail("InternalFault: Unable to retrieve location of file " + sourceFile.getVirtualPath());
        }

        String url = baseUrl + "/" + sourceFile.getVirtualName();

        LOG.trace("Downloading file from " + url);

        restTemplate.execute(url, HttpMethod.GET, req -> {
            HttpHeaders headers = req.getHeaders();
            if (tokenPrincipal.getToken() != null) {
                headers.setBearerAuth(tokenPrincipal.getToken());
            }
        }, res -> {

            File outFile = new File(destinationFile.getOsPath());

            try (FileOutputStream os = new FileOutputStream(outFile)) {
                res.getBody().transferTo(os);
            } catch (Exception e) {
                outFile.delete();
                throw e;
            }

            return null;
        }, new Object[]{});
    }

    private void copyLocalFile(FileInfo sourceFileInfo,
            FileInfo destinationFileInfo, TokenPrincipal tokenPrincipal) {
        if (!authorizationService.isDownloadable(sourceFileInfo, tokenPrincipal)) {
            throw new JobException(Type.FATAL, "Permission Denied")
                    .setErrorDetail("PermissionDenied: " + sourceFileInfo.getVirtualPath());
        }

        File file = new File(sourceFileInfo.getOsPath());
        LOG.trace("Copying file: " + file.getAbsolutePath() + " to: "
                + destinationFileInfo.getOsPath());

        File sourceFile = new File(sourceFileInfo.getOsPath());
        File destinationFile = new File(destinationFileInfo.getOsPath());

        try {
            Files.copy(sourceFile.toPath(), destinationFile.toPath());
        } catch (IOException e) {
            if (Files.exists(destinationFile.toPath())) {
                destinationFile.delete();
            }

            throw new UncheckedIOException(e);
        } catch (Exception e) {
            if (Files.exists(destinationFile.toPath())) {
                destinationFile.delete();
            }

            throw e;

        }

    }
}