Loading transfer_service/store_preprocessor.py 0 → 100644 +74 −0 Original line number Diff line number Diff line # TODO: # - permissions setup on folder # - inteface with the file catalog (insert) # - data cleanup # import os import shutil import sys from datetime import datetime as dt from checksum import Checksum class StorePreprocessor(object): def __init__(self): self.md5calc = Checksum() def scan(self): dirList = [] fileList = [] elementList = os.listdir(self.path) for el in elementList: elPath = self.path + '/' + el if os.path.isdir(elPath): dirList.append(el) elif os.path.isfile(elPath): fileList.append(el) else: sys.exit("FATAL: invalid file/dir.") return [ dirList, fileList ] def prepare(self, username): self.username = username self.path = "/home/" + username + "/store" def start(self): [ dirs, files ] = self.scan() timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") if files and dirs: destPath = self.path + '/' + timestamp try: os.mkdir(destPath) except OSError as error: sys.exit(f"FATAL: {error}") for file in files: srcPath = self.path + '/' + file shutil.move(srcPath, destPath) for dir in dirs: srcPath = self.path + '/' + dir shutil.move(srcPath, destPath) self.md5calc.recursive(destPath) elif files and not dirs: destPath = self.path + '/' + timestamp try: os.mkdir(destPath) except OSError as error: sys.exit(f"FATAL: {error}") for file in files: srcPath = self.path + '/' + file shutil.move(srcPath, destPath) self.md5calc.recursive(destPath) elif not files and dirs: for dir in dirs: destPath = self.path + '/' + dir self.md5calc.recursive(destPath) else: sys.exit("The 'store' directory is empty.") sp = StorePreprocessor() sp.prepare("cristiano") sp.start() Loading
transfer_service/store_preprocessor.py 0 → 100644 +74 −0 Original line number Diff line number Diff line # TODO: # - permissions setup on folder # - inteface with the file catalog (insert) # - data cleanup # import os import shutil import sys from datetime import datetime as dt from checksum import Checksum class StorePreprocessor(object): def __init__(self): self.md5calc = Checksum() def scan(self): dirList = [] fileList = [] elementList = os.listdir(self.path) for el in elementList: elPath = self.path + '/' + el if os.path.isdir(elPath): dirList.append(el) elif os.path.isfile(elPath): fileList.append(el) else: sys.exit("FATAL: invalid file/dir.") return [ dirList, fileList ] def prepare(self, username): self.username = username self.path = "/home/" + username + "/store" def start(self): [ dirs, files ] = self.scan() timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") if files and dirs: destPath = self.path + '/' + timestamp try: os.mkdir(destPath) except OSError as error: sys.exit(f"FATAL: {error}") for file in files: srcPath = self.path + '/' + file shutil.move(srcPath, destPath) for dir in dirs: srcPath = self.path + '/' + dir shutil.move(srcPath, destPath) self.md5calc.recursive(destPath) elif files and not dirs: destPath = self.path + '/' + timestamp try: os.mkdir(destPath) except OSError as error: sys.exit(f"FATAL: {error}") for file in files: srcPath = self.path + '/' + file shutil.move(srcPath, destPath) self.md5calc.recursive(destPath) elif not files and dirs: for dir in dirs: destPath = self.path + '/' + dir self.md5calc.recursive(destPath) else: sys.exit("The 'store' directory is empty.") sp = StorePreprocessor() sp.prepare("cristiano") sp.start()