Loading transfer_service/retrieve_executor.py +76 −28 Original line number Diff line number Diff line #!/usr/bin/env python # # # This class is responsible to retrieve data from a generic storage point. # # The operations performed are the briefly summarized here below: # * obtain the storage type # * create a list of files to be retrieved (list of dictionaries) # * split the list in blocks of a fixed size # * loop on each block and retrieve data # - if the storage type is 'cold' (tape) perform a recall operation # before the copy and a migrate operation after the copy # - check if data associated to a VOSpace node has been copied # every time a block is retrieved # - recursively update the 'async_trans' flag # * cleanup # # import json import os Loading Loading @@ -46,14 +64,21 @@ class RetrieveExecutor(TaskExecutor): super(RetrieveExecutor, self).__init__() def buildFileList(self): """ Generates the list of all files to retrieve. """ self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) # debug block... if os.path.exists("nodeList.txt"): os.remove("nodeList.txt") nl = open("nodeList.txt", 'w') for vospacePath in self.nodeList: nl.write(vospacePath + '\n') nl.close() # Obtain the storage type self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] for vospacePath in self.nodeList: nodeInfo = self.dbConn.getOSPath(vospacePath) Loading Loading @@ -87,7 +112,7 @@ class RetrieveExecutor(TaskExecutor): } self.fileList.append(fileInfo.copy()) # debug block... if os.path.exists("fileList.txt"): os.remove("fileList.txt") fl = open("fileList.txt", 'w') Loading @@ -95,17 +120,19 @@ class RetrieveExecutor(TaskExecutor): fl.close() def buildBlocks(self): """ Algorithm to split data in blocks of a well known size. """ if self.fileList: blockIdx = 0 blockSize = 0 for fileInfo in self.fileList: fileSize = fileInfo["fileSize"] self.totalSize += fileSize # controlla se il file ha dimensione superiore alla dimensione di un blocco # check if the file is larger than a block size if fileSize > self.maxBlockSize: # chiudi il blocco corrente, se non vuoto, altrimenti usa quello? # crea un nuovo blocco con solo il file in questione # chiudi il blocco e aggiungilo alla lista dei blocchi # if the current block is not empty, "close" it, otherwise # use it and then create a new block if blockSize > 0: blockIdx += 1 fileInfo["blockIdx"] = blockIdx Loading @@ -115,22 +142,26 @@ class RetrieveExecutor(TaskExecutor): blockIdx += 1 blockSize = 0 else: # il file sta dentro un blocco, quindi controlla se # la dimensione del file sommata al livello di riempimento # del blocco attuale eccede o meno la dimensione del blocco # the file can be contained by a block, so check if # the file size plus the current block fill is lower # than the maximum block size if blockSize + fileSize <= self.maxBlockSize: # se si, aggiungi il file al blocco e vai avanti col prossimo # if so, add the file to the block and go ahead with # the next one fileInfo["blockIdx"] = blockIdx blockSize += fileSize else: # se no, chiudi il blocco attuale e aggiungilo alla lista blocchi # crea un nuovo blocco, aggiungi il file e vai avanti col prossimo # if not, "close" the current block, add it to the block list, # then create a new block, add the file to it and go ahead # with the next one blockIdx += 1 fileInfo["blockIdx"] = blockIdx blockSize = fileSize if self.fileList: self.numBlocks = blockIdx + 1 self.dbConn.setTotalBlocks(self.jobId, self.numBlocks) # debug block... print(f"numBlocks = {self.numBlocks}") if os.path.exists("blocks.txt"): os.remove("blocks.txt") Loading @@ -139,16 +170,29 @@ class RetrieveExecutor(TaskExecutor): fl.close() def retrieveCompleted(self, vospacePath): """ Returns 'True' if all data associated to 'vospacePath' has been copied, otherwise it returns 'False'. """ return not any(vospacePath in f["vospaceRootParent"] for f in self.fileList) def retrieveData(self): """ Retrieves data from a generic storage point (hot or cold). """ # Loop on blocks for blockIdx in range(self.numBlocks): blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ] # Recall all files from tape library to tape frontend # if the storage type is 'cold' if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.recall([ f["fullPath"] for f in blockFileList ]) self.tapeClient.disconnect() # Loop on files in a block for fileInfo in blockFileList: srcPath = fileInfo["fullPath"] username = fileInfo["username"] Loading @@ -163,14 +207,20 @@ class RetrieveExecutor(TaskExecutor): if(sp.returncode or sp.stderr): return False # Remove files from file list at the end of the copy for fileInfo in blockFileList: if fileInfo in self.fileList: self.fileList.remove(fileInfo) # Check if the copy related to a certain VOSpace node # is completed and recursively update the 'async_trans' # flag for vospacePath in self.nodeList: if self.retrieveCompleted(vospacePath): self.dbConn.setAsyncTrans(vospacePath, False) # Empty the tape library frontend if the storage type # is 'cold' if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ]) Loading @@ -179,10 +229,12 @@ class RetrieveExecutor(TaskExecutor): blockFileList.clear() self.procBlocks += 1 self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) return True def updateJobStatus(self): """ Updates the job status. """ results = [{"target": ""}] results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] self.dbConn.setResults(self.jobId, results) Loading @@ -191,6 +243,9 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.setEndTime(self.jobId) def cleanup(self): """ Cleanup method. """ nodeType = self.jobObj.jobInfo["transfer"]["protocols"][0]["param"][0]["value"] vospacePath = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1] if nodeType == "list": Loading Loading @@ -218,6 +273,8 @@ class RetrieveExecutor(TaskExecutor): if result: self.updateJobStatus() self.cleanup() # debug block... print(f"fileList = {self.fileList}") print(f"nodeList = {self.nodeList}") else: Loading @@ -225,12 +282,3 @@ class RetrieveExecutor(TaskExecutor): self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") # Ottieni il tipo di storage # Cicla su ciascun blocco # -Se storage = cold => recall di tutti i file del blocco # -copia e rimuovi ciascun file del blocco # - alla fine della copia del blocco controlla se è stato completato qualche path vospace di cartella ( # (cioè se ciascuno dei vospacePath è presente o meno in uno dei campi vospaceRootParent) # -se si, aggiorna ricorsivamente i flag su quel path vospace # -Se storage = cold => migra tutti i file del blocco per fare spazio sul frontend Loading
transfer_service/retrieve_executor.py +76 −28 Original line number Diff line number Diff line #!/usr/bin/env python # # # This class is responsible to retrieve data from a generic storage point. # # The operations performed are the briefly summarized here below: # * obtain the storage type # * create a list of files to be retrieved (list of dictionaries) # * split the list in blocks of a fixed size # * loop on each block and retrieve data # - if the storage type is 'cold' (tape) perform a recall operation # before the copy and a migrate operation after the copy # - check if data associated to a VOSpace node has been copied # every time a block is retrieved # - recursively update the 'async_trans' flag # * cleanup # # import json import os Loading Loading @@ -46,14 +64,21 @@ class RetrieveExecutor(TaskExecutor): super(RetrieveExecutor, self).__init__() def buildFileList(self): """ Generates the list of all files to retrieve. """ self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) # debug block... if os.path.exists("nodeList.txt"): os.remove("nodeList.txt") nl = open("nodeList.txt", 'w') for vospacePath in self.nodeList: nl.write(vospacePath + '\n') nl.close() # Obtain the storage type self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] for vospacePath in self.nodeList: nodeInfo = self.dbConn.getOSPath(vospacePath) Loading Loading @@ -87,7 +112,7 @@ class RetrieveExecutor(TaskExecutor): } self.fileList.append(fileInfo.copy()) # debug block... if os.path.exists("fileList.txt"): os.remove("fileList.txt") fl = open("fileList.txt", 'w') Loading @@ -95,17 +120,19 @@ class RetrieveExecutor(TaskExecutor): fl.close() def buildBlocks(self): """ Algorithm to split data in blocks of a well known size. """ if self.fileList: blockIdx = 0 blockSize = 0 for fileInfo in self.fileList: fileSize = fileInfo["fileSize"] self.totalSize += fileSize # controlla se il file ha dimensione superiore alla dimensione di un blocco # check if the file is larger than a block size if fileSize > self.maxBlockSize: # chiudi il blocco corrente, se non vuoto, altrimenti usa quello? # crea un nuovo blocco con solo il file in questione # chiudi il blocco e aggiungilo alla lista dei blocchi # if the current block is not empty, "close" it, otherwise # use it and then create a new block if blockSize > 0: blockIdx += 1 fileInfo["blockIdx"] = blockIdx Loading @@ -115,22 +142,26 @@ class RetrieveExecutor(TaskExecutor): blockIdx += 1 blockSize = 0 else: # il file sta dentro un blocco, quindi controlla se # la dimensione del file sommata al livello di riempimento # del blocco attuale eccede o meno la dimensione del blocco # the file can be contained by a block, so check if # the file size plus the current block fill is lower # than the maximum block size if blockSize + fileSize <= self.maxBlockSize: # se si, aggiungi il file al blocco e vai avanti col prossimo # if so, add the file to the block and go ahead with # the next one fileInfo["blockIdx"] = blockIdx blockSize += fileSize else: # se no, chiudi il blocco attuale e aggiungilo alla lista blocchi # crea un nuovo blocco, aggiungi il file e vai avanti col prossimo # if not, "close" the current block, add it to the block list, # then create a new block, add the file to it and go ahead # with the next one blockIdx += 1 fileInfo["blockIdx"] = blockIdx blockSize = fileSize if self.fileList: self.numBlocks = blockIdx + 1 self.dbConn.setTotalBlocks(self.jobId, self.numBlocks) # debug block... print(f"numBlocks = {self.numBlocks}") if os.path.exists("blocks.txt"): os.remove("blocks.txt") Loading @@ -139,16 +170,29 @@ class RetrieveExecutor(TaskExecutor): fl.close() def retrieveCompleted(self, vospacePath): """ Returns 'True' if all data associated to 'vospacePath' has been copied, otherwise it returns 'False'. """ return not any(vospacePath in f["vospaceRootParent"] for f in self.fileList) def retrieveData(self): """ Retrieves data from a generic storage point (hot or cold). """ # Loop on blocks for blockIdx in range(self.numBlocks): blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ] # Recall all files from tape library to tape frontend # if the storage type is 'cold' if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.recall([ f["fullPath"] for f in blockFileList ]) self.tapeClient.disconnect() # Loop on files in a block for fileInfo in blockFileList: srcPath = fileInfo["fullPath"] username = fileInfo["username"] Loading @@ -163,14 +207,20 @@ class RetrieveExecutor(TaskExecutor): if(sp.returncode or sp.stderr): return False # Remove files from file list at the end of the copy for fileInfo in blockFileList: if fileInfo in self.fileList: self.fileList.remove(fileInfo) # Check if the copy related to a certain VOSpace node # is completed and recursively update the 'async_trans' # flag for vospacePath in self.nodeList: if self.retrieveCompleted(vospacePath): self.dbConn.setAsyncTrans(vospacePath, False) # Empty the tape library frontend if the storage type # is 'cold' if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ]) Loading @@ -179,10 +229,12 @@ class RetrieveExecutor(TaskExecutor): blockFileList.clear() self.procBlocks += 1 self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) return True def updateJobStatus(self): """ Updates the job status. """ results = [{"target": ""}] results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] self.dbConn.setResults(self.jobId, results) Loading @@ -191,6 +243,9 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.setEndTime(self.jobId) def cleanup(self): """ Cleanup method. """ nodeType = self.jobObj.jobInfo["transfer"]["protocols"][0]["param"][0]["value"] vospacePath = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1] if nodeType == "list": Loading Loading @@ -218,6 +273,8 @@ class RetrieveExecutor(TaskExecutor): if result: self.updateJobStatus() self.cleanup() # debug block... print(f"fileList = {self.fileList}") print(f"nodeList = {self.nodeList}") else: Loading @@ -225,12 +282,3 @@ class RetrieveExecutor(TaskExecutor): self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") # Ottieni il tipo di storage # Cicla su ciascun blocco # -Se storage = cold => recall di tutti i file del blocco # -copia e rimuovi ciascun file del blocco # - alla fine della copia del blocco controlla se è stato completato qualche path vospace di cartella ( # (cioè se ciascuno dei vospacePath è presente o meno in uno dei campi vospaceRootParent) # -se si, aggiorna ricorsivamente i flag su quel path vospace # -Se storage = cold => migra tutti i file del blocco per fare spazio sul frontend