"""Prepare a backup of PRS results (cubes, ratio maps, grids) for one run.

Queries the database for the filenames belonging to a hard-coded ``run_id``,
copies the files into a per-run backup tree, zips each category and removes
the intermediate directories.
"""
import os
import shutil
from contextlib import closing
from typing import List, Tuple, Union

import pandas as pd
import sqlalchemy
from sqlalchemy import and_, or_
from sqlalchemy.orm import Session

from assets.commons import (setup_logger,
                            get_pg_engine,
                            make_archive,
                            cleanup_directory)
from stg.stg_build_db_structure import (GridPars,
                                        ModelPars,
                                        RatioMaps,
                                        GridFiles,
                                        MomentZeroMaps,
                                        StarsPars)


def get_filenames(
        lines: Union[list, tuple],
        run_id: str) -> List:
    """
    Get results from the database, given the parameters of the model.

    :param lines: the two line identifiers used to compute the ratio;
        each is compared against ``ModelPars.iline``
    :param run_id: the run_id of the model grid to back up
    :return: list of (zipped_grid_name, fits_cube_name, ratio_map_name,
        fits_grid_name) rows, most recently created first
    """
    engine = get_pg_engine(logger=logger)
    with closing(Session(engine)) as session:
        # Query database.
        # NOTE(review): ``RatioMaps.mom_zero_map_1/2`` are used as bare
        # (truthy) expressions inside the join condition; this looks like it
        # was meant to be an equality against a MomentZeroMaps key -- confirm
        # against the schema before relying on the join semantics.
        results = session.query(GridPars.zipped_grid_name,
                                ModelPars.fits_cube_name,
                                RatioMaps.ratio_map_name,
                                GridFiles.fits_grid_name) \
            .join(GridFiles) \
            .join(StarsPars, isouter=True) \
            .join(ModelPars) \
            .join(MomentZeroMaps) \
            .join(RatioMaps, or_(
                and_(RatioMaps.mom_zero_map_1, ModelPars.iline == lines[0]),
                and_(RatioMaps.mom_zero_map_2, ModelPars.iline == lines[1]))) \
            .filter(GridPars.run_id == run_id) \
            .order_by(GridPars.created_on.desc()) \
            .all()
    engine.dispose()
    return results


def copy_and_compress_files(object_category: str):
    """
    Copy one category of result files into the backup tree, zip and clean up.

    Relies on the module-level names ``df_all``, ``backup_dir``, ``run_id``
    and ``logger`` defined in the ``__main__`` section below.

    :param object_category: one of ``'cubes'``, ``'ratios'`` or ``'grids'``
    """
    # Map each category to the dataframe column holding its filenames.
    column_name_map = {
        'cubes': 'fits_cube_name',
        'ratios': 'ratio_map_name',
        'grids': 'fits_grid_name',
    }
    for filename in df_all[column_name_map[object_category]].unique():
        shutil.copyfile(os.path.join('prs', 'fits', object_category, filename),
                        os.path.join(backup_dir, run_id, object_category,
                                     filename))
    make_archive(output_filename=f'{object_category}.zip',
                 source_dir=os.path.join(backup_dir, run_id, object_category),
                 archive_path=os.path.join(backup_dir, run_id))
    cleanup_directory(directory=os.path.join(backup_dir, run_id,
                                             object_category),
                      logger=logger)


def try_mkdir(dir: str):
    """Create *dir* if it does not exist yet; an existing directory is OK."""
    # makedirs(..., exist_ok=True) is race-free, unlike mkdir guarded by
    # ``except FileExistsError: pass``, and also creates missing parents.
    os.makedirs(dir, exist_ok=True)


if __name__ == '__main__':
    logger = setup_logger(name='PRS - UPLOAD')
    cwd = os.getcwd()
    # Run to back up; hard-coded for a one-off backup of this grid.
    run_id = '9c603062-2c27-4397-b404-2cb185febe18'
    backup_dir = os.path.join('prs', 'fits', 'backup')
    objects_to_save = ['cubes', 'ratios', 'grids']

    # Build the backup directory tree: backup/<run_id>/<category>/
    try_mkdir(backup_dir)
    try_mkdir(os.path.join(backup_dir, run_id))
    for object_category in objects_to_save:
        try_mkdir(os.path.join(backup_dir, run_id, object_category))

    # Each pair holds the two line identifiers that define one ratio map.
    line_pairs = [['87', '86'], ['88', '87'], ['257', '256'], ['381', '380']]
    df_list = []
    for lines in line_pairs:
        rows = get_filenames(lines=lines, run_id=run_id)
        df = pd.DataFrame(data=rows, columns=['zipped_grid_name',
                                              'fits_cube_name',
                                              'ratio_map_name',
                                              'fits_grid_name'])
        df_list.append(df)
    df_all = pd.concat(df_list)

    for object_category in objects_to_save:
        copy_and_compress_files(object_category=object_category)

    # The per-category folders were emptied by cleanup_directory() above;
    # remove the now-empty directories.
    for object_category in objects_to_save:
        os.rmdir(os.path.join(backup_dir, run_id, object_category))

    # NOTE(review): 'full_dataset.csv' must already exist in the current
    # working directory (presumably written by an earlier ETL step);
    # ``df_all`` itself is never persisted here -- confirm this is intended.
    shutil.copyfile('full_dataset.csv',
                    os.path.join(backup_dir, run_id, 'full_dataset.csv'))
"""Prepare a backup of PRS results (cubes, ratio maps, grids) for one run.

Queries the database for the filenames belonging to a hard-coded ``run_id``,
copies the files into a per-run backup tree, zips each category and removes
the intermediate directories.
"""
import os
import shutil
from contextlib import closing
from typing import List, Tuple, Union

import pandas as pd
import sqlalchemy
from sqlalchemy import and_, or_
from sqlalchemy.orm import Session

from assets.commons import (setup_logger,
                            get_pg_engine,
                            make_archive,
                            cleanup_directory)
from stg.stg_build_db_structure import (GridPars,
                                        ModelPars,
                                        RatioMaps,
                                        GridFiles,
                                        MomentZeroMaps,
                                        StarsPars)


def get_filenames(
        lines: Union[list, tuple],
        run_id: str) -> List:
    """
    Get results from the database, given the parameters of the model.

    :param lines: the two line identifiers used to compute the ratio;
        each is compared against ``ModelPars.iline``
    :param run_id: the run_id of the model grid to back up
    :return: list of (zipped_grid_name, fits_cube_name, ratio_map_name,
        fits_grid_name) rows, most recently created first
    """
    engine = get_pg_engine(logger=logger)
    with closing(Session(engine)) as session:
        # Query database.
        # NOTE(review): ``RatioMaps.mom_zero_map_1/2`` are used as bare
        # (truthy) expressions inside the join condition; this looks like it
        # was meant to be an equality against a MomentZeroMaps key -- confirm
        # against the schema before relying on the join semantics.
        results = session.query(GridPars.zipped_grid_name,
                                ModelPars.fits_cube_name,
                                RatioMaps.ratio_map_name,
                                GridFiles.fits_grid_name) \
            .join(GridFiles) \
            .join(StarsPars, isouter=True) \
            .join(ModelPars) \
            .join(MomentZeroMaps) \
            .join(RatioMaps, or_(
                and_(RatioMaps.mom_zero_map_1, ModelPars.iline == lines[0]),
                and_(RatioMaps.mom_zero_map_2, ModelPars.iline == lines[1]))) \
            .filter(GridPars.run_id == run_id) \
            .order_by(GridPars.created_on.desc()) \
            .all()
    engine.dispose()
    return results


def copy_and_compress_files(object_category: str):
    """
    Copy one category of result files into the backup tree, zip and clean up.

    Relies on the module-level names ``df_all``, ``backup_dir``, ``run_id``
    and ``logger`` defined in the ``__main__`` section below.

    :param object_category: one of ``'cubes'``, ``'ratios'`` or ``'grids'``
    """
    # Map each category to the dataframe column holding its filenames.
    column_name_map = {
        'cubes': 'fits_cube_name',
        'ratios': 'ratio_map_name',
        'grids': 'fits_grid_name',
    }
    for filename in df_all[column_name_map[object_category]].unique():
        shutil.copyfile(os.path.join('prs', 'fits', object_category, filename),
                        os.path.join(backup_dir, run_id, object_category,
                                     filename))
    make_archive(output_filename=f'{object_category}.zip',
                 source_dir=os.path.join(backup_dir, run_id, object_category),
                 archive_path=os.path.join(backup_dir, run_id))
    cleanup_directory(directory=os.path.join(backup_dir, run_id,
                                             object_category),
                      logger=logger)


def try_mkdir(dir: str):
    """Create *dir* if it does not exist yet; an existing directory is OK."""
    # makedirs(..., exist_ok=True) is race-free, unlike mkdir guarded by
    # ``except FileExistsError: pass``, and also creates missing parents.
    os.makedirs(dir, exist_ok=True)


if __name__ == '__main__':
    logger = setup_logger(name='PRS - UPLOAD')
    cwd = os.getcwd()
    # Run to back up; hard-coded for a one-off backup of this grid.
    run_id = '9c603062-2c27-4397-b404-2cb185febe18'
    backup_dir = os.path.join('prs', 'fits', 'backup')
    objects_to_save = ['cubes', 'ratios', 'grids']

    # Build the backup directory tree: backup/<run_id>/<category>/
    try_mkdir(backup_dir)
    try_mkdir(os.path.join(backup_dir, run_id))
    for object_category in objects_to_save:
        try_mkdir(os.path.join(backup_dir, run_id, object_category))

    # Each pair holds the two line identifiers that define one ratio map.
    line_pairs = [['87', '86'], ['88', '87'], ['257', '256'], ['381', '380']]
    df_list = []
    for lines in line_pairs:
        rows = get_filenames(lines=lines, run_id=run_id)
        df = pd.DataFrame(data=rows, columns=['zipped_grid_name',
                                              'fits_cube_name',
                                              'ratio_map_name',
                                              'fits_grid_name'])
        df_list.append(df)
    df_all = pd.concat(df_list)

    for object_category in objects_to_save:
        copy_and_compress_files(object_category=object_category)

    # The per-category folders were emptied by cleanup_directory() above;
    # remove the now-empty directories.
    for object_category in objects_to_save:
        os.rmdir(os.path.join(backup_dir, run_id, object_category))

    # NOTE(review): 'full_dataset.csv' must already exist in the current
    # working directory (presumably written by an earlier ETL step);
    # ``df_all`` itself is never persisted here -- confirm this is intended.
    shutil.copyfile('full_dataset.csv',
                    os.path.join(backup_dir, run_id, 'full_dataset.csv'))