Loading docker-compose.yaml +1 −1 Original line number Diff line number Diff line Loading @@ -9,7 +9,7 @@ services: - POSTGRES_USER=$DB_USER - POSTGRES_PASSWORD=$DB_PASS ports: - '5432:5432' - '31000:5432' volumes: - db:/var/lib/postgresql/data networks: Loading etl/assets/constants/__init__.py +1 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ radmc_options_mapping = { aggregation_function_mapping = { 'mean': np.nanmean, 'sum': np.nansum, 'median': np.nanmedian } etl/main.py +21 −34 Original line number Diff line number Diff line import glob import os import uuid from itertools import product from multiprocessing import Pool from assets.commons import (load_config_file, parse_grid_overrides) parse_grid_overrides, cleanup_directory, setup_logger) from stg.stg_radmc_input_generator import main as stg_main from mdl.mdl_execute_radmc_command import main as execute_radmc_script from prs.prs_compute_integrated_fluxes_and_ratios import main as prs_main Loading @@ -19,7 +22,8 @@ def compute_grid(tdust, nh2, lines, density_keyword): } } tarname = stg_main(override_config=overrides, path_radmc_files=scratch_dir) path_radmc_files=scratch_dir, run_id=run_id) cube_fits = [] for line in lines: mdl_overrides = { Loading @@ -32,11 +36,14 @@ def compute_grid(tdust, nh2, lines, density_keyword): } cube_fits.append(execute_radmc_script(grid_tarfile=tarname, override_config=mdl_overrides, radmc_input_path=scratch_dir)) prs_main(cube_fits_list=cube_fits) radmc_input_path=scratch_dir, run_id=run_id)) prs_main(cube_fits_list=cube_fits, run_id=run_id) def build_model_grid(): def build_model_grid(run_id: str, cleanup_scratches: bool = True): stg_config = load_config_file(os.path.join('stg', 'config', 'config.yml')) pl_density_index = float(stg_config['grid']['density_powerlaw_idx']) _model_type = 'spherical' if pl_density_index != 0 else 'homogeneous' Loading @@ -50,40 +57,20 @@ def build_model_grid(): config=config) lines = config['overrides']['lines_to_process'] grid_tarfiles = [] density_keyword = 'central_density' if _model_type == 'homogeneous' else 'density_at_reference' # scratch_dir = os.path.join('mdl', 'scratches', str(uuid.uuid4())) # for (tdust, nh2) in product(dust_temperatures, densities): # overrides = { # 'grid': { # 'dust_temperature': tdust, # density_keyword: nh2, # } # } # grid_tarfiles.append(stg_main(override_config=overrides, # path_radmc_files=scratch_dir)) # cube_fits = [] # ratio_fits = [] # for line in lines: # mdl_overrides = { # 'grid_lines': overrides, # 'model': { # 'radmc_observation': { # 'iline': line # } # } # } # cube_fits.append(execute_radmc_script(grid_tarfile=grid_tarfiles[-1], # override_config=mdl_overrides, # radmc_input_path=scratch_dir)) # ratio_fits.append(prs_main(cube_fits_list=cube_fits)) parallel_args = product(dust_temperatures, densities, [lines], [density_keyword]) with Pool(4) as pool: pool.starmap(compute_grid, parallel_args) if cleanup_scratches is True: scratches_dirs = glob.glob(os.path.join('mdl', 'scratches', '*')) for scratches in scratches_dirs: cleanup_directory(directory=scratches, logger=logger) os.rmdir(scratches) prs_inspection_main() if __name__ == '__main__': build_model_grid() logger = setup_logger(name='MAIN') run_id = str(uuid.uuid4()) build_model_grid(run_id=run_id) etl/mdl/mdl_execute_radmc_command.py +25 −13 Original line number Diff line number Diff line Loading @@ -93,7 +93,8 @@ def populate_model_table(config_mdl: dict, grid_tarfile: str, cube_filename: str, engine: sqlalchemy.engine, executed_on: datetime.timestamp): executed_on: datetime.timestamp, run_id: str): """ Populate the model_parameters table in the DB :param config_mdl: the model configuration Loading @@ -101,6 +102,7 @@ def populate_model_table(config_mdl: dict, :param cube_filename: the name of the fits cube :param engine: the SQLAlchemy engine :param executed_on: the timestamp of execution :param run_id: the run unique identifier """ model_pars_dict = { 'zipped_grid_name': grid_tarfile, Loading @@ -116,7 +118,8 @@ def populate_model_table(config_mdl: dict, 'width_kms': get_value_if_specified(config_mdl['radmc_observation'], 'width_kms'), 'nchannels': get_value_if_specified(config_mdl['radmc_observation'], 'nchannels'), 'npix': get_value_if_specified(config_mdl['radmc_observation'], 'npix'), 'created_on': executed_on 'created_on': executed_on, 'run_id': run_id } upsert( table_object=ModelPars, Loading @@ -129,13 +132,15 @@ def populate_model_table(config_mdl: dict, def populate_species_and_partner_table(config_lines: dict, engine: sqlalchemy.engine, executed_on: datetime.timestamp, grid_tarfile: str): grid_tarfile: str, run_id: str): """ Populate the species_and_partners table in the DB :param config_lines: the line configuration :param engine: the SQLAlchemy engine :param executed_on: the timestamp of execution :param grid_tarfile: the name of the grid tarfile :param run_id: the run unique identifier """ for (species, collision_partner) in product(config_lines['species_to_include'], config_lines['collision_partners']): species_partner_dict = { Loading @@ -144,7 +149,8 @@ def populate_species_and_partner_table(config_lines: dict, 'molecular_abundance': config_lines['molecular_abundances'][species], 'collision_partner': collision_partner, 'molecular_abundance_collision_partner': config_lines['molecular_abundances'][collision_partner], 'created_on': executed_on 'created_on': executed_on, 'run_id': run_id } upsert( table_object=SpeciesAndPartners, Loading @@ -159,18 +165,21 @@ def populate_species_and_partner_table(config_lines: dict, def populate_line_table(config_lines: dict, engine: sqlalchemy.engine, executed_on: datetime.timestamp, grid_tarfile: str): grid_tarfile: str, run_id: str): """ Populate the lines table in the DB :param config_lines: the dictionary containing the line configuration :param engine: the SQLAlchemy engine to use :param executed_on: the timestamp of execution, to add to the record :param grid_tarfile: the grid tarfile name, to be used as key :param run_id: the run unique identifier """ line_pars_dict = { 'zipped_grid_name': f'{grid_tarfile}', 'lines_mode': config_lines['lines_mode'], 'created_on': executed_on 'created_on': executed_on, 'run_id': run_id } upsert( table_object=LinePars, Loading @@ -181,6 +190,7 @@ def populate_line_table(config_lines: dict, def main(grid_tarfile: str, run_id: str, override_config: Union[dict, None] = None, radmc_input_path: Union[str, None] = None) -> str: # This is necessary, because the lines_mode is needed both in the lines.inp and radmc3d.inp files Loading Loading @@ -211,9 +221,6 @@ def main(grid_tarfile: str, os.chdir(_radmc_input_path) os.system(radmc_command) os.chdir(execution_dir) if radmc_input_path is not None: cleanup_directory(directory=_radmc_input_path, logger=logger) os.rmdir(_radmc_input_path) config_full = config_mdl.copy() config_full.update(config_stg) Loading @@ -226,20 +233,25 @@ def main(grid_tarfile: str, populate_line_table(config_lines=config_lines, engine=engine, executed_on=executed_on, grid_tarfile=grid_tarfile) grid_tarfile=grid_tarfile, run_id=run_id) populate_species_and_partner_table(config_lines=config_lines, engine=engine, executed_on=executed_on, grid_tarfile=grid_tarfile) grid_tarfile=grid_tarfile, run_id=run_id) populate_model_table(config_mdl=config_mdl, grid_tarfile=grid_tarfile, cube_filename=cube_filename, engine=engine, executed_on=executed_on) executed_on=executed_on, run_id=run_id) return cube_filename if __name__ == '__main__': main(grid_tarfile='459295aa894dffa8c521e606d14dbb6927638a2c.tgz') main(grid_tarfile='459295aa894dffa8c521e606d14dbb6927638a2c.tgz', run_id='test_run') etl/prs/config/config.yml +2 −1 Original line number Diff line number Diff line Loading @@ -2,4 +2,5 @@ flux_computation: central_frequency: 230.000 central_frequency_units: "GHz" integration_limits: "all" aggregation_function: 'mean' No newline at end of file moment_zero_aggregation_function: 'sum' aggregation_function: 'weighted_mean' No newline at end of file Loading
docker-compose.yaml +1 −1 Original line number Diff line number Diff line Loading @@ -9,7 +9,7 @@ services: - POSTGRES_USER=$DB_USER - POSTGRES_PASSWORD=$DB_PASS ports: - '5432:5432' - '31000:5432' volumes: - db:/var/lib/postgresql/data networks: Loading
etl/assets/constants/__init__.py +1 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ radmc_options_mapping = { aggregation_function_mapping = { 'mean': np.nanmean, 'sum': np.nansum, 'median': np.nanmedian }
etl/main.py +21 −34 Original line number Diff line number Diff line import glob import os import uuid from itertools import product from multiprocessing import Pool from assets.commons import (load_config_file, parse_grid_overrides) parse_grid_overrides, cleanup_directory, setup_logger) from stg.stg_radmc_input_generator import main as stg_main from mdl.mdl_execute_radmc_command import main as execute_radmc_script from prs.prs_compute_integrated_fluxes_and_ratios import main as prs_main Loading @@ -19,7 +22,8 @@ def compute_grid(tdust, nh2, lines, density_keyword): } } tarname = stg_main(override_config=overrides, path_radmc_files=scratch_dir) path_radmc_files=scratch_dir, run_id=run_id) cube_fits = [] for line in lines: mdl_overrides = { Loading @@ -32,11 +36,14 @@ def compute_grid(tdust, nh2, lines, density_keyword): } cube_fits.append(execute_radmc_script(grid_tarfile=tarname, override_config=mdl_overrides, radmc_input_path=scratch_dir)) prs_main(cube_fits_list=cube_fits) radmc_input_path=scratch_dir, run_id=run_id)) prs_main(cube_fits_list=cube_fits, run_id=run_id) def build_model_grid(): def build_model_grid(run_id: str, cleanup_scratches: bool = True): stg_config = load_config_file(os.path.join('stg', 'config', 'config.yml')) pl_density_index = float(stg_config['grid']['density_powerlaw_idx']) _model_type = 'spherical' if pl_density_index != 0 else 'homogeneous' Loading @@ -50,40 +57,20 @@ def build_model_grid(): config=config) lines = config['overrides']['lines_to_process'] grid_tarfiles = [] density_keyword = 'central_density' if _model_type == 'homogeneous' else 'density_at_reference' # scratch_dir = os.path.join('mdl', 'scratches', str(uuid.uuid4())) # for (tdust, nh2) in product(dust_temperatures, densities): # overrides = { # 'grid': { # 'dust_temperature': tdust, # density_keyword: nh2, # } # } # grid_tarfiles.append(stg_main(override_config=overrides, # path_radmc_files=scratch_dir)) # cube_fits = [] # ratio_fits = [] # for line in lines: # mdl_overrides = { # 'grid_lines': overrides, # 'model': { # 'radmc_observation': { # 'iline': line # } # } # } # cube_fits.append(execute_radmc_script(grid_tarfile=grid_tarfiles[-1], # override_config=mdl_overrides, # radmc_input_path=scratch_dir)) # ratio_fits.append(prs_main(cube_fits_list=cube_fits)) parallel_args = product(dust_temperatures, densities, [lines], [density_keyword]) with Pool(4) as pool: pool.starmap(compute_grid, parallel_args) if cleanup_scratches is True: scratches_dirs = glob.glob(os.path.join('mdl', 'scratches', '*')) for scratches in scratches_dirs: cleanup_directory(directory=scratches, logger=logger) os.rmdir(scratches) prs_inspection_main() if __name__ == '__main__': build_model_grid() logger = setup_logger(name='MAIN') run_id = str(uuid.uuid4()) build_model_grid(run_id=run_id)
etl/mdl/mdl_execute_radmc_command.py +25 −13 Original line number Diff line number Diff line Loading @@ -93,7 +93,8 @@ def populate_model_table(config_mdl: dict, grid_tarfile: str, cube_filename: str, engine: sqlalchemy.engine, executed_on: datetime.timestamp): executed_on: datetime.timestamp, run_id: str): """ Populate the model_parameters table in the DB :param config_mdl: the model configuration Loading @@ -101,6 +102,7 @@ def populate_model_table(config_mdl: dict, :param cube_filename: the name of the fits cube :param engine: the SQLAlchemy engine :param executed_on: the timestamp of execution :param run_id: the run unique identifier """ model_pars_dict = { 'zipped_grid_name': grid_tarfile, Loading @@ -116,7 +118,8 @@ def populate_model_table(config_mdl: dict, 'width_kms': get_value_if_specified(config_mdl['radmc_observation'], 'width_kms'), 'nchannels': get_value_if_specified(config_mdl['radmc_observation'], 'nchannels'), 'npix': get_value_if_specified(config_mdl['radmc_observation'], 'npix'), 'created_on': executed_on 'created_on': executed_on, 'run_id': run_id } upsert( table_object=ModelPars, Loading @@ -129,13 +132,15 @@ def populate_model_table(config_mdl: dict, def populate_species_and_partner_table(config_lines: dict, engine: sqlalchemy.engine, executed_on: datetime.timestamp, grid_tarfile: str): grid_tarfile: str, run_id: str): """ Populate the species_and_partners table in the DB :param config_lines: the line configuration :param engine: the SQLAlchemy engine :param executed_on: the timestamp of execution :param grid_tarfile: the name of the grid tarfile :param run_id: the run unique identifier """ for (species, collision_partner) in product(config_lines['species_to_include'], config_lines['collision_partners']): species_partner_dict = { Loading @@ -144,7 +149,8 @@ def populate_species_and_partner_table(config_lines: dict, 'molecular_abundance': config_lines['molecular_abundances'][species], 'collision_partner': collision_partner, 'molecular_abundance_collision_partner': config_lines['molecular_abundances'][collision_partner], 'created_on': executed_on 'created_on': executed_on, 'run_id': run_id } upsert( table_object=SpeciesAndPartners, Loading @@ -159,18 +165,21 @@ def populate_species_and_partner_table(config_lines: dict, def populate_line_table(config_lines: dict, engine: sqlalchemy.engine, executed_on: datetime.timestamp, grid_tarfile: str): grid_tarfile: str, run_id: str): """ Populate the lines table in the DB :param config_lines: the dictionary containing the line configuration :param engine: the SQLAlchemy engine to use :param executed_on: the timestamp of execution, to add to the record :param grid_tarfile: the grid tarfile name, to be used as key :param run_id: the run unique identifier """ line_pars_dict = { 'zipped_grid_name': f'{grid_tarfile}', 'lines_mode': config_lines['lines_mode'], 'created_on': executed_on 'created_on': executed_on, 'run_id': run_id } upsert( table_object=LinePars, Loading @@ -181,6 +190,7 @@ def populate_line_table(config_lines: dict, def main(grid_tarfile: str, run_id: str, override_config: Union[dict, None] = None, radmc_input_path: Union[str, None] = None) -> str: # This is necessary, because the lines_mode is needed both in the lines.inp and radmc3d.inp files Loading Loading @@ -211,9 +221,6 @@ def main(grid_tarfile: str, os.chdir(_radmc_input_path) os.system(radmc_command) os.chdir(execution_dir) if radmc_input_path is not None: cleanup_directory(directory=_radmc_input_path, logger=logger) os.rmdir(_radmc_input_path) config_full = config_mdl.copy() config_full.update(config_stg) Loading @@ -226,20 +233,25 @@ def main(grid_tarfile: str, populate_line_table(config_lines=config_lines, engine=engine, executed_on=executed_on, grid_tarfile=grid_tarfile) grid_tarfile=grid_tarfile, run_id=run_id) populate_species_and_partner_table(config_lines=config_lines, engine=engine, executed_on=executed_on, grid_tarfile=grid_tarfile) grid_tarfile=grid_tarfile, run_id=run_id) populate_model_table(config_mdl=config_mdl, grid_tarfile=grid_tarfile, cube_filename=cube_filename, engine=engine, executed_on=executed_on) executed_on=executed_on, run_id=run_id) return cube_filename if __name__ == '__main__': main(grid_tarfile='459295aa894dffa8c521e606d14dbb6927638a2c.tgz') main(grid_tarfile='459295aa894dffa8c521e606d14dbb6927638a2c.tgz', run_id='test_run')
etl/prs/config/config.yml +2 −1 Original line number Diff line number Diff line Loading @@ -2,4 +2,5 @@ flux_computation: central_frequency: 230.000 central_frequency_units: "GHz" integration_limits: "all" aggregation_function: 'mean' No newline at end of file moment_zero_aggregation_function: 'sum' aggregation_function: 'weighted_mean' No newline at end of file