Loading etl/prs/prs_prepare_backup.py +81 −16 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import os import shutil import pandas as pd import sqlalchemy from sqlalchemy.ext.serializer import dumps from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from typing import Union, Tuple, List Loading Loading @@ -50,7 +51,10 @@ def get_filenames( return results def copy_and_compress_files(object_category: str): def copy_and_compress_files(object_category: str, df_all: pd.DataFrame, backup_dir: str, run_id): column_name_map = { 'cubes': 'fits_cube_name', 'ratios': 'ratio_map_name', Loading @@ -66,6 +70,13 @@ def copy_and_compress_files(object_category: str): logger=logger) def dump_db_table(table: sqlalchemy.orm.declarative_base, session: Session, path: str): with open(os.path.join(path, f'{table.__tablename__}.dat'), 'wb') as file_handler: file_handler.write(dumps(session.query(table).all())) def try_mkdir(dir: str): try: os.mkdir(dir) Loading @@ -73,29 +84,83 @@ def try_mkdir(dir: str): pass if __name__ == '__main__': logger = setup_logger(name='PRS - UPLOAD') cwd = os.getcwd() run_id = '9c603062-2c27-4397-b404-2cb185febe18' def main(run_id: str): backup_dir = os.path.join('prs', 'fits', 'backup') objects_to_save = ['cubes', 'ratios', 'grids'] try_mkdir(backup_dir) try_mkdir(os.path.join(backup_dir, run_id)) for object_category in objects_to_save: try_mkdir(os.path.join(backup_dir, run_id, object_category)) prepare_directory_structure(backup_dir=backup_dir, objects_to_save=objects_to_save, run_id=run_id) line_pairs = [['87', '86'], ['88', '87'], ['257', '256'], ['381', '380']] df_list = [] for lines in line_pairs: rows = get_filenames(lines=lines, df_all = retrieve_all_filenames(line_pairs=line_pairs, run_id=run_id) df = pd.DataFrame(data=rows, columns=['zipped_grid_name', 'fits_cube_name', 'ratio_map_name', 'fits_grid_name']) df_list.append(df) df_all = pd.concat(df_list) for object_category in objects_to_save: copy_and_compress_files(object_category=object_category) copy_and_compress_files(object_category=object_category, df_all=df_all, backup_dir=backup_dir, run_id=run_id) for object_category in objects_to_save: os.rmdir(os.path.join(backup_dir, run_id, object_category)) shutil.copyfile('full_dataset.csv', os.path.join(backup_dir, run_id, 'full_dataset.csv')) save_output_images(backup_dir=backup_dir, run_id=run_id) table_list = [ GridPars, ModelPars, RatioMaps, GridFiles, MomentZeroMaps, StarsPars ] engine = get_pg_engine(logger=logger) with closing(Session(engine)) as session: for table in table_list: dump_db_table(table=table, session=session, path=backup_dir) engine.dispose() def save_output_images(backup_dir, run_id): try: shutil.copytree(os.path.join('prs', 'output'), os.path.join(backup_dir, run_id, 'output')) except FileExistsError: pass make_archive(output_filename=f'output.zip', source_dir=os.path.join(backup_dir, run_id, 'output'), archive_path=os.path.join(backup_dir, run_id)) cleanup_directory(directory=os.path.join(backup_dir, run_id, 'output'), logger=logger) os.rmdir(os.path.join(backup_dir, run_id, 'output')) def retrieve_all_filenames(line_pairs: List[List[str]], run_id: str): df_list = [] for lines in line_pairs: rows = get_filenames(lines=lines, run_id=run_id) df = pd.DataFrame(data=rows, columns=['zipped_grid_name', 'fits_cube_name', 'ratio_map_name', 'fits_grid_name']) df_list.append(df) df_all = pd.concat(df_list) return df_all def prepare_directory_structure(backup_dir: str, objects_to_save: List[str], run_id: str): try_mkdir(backup_dir) try_mkdir(os.path.join(backup_dir, run_id)) for object_category in objects_to_save: try_mkdir(os.path.join(backup_dir, run_id, object_category)) logger = setup_logger(name='PRS - UPLOAD') if __name__ == '__main__': main(run_id='9c603062-2c27-4397-b404-2cb185febe18') Loading
etl/prs/prs_prepare_backup.py +81 −16 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import os import shutil import pandas as pd import sqlalchemy from sqlalchemy.ext.serializer import dumps from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from typing import Union, Tuple, List Loading Loading @@ -50,7 +51,10 @@ def get_filenames( return results def copy_and_compress_files(object_category: str): def copy_and_compress_files(object_category: str, df_all: pd.DataFrame, backup_dir: str, run_id): column_name_map = { 'cubes': 'fits_cube_name', 'ratios': 'ratio_map_name', Loading @@ -66,6 +70,13 @@ def copy_and_compress_files(object_category: str): logger=logger) def dump_db_table(table: sqlalchemy.orm.declarative_base, session: Session, path: str): with open(os.path.join(path, f'{table.__tablename__}.dat'), 'wb') as file_handler: file_handler.write(dumps(session.query(table).all())) def try_mkdir(dir: str): try: os.mkdir(dir) Loading @@ -73,29 +84,83 @@ def try_mkdir(dir: str): pass if __name__ == '__main__': logger = setup_logger(name='PRS - UPLOAD') cwd = os.getcwd() run_id = '9c603062-2c27-4397-b404-2cb185febe18' def main(run_id: str): backup_dir = os.path.join('prs', 'fits', 'backup') objects_to_save = ['cubes', 'ratios', 'grids'] try_mkdir(backup_dir) try_mkdir(os.path.join(backup_dir, run_id)) for object_category in objects_to_save: try_mkdir(os.path.join(backup_dir, run_id, object_category)) prepare_directory_structure(backup_dir=backup_dir, objects_to_save=objects_to_save, run_id=run_id) line_pairs = [['87', '86'], ['88', '87'], ['257', '256'], ['381', '380']] df_list = [] for lines in line_pairs: rows = get_filenames(lines=lines, df_all = retrieve_all_filenames(line_pairs=line_pairs, run_id=run_id) df = pd.DataFrame(data=rows, columns=['zipped_grid_name', 'fits_cube_name', 'ratio_map_name', 'fits_grid_name']) df_list.append(df) df_all = pd.concat(df_list) for object_category in objects_to_save: copy_and_compress_files(object_category=object_category) copy_and_compress_files(object_category=object_category, df_all=df_all, backup_dir=backup_dir, run_id=run_id) for object_category in objects_to_save: os.rmdir(os.path.join(backup_dir, run_id, object_category)) shutil.copyfile('full_dataset.csv', os.path.join(backup_dir, run_id, 'full_dataset.csv')) save_output_images(backup_dir=backup_dir, run_id=run_id) table_list = [ GridPars, ModelPars, RatioMaps, GridFiles, MomentZeroMaps, StarsPars ] engine = get_pg_engine(logger=logger) with closing(Session(engine)) as session: for table in table_list: dump_db_table(table=table, session=session, path=backup_dir) engine.dispose() def save_output_images(backup_dir, run_id): try: shutil.copytree(os.path.join('prs', 'output'), os.path.join(backup_dir, run_id, 'output')) except FileExistsError: pass make_archive(output_filename=f'output.zip', source_dir=os.path.join(backup_dir, run_id, 'output'), archive_path=os.path.join(backup_dir, run_id)) cleanup_directory(directory=os.path.join(backup_dir, run_id, 'output'), logger=logger) os.rmdir(os.path.join(backup_dir, run_id, 'output')) def retrieve_all_filenames(line_pairs: List[List[str]], run_id: str): df_list = [] for lines in line_pairs: rows = get_filenames(lines=lines, run_id=run_id) df = pd.DataFrame(data=rows, columns=['zipped_grid_name', 'fits_cube_name', 'ratio_map_name', 'fits_grid_name']) df_list.append(df) df_all = pd.concat(df_list) return df_all def prepare_directory_structure(backup_dir: str, objects_to_save: List[str], run_id: str): try_mkdir(backup_dir) try_mkdir(os.path.join(backup_dir, run_id)) for object_category in objects_to_save: try_mkdir(os.path.join(backup_dir, run_id, object_category)) logger = setup_logger(name='PRS - UPLOAD') if __name__ == '__main__': main(run_id='9c603062-2c27-4397-b404-2cb185febe18')