Loading etl/main.py +13 −38 Original line number Diff line number Diff line Loading @@ -20,33 +20,7 @@ from prs.prs_compute_integrated_fluxes_and_ratios import main as prs_main from prs.prs_inspect_results import main as prs_inspection_main def compute_grid(tdust, nh2, line, density_keyword, dust_temperature_keyword): scratch_dir = os.path.join('mdl', 'scratches', str(uuid.uuid4())) stg_overrides = { 'grid': { dust_temperature_keyword: tdust, density_keyword: nh2, } } overrides = { 'grid_lines': stg_overrides, 'model': { 'radmc_observation': { 'iline': line } } } tarname = stg_main(override_config=overrides, path_radmc_files=scratch_dir, run_id=run_id) cube_fits_name = execute_radmc_script(grid_zipfile=tarname, override_config=overrides, radmc_input_path=scratch_dir, run_id=run_id) return cube_fits_name def compute_full_grid(tdust, nh2, line, density_keyword, dust_temperature_keyword): def compute_full_grid(tdust, nh2, line, density_keyword, dust_temperature_keyword) -> Tuple[float, float, int, str]: scratch_dir = os.path.join('mdl', 'scratches', str(uuid.uuid4())) stg_overrides = { 'grid': { Loading @@ -72,9 +46,9 @@ def compute_full_grid(tdust, nh2, line, density_keyword, dust_temperature_keywor return tdust, nh2, line, cube_fits_name def initialize_queue(engine, run_id, run_arguments): def initialize_queue(engine: sqlalchemy.engine, run_id: str, run_arguments: Iterator): for arguments in run_arguments: raw_insert_entry = {'run_id': run_id, 'dust_temperature': arguments[0], Loading Loading @@ -144,18 +118,20 @@ def compute_model(run_id: str): engine = get_pg_engine(logger=logger) parameters_set = get_run_pars(engine=engine, run_id=run_id) engine.dispose() if parameters_set is not None: fits_cube_name = compute_grid(tdust=parameters_set[2], _, _, _, fits_cube_name = compute_full_grid(tdust=parameters_set[2], nh2=parameters_set[3], line=parameters_set[4], density_keyword=parameters_set[5], dust_temperature_keyword=parameters_set[6]) engine = get_pg_engine(logger=logger) insert_fits_name(engine=engine, row_id=parameters_set[0], fits_cube_name=fits_cube_name) engine.dispose() else: logger.info('All models were completed.') engine.dispose() def initialize_run(): Loading @@ -172,11 +148,11 @@ def initialize_run(): def compute_remaining_models(run_id: Union[None, str] = None) -> int: _run_id = validate_parameter(run_id, default=os.getenv('run_id')) logger.info(_run_id) engine = get_pg_engine(logger=logger) sql_query = sqlalchemy.text(f"""SELECT count(*) FROM tmp_execution_queue WHERE run_id = '{run_id}' AND done = false""") engine = get_pg_engine(logger=logger) n_models = engine.execution_options(autocommit=True).execute(sql_query).first()[0] engine.dispose() sys.stdout.write(str(n_models)) Loading Loading @@ -256,7 +232,6 @@ parser.add_argument('--cleanup_scratches') parser.add_argument('--distributed') args = parser.parse_args() if __name__ == '__main__': run_id = initialize_run() assert run_id is not None Loading etl/mdl/mdl_execute_radmc_command.py +4 −4 Original line number Diff line number Diff line Loading @@ -196,9 +196,6 @@ def main(grid_zipfile: str, radmc_command = f'radmc3d image {" ".join(options_set)}' outfile.write(radmc_command) if engine is None: engine = get_pg_engine(logger=logger, engine_kwargs={'pool_size': 3}) config_full = config_mdl.copy() config_full.update(config_stg) cube_filename = f'{compute_unique_hash_filename(config=config_full)}.fits' Loading @@ -219,6 +216,9 @@ def main(grid_zipfile: str, else: logger.info('Computation performed already! Skipping...') if engine is None: engine = get_pg_engine(logger=logger, engine_kwargs={'pool_size': 2}) populate_line_table(config_lines=config_lines, engine=engine, executed_on=executed_on, Loading @@ -237,7 +237,7 @@ def main(grid_zipfile: str, engine=engine, executed_on=executed_on, run_id=run_id) engine.dispose() return cube_filename Loading etl/sbatch_command.sh 0 → 100644 +13 −0 Original line number Diff line number Diff line #!/bin/bash #SBATCH --job-name=rdp05 #SBATCH --output=pleiadi_05.txt ##SBATCH --time=240:00:00 ##SBATCH --partition arc # #SBATCH --ntasks=1 #SBATCH --cpus-per-task=2 ##SBATCH --mem-per-cpu=800 apptainer-setup 1.0.1-centos7 echo $run_id srun singularity exec -B .:$HOME swiss_army_knife_latest.sif python main.py --distributed True --run_id $1 > run.log etl/slurm_template.sh +12 −13 Original line number Diff line number Diff line #!/bin/bash #SBATCH --job-name=rdp05 #SBATCH --output=pleiadi_05.txt ##SBATCH --time=240:00:00 ##SBATCH --partition arc # ##SBATCH --ntasks=1 #SBATCH --cpus-per-task=2 ##SBATCH --mem-per-cpu=800 pwd cd /homes/agianne/agianne/sak/swiss_army_knife_lr_ple05/etl pwd apptainer-setup 1.0.1-centos7 run_id_output=$(python -c'import main_parallel; main_parallel.initialize_run()') run_id_output=$(singularity exec -B .:$HOME swiss_army_knife_latest.sif python -c'import main; main.initialize_run()') run_id=$(echo $run_id_output | rev | cut -d" " -f1 | rev) remaining_tasks_output=$(python -c'import main_parallel; main_parallel.compute_remaining_models("'$run_id'")') remaining_tasks=$((echo $remaining_tasks_output | rev | cut -d" " -f1 | rev)) remaining_tasks_output=$(singularity exec -B .:$HOME swiss_army_knife_latest.sif python -c'import main; main.compute_remaining_models("'$run_id'")') remaining_tasks=$(echo $remaining_tasks_output | rev | cut -d" " -f1 | rev) # batch=10 # jobs_to_submit=$(( remaining_tasks < batch ? remaining_tasks : batch )) for i in {1..$remaining_tasks} more sbatch_command.sh for i in $(seq 1 $remaining_tasks) do srun singularity run -B .:$HOME swiss_army_knife_latest.sif > run.log echo $i sbatch sbatch_command.sh $run_id done etl/stg/stg_radmc_input_generator.py +28 −18 Original line number Diff line number Diff line Loading @@ -517,9 +517,6 @@ def main(run_id: str, config_lines = config['lines'] config_mdl = load_config_file(os.path.join('mdl', 'config', 'config.yml'), override_config=_override_config['model']) if engine is None: engine = get_pg_engine(logger=logger) input_files_dir = validate_parameter(path_radmc_files, default=os.path.join('mdl', 'radmc_files')) cleanup_directory(directory=input_files_dir, logger=logger) Loading @@ -527,21 +524,7 @@ def main(run_id: str, grid_metadata = extract_grid_metadata(config=config) if 'stars' in config: stars_metadata = config['stars'] if stars_metadata['spacing'] == 'log': wavelengths_micron = np.logspace(np.log10(stars_metadata['lambdas_micron_limits'][0]), np.log10(stars_metadata['lambdas_micron_limits'][1]), stars_metadata['nlambdas']) elif stars_metadata['spacing'] == 'linear': wavelengths_micron = np.linspace(stars_metadata['lambdas_micron_limits'][0], stars_metadata['lambdas_micron_limits'][1], stars_metadata['nlambdas']) else: raise (NotImplemented('Spacing not defined. Choose between {linear, log}')) write_stellar_input_file(stars_metadata=stars_metadata, grid_metadata=grid_metadata, path=input_files_dir, wavelengths_micron=wavelengths_micron) wavelengths_micron = manage_wavelength_grid_with_stars(config, grid_metadata, input_files_dir) else: wavelengths_micron = np.logspace(np.log10(5), np.log10(1300), Loading @@ -565,16 +548,20 @@ def main(run_id: str, grid_metadata=grid_metadata, line_config=config['lines'], path=input_files_dir) if engine is None: engine = get_pg_engine(logger=logger) zip_filename = populate_grid_table(config=config, engine=engine, grid_metadata=grid_metadata, run_id=run_id) engine.dispose() write_radmc_main_input_file(config_mdl=config_mdl, config_lines=config_lines, path=input_files_dir) execution_dir = os.getcwd() os.chdir(input_files_dir) # Recompute dust temperature distribution if needed, based on star positions and properties if 'stars' in config: if compute_dust_temperature is True: logger.info('Computing dust temperature distribution using the stars in the configuration file') Loading @@ -593,6 +580,8 @@ def main(run_id: str, os.path.join('.', 'dust_temperature.dat')) os.chdir(execution_dir) if engine is None: engine = get_pg_engine(logger=logger) for quantity_name in ('gas_number_density', 'dust_temperature'): save_and_persist_grid(engine=engine, profiles=profiles, Loading @@ -607,6 +596,27 @@ def main(run_id: str, return zip_filename def manage_wavelength_grid_with_stars(config: dict, grid_metadata: dict, input_files_dir: str) -> np.array: stars_metadata = config['stars'] if stars_metadata['spacing'] == 'log': wavelengths_micron = np.logspace(np.log10(stars_metadata['lambdas_micron_limits'][0]), np.log10(stars_metadata['lambdas_micron_limits'][1]), stars_metadata['nlambdas']) elif stars_metadata['spacing'] == 'linear': wavelengths_micron = np.linspace(stars_metadata['lambdas_micron_limits'][0], stars_metadata['lambdas_micron_limits'][1], stars_metadata['nlambdas']) else: raise (NotImplemented('Spacing not defined. Choose between {linear, log}')) write_stellar_input_file(stars_metadata=stars_metadata, grid_metadata=grid_metadata, path=input_files_dir, wavelengths_micron=wavelengths_micron) return wavelengths_micron def save_and_persist_grid(engine: sqla_engine, profiles: dict, run_id: str, Loading Loading
etl/main.py +13 −38 Original line number Diff line number Diff line Loading @@ -20,33 +20,7 @@ from prs.prs_compute_integrated_fluxes_and_ratios import main as prs_main from prs.prs_inspect_results import main as prs_inspection_main def compute_grid(tdust, nh2, line, density_keyword, dust_temperature_keyword): scratch_dir = os.path.join('mdl', 'scratches', str(uuid.uuid4())) stg_overrides = { 'grid': { dust_temperature_keyword: tdust, density_keyword: nh2, } } overrides = { 'grid_lines': stg_overrides, 'model': { 'radmc_observation': { 'iline': line } } } tarname = stg_main(override_config=overrides, path_radmc_files=scratch_dir, run_id=run_id) cube_fits_name = execute_radmc_script(grid_zipfile=tarname, override_config=overrides, radmc_input_path=scratch_dir, run_id=run_id) return cube_fits_name def compute_full_grid(tdust, nh2, line, density_keyword, dust_temperature_keyword): def compute_full_grid(tdust, nh2, line, density_keyword, dust_temperature_keyword) -> Tuple[float, float, int, str]: scratch_dir = os.path.join('mdl', 'scratches', str(uuid.uuid4())) stg_overrides = { 'grid': { Loading @@ -72,9 +46,9 @@ def compute_full_grid(tdust, nh2, line, density_keyword, dust_temperature_keywor return tdust, nh2, line, cube_fits_name def initialize_queue(engine, run_id, run_arguments): def initialize_queue(engine: sqlalchemy.engine, run_id: str, run_arguments: Iterator): for arguments in run_arguments: raw_insert_entry = {'run_id': run_id, 'dust_temperature': arguments[0], Loading Loading @@ -144,18 +118,20 @@ def compute_model(run_id: str): engine = get_pg_engine(logger=logger) parameters_set = get_run_pars(engine=engine, run_id=run_id) engine.dispose() if parameters_set is not None: fits_cube_name = compute_grid(tdust=parameters_set[2], _, _, _, fits_cube_name = compute_full_grid(tdust=parameters_set[2], nh2=parameters_set[3], line=parameters_set[4], density_keyword=parameters_set[5], dust_temperature_keyword=parameters_set[6]) engine = get_pg_engine(logger=logger) insert_fits_name(engine=engine, row_id=parameters_set[0], fits_cube_name=fits_cube_name) engine.dispose() else: logger.info('All models were completed.') engine.dispose() def initialize_run(): Loading @@ -172,11 +148,11 @@ def initialize_run(): def compute_remaining_models(run_id: Union[None, str] = None) -> int: _run_id = validate_parameter(run_id, default=os.getenv('run_id')) logger.info(_run_id) engine = get_pg_engine(logger=logger) sql_query = sqlalchemy.text(f"""SELECT count(*) FROM tmp_execution_queue WHERE run_id = '{run_id}' AND done = false""") engine = get_pg_engine(logger=logger) n_models = engine.execution_options(autocommit=True).execute(sql_query).first()[0] engine.dispose() sys.stdout.write(str(n_models)) Loading Loading @@ -256,7 +232,6 @@ parser.add_argument('--cleanup_scratches') parser.add_argument('--distributed') args = parser.parse_args() if __name__ == '__main__': run_id = initialize_run() assert run_id is not None Loading
etl/mdl/mdl_execute_radmc_command.py +4 −4 Original line number Diff line number Diff line Loading @@ -196,9 +196,6 @@ def main(grid_zipfile: str, radmc_command = f'radmc3d image {" ".join(options_set)}' outfile.write(radmc_command) if engine is None: engine = get_pg_engine(logger=logger, engine_kwargs={'pool_size': 3}) config_full = config_mdl.copy() config_full.update(config_stg) cube_filename = f'{compute_unique_hash_filename(config=config_full)}.fits' Loading @@ -219,6 +216,9 @@ def main(grid_zipfile: str, else: logger.info('Computation performed already! Skipping...') if engine is None: engine = get_pg_engine(logger=logger, engine_kwargs={'pool_size': 2}) populate_line_table(config_lines=config_lines, engine=engine, executed_on=executed_on, Loading @@ -237,7 +237,7 @@ def main(grid_zipfile: str, engine=engine, executed_on=executed_on, run_id=run_id) engine.dispose() return cube_filename Loading
etl/sbatch_command.sh 0 → 100644 +13 −0 Original line number Diff line number Diff line #!/bin/bash #SBATCH --job-name=rdp05 #SBATCH --output=pleiadi_05.txt ##SBATCH --time=240:00:00 ##SBATCH --partition arc # #SBATCH --ntasks=1 #SBATCH --cpus-per-task=2 ##SBATCH --mem-per-cpu=800 apptainer-setup 1.0.1-centos7 echo $run_id srun singularity exec -B .:$HOME swiss_army_knife_latest.sif python main.py --distributed True --run_id $1 > run.log
etl/slurm_template.sh +12 −13 Original line number Diff line number Diff line #!/bin/bash #SBATCH --job-name=rdp05 #SBATCH --output=pleiadi_05.txt ##SBATCH --time=240:00:00 ##SBATCH --partition arc # ##SBATCH --ntasks=1 #SBATCH --cpus-per-task=2 ##SBATCH --mem-per-cpu=800 pwd cd /homes/agianne/agianne/sak/swiss_army_knife_lr_ple05/etl pwd apptainer-setup 1.0.1-centos7 run_id_output=$(python -c'import main_parallel; main_parallel.initialize_run()') run_id_output=$(singularity exec -B .:$HOME swiss_army_knife_latest.sif python -c'import main; main.initialize_run()') run_id=$(echo $run_id_output | rev | cut -d" " -f1 | rev) remaining_tasks_output=$(python -c'import main_parallel; main_parallel.compute_remaining_models("'$run_id'")') remaining_tasks=$((echo $remaining_tasks_output | rev | cut -d" " -f1 | rev)) remaining_tasks_output=$(singularity exec -B .:$HOME swiss_army_knife_latest.sif python -c'import main; main.compute_remaining_models("'$run_id'")') remaining_tasks=$(echo $remaining_tasks_output | rev | cut -d" " -f1 | rev) # batch=10 # jobs_to_submit=$(( remaining_tasks < batch ? remaining_tasks : batch )) for i in {1..$remaining_tasks} more sbatch_command.sh for i in $(seq 1 $remaining_tasks) do srun singularity run -B .:$HOME swiss_army_knife_latest.sif > run.log echo $i sbatch sbatch_command.sh $run_id done
etl/stg/stg_radmc_input_generator.py +28 −18 Original line number Diff line number Diff line Loading @@ -517,9 +517,6 @@ def main(run_id: str, config_lines = config['lines'] config_mdl = load_config_file(os.path.join('mdl', 'config', 'config.yml'), override_config=_override_config['model']) if engine is None: engine = get_pg_engine(logger=logger) input_files_dir = validate_parameter(path_radmc_files, default=os.path.join('mdl', 'radmc_files')) cleanup_directory(directory=input_files_dir, logger=logger) Loading @@ -527,21 +524,7 @@ def main(run_id: str, grid_metadata = extract_grid_metadata(config=config) if 'stars' in config: stars_metadata = config['stars'] if stars_metadata['spacing'] == 'log': wavelengths_micron = np.logspace(np.log10(stars_metadata['lambdas_micron_limits'][0]), np.log10(stars_metadata['lambdas_micron_limits'][1]), stars_metadata['nlambdas']) elif stars_metadata['spacing'] == 'linear': wavelengths_micron = np.linspace(stars_metadata['lambdas_micron_limits'][0], stars_metadata['lambdas_micron_limits'][1], stars_metadata['nlambdas']) else: raise (NotImplemented('Spacing not defined. Choose between {linear, log}')) write_stellar_input_file(stars_metadata=stars_metadata, grid_metadata=grid_metadata, path=input_files_dir, wavelengths_micron=wavelengths_micron) wavelengths_micron = manage_wavelength_grid_with_stars(config, grid_metadata, input_files_dir) else: wavelengths_micron = np.logspace(np.log10(5), np.log10(1300), Loading @@ -565,16 +548,20 @@ def main(run_id: str, grid_metadata=grid_metadata, line_config=config['lines'], path=input_files_dir) if engine is None: engine = get_pg_engine(logger=logger) zip_filename = populate_grid_table(config=config, engine=engine, grid_metadata=grid_metadata, run_id=run_id) engine.dispose() write_radmc_main_input_file(config_mdl=config_mdl, config_lines=config_lines, path=input_files_dir) execution_dir = os.getcwd() os.chdir(input_files_dir) # Recompute dust temperature distribution if needed, based on star positions and properties if 'stars' in config: if compute_dust_temperature is True: logger.info('Computing dust temperature distribution using the stars in the configuration file') Loading @@ -593,6 +580,8 @@ def main(run_id: str, os.path.join('.', 'dust_temperature.dat')) os.chdir(execution_dir) if engine is None: engine = get_pg_engine(logger=logger) for quantity_name in ('gas_number_density', 'dust_temperature'): save_and_persist_grid(engine=engine, profiles=profiles, Loading @@ -607,6 +596,27 @@ def main(run_id: str, return zip_filename def manage_wavelength_grid_with_stars(config: dict, grid_metadata: dict, input_files_dir: str) -> np.array: stars_metadata = config['stars'] if stars_metadata['spacing'] == 'log': wavelengths_micron = np.logspace(np.log10(stars_metadata['lambdas_micron_limits'][0]), np.log10(stars_metadata['lambdas_micron_limits'][1]), stars_metadata['nlambdas']) elif stars_metadata['spacing'] == 'linear': wavelengths_micron = np.linspace(stars_metadata['lambdas_micron_limits'][0], stars_metadata['lambdas_micron_limits'][1], stars_metadata['nlambdas']) else: raise (NotImplemented('Spacing not defined. Choose between {linear, log}')) write_stellar_input_file(stars_metadata=stars_metadata, grid_metadata=grid_metadata, path=input_files_dir, wavelengths_micron=wavelengths_micron) return wavelengths_micron def save_and_persist_grid(engine: sqla_engine, profiles: dict, run_id: str, Loading