Loading transfer_service/retrieve_executor.py +10 −14 Original line number Diff line number Diff line Loading @@ -40,7 +40,9 @@ class RetrieveExecutor(TaskExecutor): self.jobId = None self.nodeList = [] self.fileList = [] self.numBlocks = None self.numBlocks = 0 self.procBlocks = 0 self.totalSize = 0 super(RetrieveExecutor, self).__init__() def buildFileList(self): Loading Loading @@ -98,6 +100,7 @@ class RetrieveExecutor(TaskExecutor): blockSize = 0 for fileInfo in self.fileList: fileSize = fileInfo["fileSize"] self.totalSize += fileSize # controlla se il file ha dimensione superiore alla dimensione di un blocco if fileSize > self.maxBlockSize: # chiudi il blocco corrente, se non vuoto, altrimenti usa quello? Loading @@ -107,15 +110,9 @@ class RetrieveExecutor(TaskExecutor): blockIdx += 1 fileInfo["blockIdx"] = blockIdx blockIdx += 1 #self.blockList.append(currentBlock.copy()) #currentBlock.clear() #currentBlock.append(fileInfo.copy()) else: fileInfo["blockIdx"] = blockIdx blockIdx += 1 #currentBlock.append(fileInfo.copy()) #self.blockList.append(currentBlock.copy()) #currentBlock.clear() blockSize = 0 else: # il file sta dentro un blocco, quindi controlla se Loading @@ -124,7 +121,6 @@ class RetrieveExecutor(TaskExecutor): if blockSize + fileSize <= self.maxBlockSize: # se si, aggiungi il file al blocco e vai avanti col prossimo fileInfo["blockIdx"] = blockIdx #currentBlock.append(fileInfo.copy()) blockSize += fileSize else: # se no, chiudi il blocco attuale e aggiungilo alla lista blocchi Loading @@ -132,13 +128,9 @@ class RetrieveExecutor(TaskExecutor): blockIdx += 1 fileInfo["blockIdx"] = blockIdx blockSize = fileSize #self.blockList.append(currentBlock.copy()) #currentBlock.clear() #blockSize = 0 #currentBlock.append(fileInfo.copy()) #blockSize += fileSize if self.fileList: self.numBlocks = blockIdx + 1 self.dbConn.setTotalBlocks(self.jobId, self.numBlocks) print(f"numBlocks = {self.numBlocks}") if os.path.exists("blocks.txt"): os.remove("blocks.txt") Loading Loading @@ -185,6 +177,8 @@ class RetrieveExecutor(TaskExecutor): self.tapeClient.disconnect() blockFileList.clear() self.procBlocks += 1 self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) return True Loading @@ -204,7 +198,9 @@ class RetrieveExecutor(TaskExecutor): self.fileList.clear() self.nodeList.clear() self.storageType = None self.numBlocks = None self.numBlocks = 0 self.procBlocks = 0 self.totalSize = 0 def run(self): print("Starting retrieve executor...") Loading Loading
transfer_service/retrieve_executor.py +10 −14 Original line number Diff line number Diff line Loading @@ -40,7 +40,9 @@ class RetrieveExecutor(TaskExecutor): self.jobId = None self.nodeList = [] self.fileList = [] self.numBlocks = None self.numBlocks = 0 self.procBlocks = 0 self.totalSize = 0 super(RetrieveExecutor, self).__init__() def buildFileList(self): Loading Loading @@ -98,6 +100,7 @@ class RetrieveExecutor(TaskExecutor): blockSize = 0 for fileInfo in self.fileList: fileSize = fileInfo["fileSize"] self.totalSize += fileSize # controlla se il file ha dimensione superiore alla dimensione di un blocco if fileSize > self.maxBlockSize: # chiudi il blocco corrente, se non vuoto, altrimenti usa quello? Loading @@ -107,15 +110,9 @@ class RetrieveExecutor(TaskExecutor): blockIdx += 1 fileInfo["blockIdx"] = blockIdx blockIdx += 1 #self.blockList.append(currentBlock.copy()) #currentBlock.clear() #currentBlock.append(fileInfo.copy()) else: fileInfo["blockIdx"] = blockIdx blockIdx += 1 #currentBlock.append(fileInfo.copy()) #self.blockList.append(currentBlock.copy()) #currentBlock.clear() blockSize = 0 else: # il file sta dentro un blocco, quindi controlla se Loading @@ -124,7 +121,6 @@ class RetrieveExecutor(TaskExecutor): if blockSize + fileSize <= self.maxBlockSize: # se si, aggiungi il file al blocco e vai avanti col prossimo fileInfo["blockIdx"] = blockIdx #currentBlock.append(fileInfo.copy()) blockSize += fileSize else: # se no, chiudi il blocco attuale e aggiungilo alla lista blocchi Loading @@ -132,13 +128,9 @@ class RetrieveExecutor(TaskExecutor): blockIdx += 1 fileInfo["blockIdx"] = blockIdx blockSize = fileSize #self.blockList.append(currentBlock.copy()) #currentBlock.clear() #blockSize = 0 #currentBlock.append(fileInfo.copy()) #blockSize += fileSize if self.fileList: self.numBlocks = blockIdx + 1 self.dbConn.setTotalBlocks(self.jobId, self.numBlocks) print(f"numBlocks = {self.numBlocks}") if os.path.exists("blocks.txt"): os.remove("blocks.txt") Loading Loading @@ -185,6 +177,8 @@ class RetrieveExecutor(TaskExecutor): self.tapeClient.disconnect() blockFileList.clear() self.procBlocks += 1 self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) return True Loading @@ -204,7 +198,9 @@ class RetrieveExecutor(TaskExecutor): self.fileList.clear() self.nodeList.clear() self.storageType = None self.numBlocks = None self.numBlocks = 0 self.procBlocks = 0 self.totalSize = 0 def run(self): print("Starting retrieve executor...") Loading