Loading transfer_service/db_connector.py +4 −2 Original line number Diff line number Diff line Loading @@ -171,7 +171,7 @@ class DbConnector(object): conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, '/' || fs_path AS os_path, content_length FROM node n JOIN location l ON n.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id Loading Loading @@ -1026,6 +1026,7 @@ class DbConnector(object): cursor.execute(""" INSERT INTO node(parent_path, parent_relative_path, fs_path, name, tstamp_wrapper_dir, type, Loading @@ -1036,13 +1037,14 @@ class DbConnector(object): creator_id, content_length, content_md5) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING RETURNING node_id; """, (parentLtreePath, parentLtreeRelativePath, node.fsPath, node.name, node.wrapperDir, node.type, Loading transfer_service/import_executor.py +9 −3 Original line number Diff line number Diff line Loading @@ -71,7 +71,11 @@ class ImportExecutor(TaskExecutor): self.logger.info("++++++++++ Start of import phase ++++++++++") self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) try: self.dbConn.insertJob(self.jobObj) except Exception: self.logger.exception("FATAL: unable to set job 'start_time' and 'phase'.") return False self.logger.info("Job phase updated to EXECUTING.") if self.storageType == "cold": Loading Loading @@ -103,14 +107,15 @@ class ImportExecutor(TaskExecutor): vospacePath = parentPath + nodeName else: vospacePath = parentPath + '/' + nodeName fsPath = vospacePath.split('/', 2)[-1] cnode.setParentPath(parentPath) cnode.setFsPath(fsPath) cnode.setLocationId(locationId) cnode.setJobId(self.jobId) cnode.setCreatorId(self.userId) cnode.setContentLength(0) cnode.setAsyncTrans(True) cnode.setSticky(True) try: now = datetime.datetime.now().isoformat() if os.path.islink(dir): Loading @@ -134,7 +139,9 @@ class ImportExecutor(TaskExecutor): parentPath = tstampWrapperDirPattern.sub("", parentPath) dnode.setWrapperDir(tstampWrapperDir) vospacePath = parentPath + '/' + nodeName fsPath = vospacePath.split('/', 2)[-1] dnode.setParentPath(parentPath) dnode.setFsPath(fsPath) dnode.setLocationId(locationId) dnode.setJobId(self.jobId) dnode.setCreatorId(self.userId) Loading @@ -142,7 +149,6 @@ class ImportExecutor(TaskExecutor): dnode.setContentMD5(self.md5calc.getMD5(file)) dnode.setAsyncTrans(True) dnode.setSticky(True) try: now = datetime.datetime.now().isoformat() if os.path.islink(file): Loading transfer_service/node.py +4 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ class Node(object): def __init__(self, name, type): self.parentPath = None self.fsPath = None self.name = name self.wrapperDir = None self.type = type Loading Loading @@ -53,6 +54,9 @@ class Node(object): def setParentPath(self, parentPath): self.parentPath = parentPath def setFsPath(self, fsPath): self.fsPath = fsPath def setName(self, name): self.name = name Loading transfer_service/store_preprocessor.py +9 −9 Original line number Diff line number Diff line Loading @@ -152,15 +152,15 @@ class StorePreprocessor(TaskExecutor): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') basePath = tstampWrapperDirPattern.sub("", basePath) cnode.setWrapperDir(tstampWrapperDir) vospacePath = basePath + '/' + nodeName fsPath = vospacePath.split("/" + self.username + "/")[1] cnode.setParentPath(basePath) cnode.setFsPath(fsPath) cnode.setLocationId(locationId) cnode.setJobId(self.jobId) cnode.setCreatorId(self.userId) cnode.setContentLength(0) cnode.setSticky(True) vospacePath = basePath + '/' + nodeName try: now = datetime.datetime.now().isoformat() if os.path.islink(dir): Loading @@ -185,16 +185,16 @@ class StorePreprocessor(TaskExecutor): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') basePath = tstampWrapperDirPattern.sub("", basePath) dnode.setWrapperDir(tstampWrapperDir) vospacePath = basePath + '/' + nodeName fsPath = vospacePath.split("/" + self.username + "/")[1] dnode.setParentPath(basePath) dnode.setFsPath(fsPath) dnode.setLocationId(locationId) dnode.setJobId(self.jobId) dnode.setCreatorId(self.userId) dnode.setContentLength(os.path.getsize(file)) dnode.setContentMD5(self.md5calc.getMD5(file)) dnode.setSticky(True) vospacePath = basePath + '/' + nodeName try: now = datetime.datetime.now().isoformat() if os.path.islink(file): Loading transfer_service/transfer_service.py +5 −3 Original line number Diff line number Diff line Loading @@ -84,9 +84,11 @@ class TransferService(object): self.jobScheduler.start() self.vosRestHandler.start() self.cliHandler.start() self.logger.info("##########################################################") self.logger.info("########## VOSpace Transfer Service is RUNNING! ##########") self.logger.info("##########################################################") self.logger.info(""" ########################################################## ########## VOSpace Transfer Service is RUNNING! ########## ########################################################## """) #else: # print("The VOSpace Transfer Service requires super user privileges.") # sys.exit(1) Loading Loading
transfer_service/db_connector.py +4 −2 Original line number Diff line number Diff line Loading @@ -171,7 +171,7 @@ class DbConnector(object): conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, '/' || fs_path AS os_path, content_length FROM node n JOIN location l ON n.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id Loading Loading @@ -1026,6 +1026,7 @@ class DbConnector(object): cursor.execute(""" INSERT INTO node(parent_path, parent_relative_path, fs_path, name, tstamp_wrapper_dir, type, Loading @@ -1036,13 +1037,14 @@ class DbConnector(object): creator_id, content_length, content_md5) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING RETURNING node_id; """, (parentLtreePath, parentLtreeRelativePath, node.fsPath, node.name, node.wrapperDir, node.type, Loading
transfer_service/import_executor.py +9 −3 Original line number Diff line number Diff line Loading @@ -71,7 +71,11 @@ class ImportExecutor(TaskExecutor): self.logger.info("++++++++++ Start of import phase ++++++++++") self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) try: self.dbConn.insertJob(self.jobObj) except Exception: self.logger.exception("FATAL: unable to set job 'start_time' and 'phase'.") return False self.logger.info("Job phase updated to EXECUTING.") if self.storageType == "cold": Loading Loading @@ -103,14 +107,15 @@ class ImportExecutor(TaskExecutor): vospacePath = parentPath + nodeName else: vospacePath = parentPath + '/' + nodeName fsPath = vospacePath.split('/', 2)[-1] cnode.setParentPath(parentPath) cnode.setFsPath(fsPath) cnode.setLocationId(locationId) cnode.setJobId(self.jobId) cnode.setCreatorId(self.userId) cnode.setContentLength(0) cnode.setAsyncTrans(True) cnode.setSticky(True) try: now = datetime.datetime.now().isoformat() if os.path.islink(dir): Loading @@ -134,7 +139,9 @@ class ImportExecutor(TaskExecutor): parentPath = tstampWrapperDirPattern.sub("", parentPath) dnode.setWrapperDir(tstampWrapperDir) vospacePath = parentPath + '/' + nodeName fsPath = vospacePath.split('/', 2)[-1] dnode.setParentPath(parentPath) dnode.setFsPath(fsPath) dnode.setLocationId(locationId) dnode.setJobId(self.jobId) dnode.setCreatorId(self.userId) Loading @@ -142,7 +149,6 @@ class ImportExecutor(TaskExecutor): dnode.setContentMD5(self.md5calc.getMD5(file)) dnode.setAsyncTrans(True) dnode.setSticky(True) try: now = datetime.datetime.now().isoformat() if os.path.islink(file): Loading
transfer_service/node.py +4 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ class Node(object): def __init__(self, name, type): self.parentPath = None self.fsPath = None self.name = name self.wrapperDir = None self.type = type Loading Loading @@ -53,6 +54,9 @@ class Node(object): def setParentPath(self, parentPath): self.parentPath = parentPath def setFsPath(self, fsPath): self.fsPath = fsPath def setName(self, name): self.name = name Loading
transfer_service/store_preprocessor.py +9 −9 Original line number Diff line number Diff line Loading @@ -152,15 +152,15 @@ class StorePreprocessor(TaskExecutor): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') basePath = tstampWrapperDirPattern.sub("", basePath) cnode.setWrapperDir(tstampWrapperDir) vospacePath = basePath + '/' + nodeName fsPath = vospacePath.split("/" + self.username + "/")[1] cnode.setParentPath(basePath) cnode.setFsPath(fsPath) cnode.setLocationId(locationId) cnode.setJobId(self.jobId) cnode.setCreatorId(self.userId) cnode.setContentLength(0) cnode.setSticky(True) vospacePath = basePath + '/' + nodeName try: now = datetime.datetime.now().isoformat() if os.path.islink(dir): Loading @@ -185,16 +185,16 @@ class StorePreprocessor(TaskExecutor): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') basePath = tstampWrapperDirPattern.sub("", basePath) dnode.setWrapperDir(tstampWrapperDir) vospacePath = basePath + '/' + nodeName fsPath = vospacePath.split("/" + self.username + "/")[1] dnode.setParentPath(basePath) dnode.setFsPath(fsPath) dnode.setLocationId(locationId) dnode.setJobId(self.jobId) dnode.setCreatorId(self.userId) dnode.setContentLength(os.path.getsize(file)) dnode.setContentMD5(self.md5calc.getMD5(file)) dnode.setSticky(True) vospacePath = basePath + '/' + nodeName try: now = datetime.datetime.now().isoformat() if os.path.islink(file): Loading
transfer_service/transfer_service.py +5 −3 Original line number Diff line number Diff line Loading @@ -84,9 +84,11 @@ class TransferService(object): self.jobScheduler.start() self.vosRestHandler.start() self.cliHandler.start() self.logger.info("##########################################################") self.logger.info("########## VOSpace Transfer Service is RUNNING! ##########") self.logger.info("##########################################################") self.logger.info(""" ########################################################## ########## VOSpace Transfer Service is RUNNING! ########## ########################################################## """) #else: # print("The VOSpace Transfer Service requires super user privileges.") # sys.exit(1) Loading