Loading transfer_service/import_executor.py +8 −2 Original line number Diff line number Diff line Loading @@ -93,7 +93,10 @@ class ImportExecutor(TaskExecutor): cnode.setAsyncTrans(True) cnode.setSticky(True) if self.dbConn.insertNode(cnode): if os.path.islink(dir): now = dt.now() nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) elif self.dbConn.insertNode(cnode): now = dt.now() nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: Loading Loading @@ -130,7 +133,10 @@ class ImportExecutor(TaskExecutor): dnode.setAsyncTrans(True) dnode.setSticky(True) if self.dbConn.insertNode(dnode): if os.path.islink(file): now = dt.now() nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) elif self.dbConn.insertNode(dnode): now = dt.now() nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: Loading transfer_service/store_preprocessor.py +32 −20 Original line number Diff line number Diff line Loading @@ -113,26 +113,26 @@ class StorePreprocessor(TaskExecutor): [ dirs, files ] = self.systemUtils.scanRecursive(self.path) # File catalog update out = open("store_preprocessor_log.txt", "a") #out = open("store_preprocessor_log.txt", "a") self.userId = self.dbConn.getUserId(self.username) out.write(f"USER: {self.username}\n") out.write(f"USER_ID: {self.userId}\n") #out.write(f"USER: {self.username}\n") #out.write(f"USER_ID: {self.userId}\n") pathPrefix = self.storageStorePath.replace("{username}", self.username) tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") for dir in dirs: out.write(f"DIR: {dir}\n") out.write(f"pathPrefix: {pathPrefix}\n") #out.write(f"DIR: {dir}\n") #out.write(f"pathPrefix: {pathPrefix}\n") basePath = os.path.dirname(dir).replace(pathPrefix, "/" + self.username) out.write(f"basePath: {basePath}\n") #out.write(f"basePath: {basePath}\n") nodeName = os.path.basename(dir) out.write(f"nodeName: {nodeName}\n") #out.write(f"nodeName: {nodeName}\n") cnode = Node(nodeName, "container") if not tstampWrapperDirPattern.match("/" + nodeName): if tstampWrapperDirPattern.search(basePath): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") #out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") basePath = tstampWrapperDirPattern.sub("", basePath) out.write(f"newBasePath: {basePath}\n") #out.write(f"newBasePath: {basePath}\n") cnode.setWrapperDir(tstampWrapperDir) cnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) Loading @@ -142,26 +142,32 @@ class StorePreprocessor(TaskExecutor): cnode.setContentLength(0) cnode.setSticky(True) if self.dbConn.insertNode(cnode): if os.path.islink(dir): # node is a symlink, do not import it... pass elif self.dbConn.insertNode(cnode): self.nodeList.append(basePath + '/' + nodeName) else: # node already exists, skip it... pass out.write("\n\n") #out.write("\n\n") for flist in files: for file in flist: out.write(f"FILE: {file}\n") #out.write(f"FILE: {file}\n") if self.md5calc.fileIsValid(file): out.write(f"pathPrefix: {pathPrefix}\n") #out.write(f"pathPrefix: {pathPrefix}\n") basePath = os.path.dirname(file).replace(pathPrefix, "/" + self.username) out.write(f"basePath: {basePath}\n") #out.write(f"basePath: {basePath}\n") nodeName = os.path.basename(file) out.write(f"nodeName: {nodeName}\n") #out.write(f"nodeName: {nodeName}\n") dnode = Node(nodeName, "data") if tstampWrapperDirPattern.search(basePath): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") #out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") basePath = tstampWrapperDirPattern.sub("", basePath) out.write(f"newBasePath: {basePath}\n") #out.write(f"newBasePath: {basePath}\n") dnode.setWrapperDir(tstampWrapperDir) dnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) Loading @@ -172,11 +178,17 @@ class StorePreprocessor(TaskExecutor): dnode.setContentMD5(self.md5calc.getMD5(file)) dnode.setSticky(True) if self.dbConn.insertNode(dnode): if os.path.islink(file): # node is a symlink, do not import it... pass elif self.dbConn.insertNode(dnode): self.nodeList.append(basePath + '/' + nodeName) else: # node already exists, skip it... pass out.write("\n") out.close() #out.write("\n") #out.close() def update(self): self.jobObj.setPhase("QUEUED") Loading Loading
transfer_service/import_executor.py +8 −2 Original line number Diff line number Diff line Loading @@ -93,7 +93,10 @@ class ImportExecutor(TaskExecutor): cnode.setAsyncTrans(True) cnode.setSticky(True) if self.dbConn.insertNode(cnode): if os.path.islink(dir): now = dt.now() nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) elif self.dbConn.insertNode(cnode): now = dt.now() nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: Loading Loading @@ -130,7 +133,10 @@ class ImportExecutor(TaskExecutor): dnode.setAsyncTrans(True) dnode.setSticky(True) if self.dbConn.insertNode(dnode): if os.path.islink(file): now = dt.now() nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) elif self.dbConn.insertNode(dnode): now = dt.now() nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: Loading
transfer_service/store_preprocessor.py +32 −20 Original line number Diff line number Diff line Loading @@ -113,26 +113,26 @@ class StorePreprocessor(TaskExecutor): [ dirs, files ] = self.systemUtils.scanRecursive(self.path) # File catalog update out = open("store_preprocessor_log.txt", "a") #out = open("store_preprocessor_log.txt", "a") self.userId = self.dbConn.getUserId(self.username) out.write(f"USER: {self.username}\n") out.write(f"USER_ID: {self.userId}\n") #out.write(f"USER: {self.username}\n") #out.write(f"USER_ID: {self.userId}\n") pathPrefix = self.storageStorePath.replace("{username}", self.username) tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") for dir in dirs: out.write(f"DIR: {dir}\n") out.write(f"pathPrefix: {pathPrefix}\n") #out.write(f"DIR: {dir}\n") #out.write(f"pathPrefix: {pathPrefix}\n") basePath = os.path.dirname(dir).replace(pathPrefix, "/" + self.username) out.write(f"basePath: {basePath}\n") #out.write(f"basePath: {basePath}\n") nodeName = os.path.basename(dir) out.write(f"nodeName: {nodeName}\n") #out.write(f"nodeName: {nodeName}\n") cnode = Node(nodeName, "container") if not tstampWrapperDirPattern.match("/" + nodeName): if tstampWrapperDirPattern.search(basePath): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") #out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") basePath = tstampWrapperDirPattern.sub("", basePath) out.write(f"newBasePath: {basePath}\n") #out.write(f"newBasePath: {basePath}\n") cnode.setWrapperDir(tstampWrapperDir) cnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) Loading @@ -142,26 +142,32 @@ class StorePreprocessor(TaskExecutor): cnode.setContentLength(0) cnode.setSticky(True) if self.dbConn.insertNode(cnode): if os.path.islink(dir): # node is a symlink, do not import it... pass elif self.dbConn.insertNode(cnode): self.nodeList.append(basePath + '/' + nodeName) else: # node already exists, skip it... pass out.write("\n\n") #out.write("\n\n") for flist in files: for file in flist: out.write(f"FILE: {file}\n") #out.write(f"FILE: {file}\n") if self.md5calc.fileIsValid(file): out.write(f"pathPrefix: {pathPrefix}\n") #out.write(f"pathPrefix: {pathPrefix}\n") basePath = os.path.dirname(file).replace(pathPrefix, "/" + self.username) out.write(f"basePath: {basePath}\n") #out.write(f"basePath: {basePath}\n") nodeName = os.path.basename(file) out.write(f"nodeName: {nodeName}\n") #out.write(f"nodeName: {nodeName}\n") dnode = Node(nodeName, "data") if tstampWrapperDirPattern.search(basePath): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") #out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") basePath = tstampWrapperDirPattern.sub("", basePath) out.write(f"newBasePath: {basePath}\n") #out.write(f"newBasePath: {basePath}\n") dnode.setWrapperDir(tstampWrapperDir) dnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) Loading @@ -172,11 +178,17 @@ class StorePreprocessor(TaskExecutor): dnode.setContentMD5(self.md5calc.getMD5(file)) dnode.setSticky(True) if self.dbConn.insertNode(dnode): if os.path.islink(file): # node is a symlink, do not import it... pass elif self.dbConn.insertNode(dnode): self.nodeList.append(basePath + '/' + nodeName) else: # node already exists, skip it... pass out.write("\n") out.close() #out.write("\n") #out.close() def update(self): self.jobObj.setPhase("QUEUED") Loading