Loading transfer_service/store_preprocessor.py +110 −124 Original line number Diff line number Diff line #!/usr/bin/env python # # TODO: # - interface with the file catalog (insert) # - data and permissions cleanup # import json import logging Loading @@ -11,6 +6,7 @@ import os import re import shutil import sys import time from datetime import datetime as dt Loading Loading @@ -69,8 +65,10 @@ class StorePreprocessor(TaskExecutor): super(StorePreprocessor, self).__init__() def prepare(self, username): self.logger.info("File permissions setup") self.username = username self.path = "/home/" + username + "/store" #self.path = "/home/" + username + "/store" self.path = self.storageStorePath.replace("{username}", self.username) for folder, subfolders, files in os.walk(self.path): os.chown(folder, 0, 0) os.chmod(folder, 0o555) Loading @@ -82,24 +80,28 @@ class StorePreprocessor(TaskExecutor): os.chmod(os.path.join(folder, f), 0o555) def execute(self): try: self.logger.info("========== Start of preprocessing phase ==========") # First scan to find crowded dirs self.logger.info("Searching for 'crowded' dirs") [ dirs, files ] = self.systemUtils.scan(self.path) # Create a .tar for all dirs matching the constraints, if any self.logger.info("Starting .tar file generation for 'crowded' dirs (if any)") for dir in dirs: self.fileGrouper.recursive(self.path + '/' + dir) # Second scan after file grouper execution self.logger.info("First-level scan of the 'store' directory") [ dirs, files ] = self.systemUtils.scan(self.path) timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") # Case 1: /home/user/store contains both files and dirs if files and dirs: self.logger.debug("The 'store' directory contains both files and dirs") destPath = self.path + '/' + timestamp + "-vos_wrapper" try: os.mkdir(destPath) except OSError as error: sys.exit(f"FATAL: {error}") for file in files: srcPath = self.path + '/' + file shutil.move(srcPath, destPath) Loading @@ -109,51 +111,38 @@ class StorePreprocessor(TaskExecutor): self.md5calc.recursive(destPath) # Case 2: /home/user/store contains only files elif files and not dirs: self.logger.debug("The 'store' directory contains only files") destPath = self.path + '/' + timestamp + "-vos_wrapper" try: os.mkdir(destPath) except OSError as error: sys.exit(f"FATAL: {error}") for file in files: srcPath = self.path + '/' + file shutil.move(srcPath, destPath) self.md5calc.recursive(destPath) # Case 3: /home/user/store contains only dirs elif not files and dirs: self.logger.debug("The 'store' directory contains only dirs") for dir in dirs: destPath = self.path + '/' + dir self.md5calc.recursive(destPath) # Case 4: /home/user/store is empty # Case 4: /home/user/store is empty (this should be handled by data_rpc_server.py) else: sys.exit("The 'store' directory is empty.") self.logger.critical("FATAL: the 'store' directory is empty, the application MUST to be restarted.") # Third scan after directory structure 'check & repair' self.logger.info("Recursive scan of the 'store' directory") [ dirs, files ] = self.systemUtils.scanRecursive(self.path) # File catalog update #out = open("store_preprocessor_log.txt", "a") #self.userId = self.dbConn.getUserId(self.username) self.userId = self.jobObj.ownerId #out.write(f"USER: {self.username}\n") #out.write(f"USER_ID: {self.userId}\n") self.logger.info("Checksum calculation and file catalog update") pathPrefix = self.storageStorePath.replace("{username}", self.username) tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") for dir in dirs: #out.write(f"DIR: {dir}\n") #out.write(f"pathPrefix: {pathPrefix}\n") basePath = os.path.dirname(dir).replace(pathPrefix, "/" + self.username) #out.write(f"basePath: {basePath}\n") nodeName = os.path.basename(dir) #out.write(f"nodeName: {nodeName}\n") cnode = Node(nodeName, "container") if not tstampWrapperDirPattern.match("/" + nodeName): if tstampWrapperDirPattern.search(basePath): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') #out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") basePath = tstampWrapperDirPattern.sub("", basePath) #out.write(f"newBasePath: {basePath}\n") cnode.setWrapperDir(tstampWrapperDir) cnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) Loading @@ -177,23 +166,15 @@ class StorePreprocessor(TaskExecutor): now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) #out.write("\n\n") for flist in files: for file in flist: #out.write(f"FILE: {file}\n") if self.md5calc.fileIsValid(file): #out.write(f"pathPrefix: {pathPrefix}\n") basePath = os.path.dirname(file).replace(pathPrefix, "/" + self.username) #out.write(f"basePath: {basePath}\n") nodeName = os.path.basename(file) #out.write(f"nodeName: {nodeName}\n") dnode = Node(nodeName, "data") if tstampWrapperDirPattern.search(basePath): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') #out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") basePath = tstampWrapperDirPattern.sub("", basePath) #out.write(f"newBasePath: {basePath}\n") dnode.setWrapperDir(tstampWrapperDir) dnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) Loading @@ -217,13 +198,16 @@ class StorePreprocessor(TaskExecutor): # node already exists, skip it... now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) self.logger.info("Overall data size calculation") self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) #out.write("\n") #out.close() self.logger.info("========== End of preprocessing phase ==========") except Exception: self.logger.exception("FATAL: something went wrong during the preprocessing phase.") time.sleep(5) sys.exit(1) def update(self): self.logger.info("Job phase updated to QUEUED") self.jobObj.setPhase("QUEUED") self.dbConn.setPhase(self.jobId, "QUEUED") Loading @@ -240,6 +224,7 @@ class StorePreprocessor(TaskExecutor): your job has been QUEUED. Job ID: {self.jobId} Job type: {self.jobObj.type} Storage type: {self.storageType} Storage ID: {self.storageId} Owner ID: {self.jobObj.ownerId} Loading @@ -261,6 +246,7 @@ class StorePreprocessor(TaskExecutor): self.jobId = self.jobObj.jobId self.storageId = self.jobObj.jobInfo["storageId"] self.storageType = self.jobObj.jobInfo["storageType"] self.userId = self.jobObj.ownerId self.username = self.jobObj.jobInfo["userName"] self.prepare(self.username) self.execute() Loading Loading
transfer_service/store_preprocessor.py +110 −124 Original line number Diff line number Diff line #!/usr/bin/env python # # TODO: # - interface with the file catalog (insert) # - data and permissions cleanup # import json import logging Loading @@ -11,6 +6,7 @@ import os import re import shutil import sys import time from datetime import datetime as dt Loading Loading @@ -69,8 +65,10 @@ class StorePreprocessor(TaskExecutor): super(StorePreprocessor, self).__init__() def prepare(self, username): self.logger.info("File permissions setup") self.username = username self.path = "/home/" + username + "/store" #self.path = "/home/" + username + "/store" self.path = self.storageStorePath.replace("{username}", self.username) for folder, subfolders, files in os.walk(self.path): os.chown(folder, 0, 0) os.chmod(folder, 0o555) Loading @@ -82,24 +80,28 @@ class StorePreprocessor(TaskExecutor): os.chmod(os.path.join(folder, f), 0o555) def execute(self): try: self.logger.info("========== Start of preprocessing phase ==========") # First scan to find crowded dirs self.logger.info("Searching for 'crowded' dirs") [ dirs, files ] = self.systemUtils.scan(self.path) # Create a .tar for all dirs matching the constraints, if any self.logger.info("Starting .tar file generation for 'crowded' dirs (if any)") for dir in dirs: self.fileGrouper.recursive(self.path + '/' + dir) # Second scan after file grouper execution self.logger.info("First-level scan of the 'store' directory") [ dirs, files ] = self.systemUtils.scan(self.path) timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") # Case 1: /home/user/store contains both files and dirs if files and dirs: self.logger.debug("The 'store' directory contains both files and dirs") destPath = self.path + '/' + timestamp + "-vos_wrapper" try: os.mkdir(destPath) except OSError as error: sys.exit(f"FATAL: {error}") for file in files: srcPath = self.path + '/' + file shutil.move(srcPath, destPath) Loading @@ -109,51 +111,38 @@ class StorePreprocessor(TaskExecutor): self.md5calc.recursive(destPath) # Case 2: /home/user/store contains only files elif files and not dirs: self.logger.debug("The 'store' directory contains only files") destPath = self.path + '/' + timestamp + "-vos_wrapper" try: os.mkdir(destPath) except OSError as error: sys.exit(f"FATAL: {error}") for file in files: srcPath = self.path + '/' + file shutil.move(srcPath, destPath) self.md5calc.recursive(destPath) # Case 3: /home/user/store contains only dirs elif not files and dirs: self.logger.debug("The 'store' directory contains only dirs") for dir in dirs: destPath = self.path + '/' + dir self.md5calc.recursive(destPath) # Case 4: /home/user/store is empty # Case 4: /home/user/store is empty (this should be handled by data_rpc_server.py) else: sys.exit("The 'store' directory is empty.") self.logger.critical("FATAL: the 'store' directory is empty, the application MUST to be restarted.") # Third scan after directory structure 'check & repair' self.logger.info("Recursive scan of the 'store' directory") [ dirs, files ] = self.systemUtils.scanRecursive(self.path) # File catalog update #out = open("store_preprocessor_log.txt", "a") #self.userId = self.dbConn.getUserId(self.username) self.userId = self.jobObj.ownerId #out.write(f"USER: {self.username}\n") #out.write(f"USER_ID: {self.userId}\n") self.logger.info("Checksum calculation and file catalog update") pathPrefix = self.storageStorePath.replace("{username}", self.username) tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") for dir in dirs: #out.write(f"DIR: {dir}\n") #out.write(f"pathPrefix: {pathPrefix}\n") basePath = os.path.dirname(dir).replace(pathPrefix, "/" + self.username) #out.write(f"basePath: {basePath}\n") nodeName = os.path.basename(dir) #out.write(f"nodeName: {nodeName}\n") cnode = Node(nodeName, "container") if not tstampWrapperDirPattern.match("/" + nodeName): if tstampWrapperDirPattern.search(basePath): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') #out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") basePath = tstampWrapperDirPattern.sub("", basePath) #out.write(f"newBasePath: {basePath}\n") cnode.setWrapperDir(tstampWrapperDir) cnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) Loading @@ -177,23 +166,15 @@ class StorePreprocessor(TaskExecutor): now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) #out.write("\n\n") for flist in files: for file in flist: #out.write(f"FILE: {file}\n") if self.md5calc.fileIsValid(file): #out.write(f"pathPrefix: {pathPrefix}\n") basePath = os.path.dirname(file).replace(pathPrefix, "/" + self.username) #out.write(f"basePath: {basePath}\n") nodeName = os.path.basename(file) #out.write(f"nodeName: {nodeName}\n") dnode = Node(nodeName, "data") if tstampWrapperDirPattern.search(basePath): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') #out.write(f"tstampWrapperDir: {tstampWrapperDir}\n") basePath = tstampWrapperDirPattern.sub("", basePath) #out.write(f"newBasePath: {basePath}\n") dnode.setWrapperDir(tstampWrapperDir) dnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) Loading @@ -217,13 +198,16 @@ class StorePreprocessor(TaskExecutor): # node already exists, skip it... now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) self.logger.info("Overall data size calculation") self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) #out.write("\n") #out.close() self.logger.info("========== End of preprocessing phase ==========") except Exception: self.logger.exception("FATAL: something went wrong during the preprocessing phase.") time.sleep(5) sys.exit(1) def update(self): self.logger.info("Job phase updated to QUEUED") self.jobObj.setPhase("QUEUED") self.dbConn.setPhase(self.jobId, "QUEUED") Loading @@ -240,6 +224,7 @@ class StorePreprocessor(TaskExecutor): your job has been QUEUED. Job ID: {self.jobId} Job type: {self.jobObj.type} Storage type: {self.storageType} Storage ID: {self.storageId} Owner ID: {self.jobObj.ownerId} Loading @@ -261,6 +246,7 @@ class StorePreprocessor(TaskExecutor): self.jobId = self.jobObj.jobId self.storageId = self.jobObj.jobInfo["storageId"] self.storageType = self.jobObj.jobInfo["storageType"] self.userId = self.jobObj.ownerId self.username = self.jobObj.jobInfo["userName"] self.prepare(self.username) self.execute() Loading