Loading transfer_service/retrieve_executor.py +20 −1 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ from db_connector import DbConnector from tape_client import TapeClient from task_executor import TaskExecutor class RetrieveExecutor(TaskExecutor): def __init__(self): Loading @@ -29,9 +30,26 @@ class RetrieveExecutor(TaskExecutor): self.nodeList = [] super(RetrieveExecutor, self).__init__() def retrieveData(self): def prepareData(self): fileList = [] self.dbConn.connect() self.dbConn.setPhase(self.jobId, "EXECUTING") for vospacePath in self.nodeList: [srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) if storageType == "cold": if os.path.isdir(srcPath): for root, dirs, files in os.walk(srcPath): for f in files: fileList.append(os.path.join(root, f)) else: fileList.append(srcPath) self.dbConn.disconnect() if fileList: self.tapeClient.recall(fileList) def retrieveData(self): self.dbConn.connect() #self.dbConn.setPhase(self.jobId, "EXECUTING") for vospacePath in self.nodeList: [srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) osRelParentPath = os.path.dirname(osRelPath) Loading Loading @@ -90,6 +108,7 @@ class RetrieveExecutor(TaskExecutor): self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.jobInfo["nodeList"] self.prepareData() result = self.retrieveData() if result: self.updateJobStatus() Loading transfer_service/tape_client.py +11 −0 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import os import paramiko import scp import sys import uuid from tape_task import TapeTask Loading Loading @@ -69,6 +70,16 @@ class TapeClient(object): sys.exit("FATAL: invalid file/dir.") self.scp.close() def recall(self, fileList): #TODO cmd = "eeadm recall" tmp = str(uuid.uuid1().hex) + ".tmp" stdin, stdout, stderr = self.client.exec_command(cmd) def recallChecksumFiles(self, dirName): cmd = "find $(dirname " + dirName + ") -type f \( -iname \"*-md5sum.txt\" \) | eeadm recall" stdin, stdout, stderr = self.client.exec_command(cmd) # Closes the connection def disconnect(self): self.client.close() Loading Loading
transfer_service/retrieve_executor.py +20 −1 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ from db_connector import DbConnector from tape_client import TapeClient from task_executor import TaskExecutor class RetrieveExecutor(TaskExecutor): def __init__(self): Loading @@ -29,9 +30,26 @@ class RetrieveExecutor(TaskExecutor): self.nodeList = [] super(RetrieveExecutor, self).__init__() def retrieveData(self): def prepareData(self): fileList = [] self.dbConn.connect() self.dbConn.setPhase(self.jobId, "EXECUTING") for vospacePath in self.nodeList: [srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) if storageType == "cold": if os.path.isdir(srcPath): for root, dirs, files in os.walk(srcPath): for f in files: fileList.append(os.path.join(root, f)) else: fileList.append(srcPath) self.dbConn.disconnect() if fileList: self.tapeClient.recall(fileList) def retrieveData(self): self.dbConn.connect() #self.dbConn.setPhase(self.jobId, "EXECUTING") for vospacePath in self.nodeList: [srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) osRelParentPath = os.path.dirname(osRelPath) Loading Loading @@ -90,6 +108,7 @@ class RetrieveExecutor(TaskExecutor): self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.jobInfo["nodeList"] self.prepareData() result = self.retrieveData() if result: self.updateJobStatus() Loading
transfer_service/tape_client.py +11 −0 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import os import paramiko import scp import sys import uuid from tape_task import TapeTask Loading Loading @@ -69,6 +70,16 @@ class TapeClient(object): sys.exit("FATAL: invalid file/dir.") self.scp.close() def recall(self, fileList): #TODO cmd = "eeadm recall" tmp = str(uuid.uuid1().hex) + ".tmp" stdin, stdout, stderr = self.client.exec_command(cmd) def recallChecksumFiles(self, dirName): cmd = "find $(dirname " + dirName + ") -type f \( -iname \"*-md5sum.txt\" \) | eeadm recall" stdin, stdout, stderr = self.client.exec_command(cmd) # Closes the connection def disconnect(self): self.client.close() Loading