def buildFileList(self):
    """Collect every file under the job's VOSpace nodes into self.fileList.

    For each path in self.nodeList, resolves the on-disk location via the
    DB connector; directory nodes are walked recursively, plain-file nodes
    are taken as-is.  Only files passing Checksum.fileIsValid are kept.
    Each entry is a dict with keys: "fullPath", "fileSize",
    "vospaceRootParent" (the VOSpace node the file was found under).

    Side effects: writes debug dumps ``nodeList.txt`` (one path per line)
    and ``fileList.txt`` (JSON) in the working directory.
    """
    # open(..., "w") truncates, so the old exists()/remove() dance is
    # unnecessary; "with" also guarantees the handle is closed on error.
    with open("nodeList.txt", "w") as nl:
        for vospacePath in self.nodeList:
            nl.write(vospacePath + "\n")

    for vospacePath in self.nodeList:
        nodeInfo = self.dbConn.getOSPath(vospacePath)
        srcPath = nodeInfo["fullPath"]
        md5calc = Checksum()
        if os.path.isdir(srcPath):
            # Directory node: register every valid file found in the tree.
            for root, dirs, files in os.walk(srcPath, topdown=False):
                for f in files:
                    fullPath = os.path.join(root, f)
                    if md5calc.fileIsValid(fullPath):
                        # Size from the filesystem for walked files.
                        self.fileList.append({
                            "fullPath": fullPath,
                            "fileSize": os.stat(fullPath).st_size,
                            "vospaceRootParent": vospacePath,
                        })
        elif md5calc.fileIsValid(srcPath):
            # Plain-file node: size comes from the DB metadata.
            self.fileList.append({
                "fullPath": srcPath,
                "fileSize": nodeInfo["contentLength"],
                "vospaceRootParent": vospacePath,
            })

    with open("fileList.txt", "w") as fl:
        fl.write(json.dumps(self.fileList, indent=4))

def buildBlocks(self):
    """Partition self.fileList into transfer blocks, tagging each entry
    with a ``blockIdx``.

    Files are packed first-fit in list order into blocks of at most
    self.maxBlockSize bytes; a file larger than maxBlockSize gets a block
    of its own.  Sets self.numBlocks (only when fileList is non-empty, as
    before) and dumps the annotated list to ``blocks.txt``.
    """
    if self.fileList:
        blockIdx = 0   # index of the block currently being filled
        blockSize = 0  # bytes accumulated in the current block
        for fileInfo in self.fileList:
            fileSize = fileInfo["fileSize"]
            if fileSize > self.maxBlockSize:
                # Oversized file: close the current block if it holds
                # anything, dedicate a whole block to this file, then
                # reserve a fresh block for whatever comes next.
                if blockSize > 0:
                    blockIdx += 1
                fileInfo["blockIdx"] = blockIdx
                blockIdx += 1
                blockSize = 0
            elif blockSize + fileSize <= self.maxBlockSize:
                # Fits in the current block.
                fileInfo["blockIdx"] = blockIdx
                blockSize += fileSize
            else:
                # Does not fit: start a new block with this file.
                blockIdx += 1
                fileInfo["blockIdx"] = blockIdx
                blockSize = fileSize
        # BUGFIX: derive the count from the highest index actually
        # assigned.  The previous ``blockIdx + 1`` overcounted by one
        # whenever the LAST file was oversized, because that branch
        # pre-increments blockIdx to reserve a block that then never
        # receives a file.
        self.numBlocks = self.fileList[-1]["blockIdx"] + 1
        print(f"numBlocks = {self.numBlocks}")

    with open("blocks.txt", "w") as fl:
        fl.write(json.dumps(self.fileList, indent=4))
= fileInfo["osPath"] fileSize = fileInfo["contentLength"] if storageType == "cold": if os.path.isdir(srcPath): for root, dirs, files in os.walk(srcPath): Loading @@ -54,7 +166,14 @@ class RetrieveExecutor(TaskExecutor): def retrieveData(self): for vospacePath in self.nodeList: [srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) #[srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) fileInfo = self.dbConn.getOSPath(vospacePath) srcPath = fileInfo["fullPath"] storageType = fileInfo["storageType"] username = fileInfo["username"] osRelPath = fileInfo["osPath"] fileSize = fileInfo["contentLength"] osRelParentPath = os.path.dirname(osRelPath) if osRelParentPath != "/": osRelParentPath += "/" Loading Loading @@ -91,6 +210,8 @@ class RetrieveExecutor(TaskExecutor): vospacePath = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1] if nodeType == "list": self.dbConn.deleteTmpDataNode(vospacePath) self.fileList.clear() self.nodeList.clear() def run(self): print("Starting retrieve executor...") Loading @@ -102,13 +223,26 @@ class RetrieveExecutor(TaskExecutor): self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.jobInfo["nodeList"] self.buildFileList() self.buildBlocks() self.prepareData() result = self.retrieveData() if result: self.updateJobStatus() self.cleanup() print(f"fileList = {self.fileList}") print(f"nodeList = {self.nodeList}") else: sys.exit("Failed to retrieve data!") self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") # Ottieni il tipo di storage # Cicla su ciascun blocco # -Se storage = cold => recall di tutti i file del blocco # -copia e rimuovi ciascun file del blocco # - alla fine della copia del blocco controlla se è stato completato qualche path vospace di cartella ( # (cioè se ciascuno dei vospacePath è presente o meno in uno dei campi 
vospaceRootParent) # -se si, aggiorna ricorsivamente i flag su quel path vospace # -Se storage = cold => migra tutti i file del blocco per fare spazio sul frontend Loading
transfer_service/retrieve_executor.py +136 −2 Original line number Diff line number Diff line #!/usr/bin/env python import json import os import subprocess import sys from checksum import Checksum from config import Config from db_connector import DbConnector from tape_client import TapeClient Loading @@ -29,17 +31,127 @@ class RetrieveExecutor(TaskExecutor): self.params["db"], 1, 1) self.storageType = None self.jobObj = None self.jobId = None self.nodeList = [] self.fileList = [] self.numBlocks = None self.maxBlockSize = 20 super(RetrieveExecutor, self).__init__() def buildFileList(self): if os.path.exists("nodeList.txt"): os.remove("nodeList.txt") nl = open("nodeList.txt", 'w') for vospacePath in self.nodeList: nl.write(vospacePath + '\n') nl.close() for vospacePath in self.nodeList: nodeInfo = self.dbConn.getOSPath(vospacePath) srcPath = nodeInfo["fullPath"] md5calc = Checksum() if os.path.isdir(srcPath): for root, dirs, files in os.walk(srcPath, topdown = False): for f in files: fullPath = os.path.join(root, f) if md5calc.fileIsValid(fullPath): fileSize = os.stat(fullPath).st_size fileInfo = { "fullPath": fullPath, "fileSize": fileSize, "vospaceRootParent": vospacePath } self.fileList.append(fileInfo.copy()) else: if md5calc.fileIsValid(srcPath): fileSize = nodeInfo["contentLength"] fileInfo = { "fullPath": srcPath, "fileSize": fileSize, "vospaceRootParent": vospacePath } self.fileList.append(fileInfo.copy()) if os.path.exists("fileList.txt"): os.remove("fileList.txt") fl = open("fileList.txt", 'w') fl.write(json.dumps(self.fileList, indent = 4)) fl.close() def buildBlocks(self): if self.fileList: blockIdx = 0 blockSize = 0 for fileInfo in self.fileList: fileSize = fileInfo["fileSize"] # controlla se il file ha dimensione superiore alla dimensione di un blocco if fileSize > self.maxBlockSize: # chiudi il blocco corrente, se non vuoto, altrimenti usa quello? 
# crea un nuovo blocco con solo il file in questione # chiudi il blocco e aggiungilo alla lista dei blocchi if blockSize > 0: blockIdx += 1 fileInfo["blockIdx"] = blockIdx blockIdx += 1 #self.blockList.append(currentBlock.copy()) #currentBlock.clear() #currentBlock.append(fileInfo.copy()) else: fileInfo["blockIdx"] = blockIdx blockIdx += 1 #currentBlock.append(fileInfo.copy()) #self.blockList.append(currentBlock.copy()) #currentBlock.clear() blockSize = 0 else: # il file sta dentro un blocco, quindi controlla se # la dimensione del file sommata al livello di riempimento # del blocco attuale eccede o meno la dimensione del blocco if blockSize + fileSize <= self.maxBlockSize: # se si, aggiungi il file al blocco e vai avanti col prossimo fileInfo["blockIdx"] = blockIdx #currentBlock.append(fileInfo.copy()) blockSize += fileSize else: # se no, chiudi il blocco attuale e aggiungilo alla lista blocchi # crea un nuovo blocco, aggiungi il file e vai avanti col prossimo blockIdx += 1 fileInfo["blockIdx"] = blockIdx blockSize = fileSize #self.blockList.append(currentBlock.copy()) #currentBlock.clear() #blockSize = 0 #currentBlock.append(fileInfo.copy()) #blockSize += fileSize if self.fileList: self.numBlocks = blockIdx + 1 print(f"numBlocks = {self.numBlocks}") if os.path.exists("blocks.txt"): os.remove("blocks.txt") fl = open("blocks.txt", 'w') fl.write(json.dumps(self.fileList, indent = 4)) fl.close() #blockList = [ block0 = [file0 = [], ..., fileJ = [] ], # ..., # blockN = [file0 = [], ..., fileK = [] ] # ] def prepareData(self): fileList = [] self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) for vospacePath in self.nodeList: [srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) #[srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) fileInfo = self.dbConn.getOSPath(vospacePath) srcPath = fileInfo["fullPath"] storageType = fileInfo["storageType"] username = fileInfo["username"] osRelPath 
= fileInfo["osPath"] fileSize = fileInfo["contentLength"] if storageType == "cold": if os.path.isdir(srcPath): for root, dirs, files in os.walk(srcPath): Loading @@ -54,7 +166,14 @@ class RetrieveExecutor(TaskExecutor): def retrieveData(self): for vospacePath in self.nodeList: [srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) #[srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) fileInfo = self.dbConn.getOSPath(vospacePath) srcPath = fileInfo["fullPath"] storageType = fileInfo["storageType"] username = fileInfo["username"] osRelPath = fileInfo["osPath"] fileSize = fileInfo["contentLength"] osRelParentPath = os.path.dirname(osRelPath) if osRelParentPath != "/": osRelParentPath += "/" Loading Loading @@ -91,6 +210,8 @@ class RetrieveExecutor(TaskExecutor): vospacePath = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1] if nodeType == "list": self.dbConn.deleteTmpDataNode(vospacePath) self.fileList.clear() self.nodeList.clear() def run(self): print("Starting retrieve executor...") Loading @@ -102,13 +223,26 @@ class RetrieveExecutor(TaskExecutor): self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.jobInfo["nodeList"] self.buildFileList() self.buildBlocks() self.prepareData() result = self.retrieveData() if result: self.updateJobStatus() self.cleanup() print(f"fileList = {self.fileList}") print(f"nodeList = {self.nodeList}") else: sys.exit("Failed to retrieve data!") self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") # Ottieni il tipo di storage # Cicla su ciascun blocco # -Se storage = cold => recall di tutti i file del blocco # -copia e rimuovi ciascun file del blocco # - alla fine della copia del blocco controlla se è stato completato qualche path vospace di cartella ( # (cioè se ciascuno dei vospacePath è presente o meno in uno dei campi 
vospaceRootParent) # -se si, aggiorna ricorsivamente i flag su quel path vospace # -Se storage = cold => migra tutti i file del blocco per fare spazio sul frontend