Commit f8b09919 authored by Andrea Giannetti's avatar Andrea Giannetti
Browse files

Trying to solve the too many connections error by explicitly closing the session for each process.

parent ec345d4b
Loading
Loading
Loading
Loading
+5 −3
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ import urllib.request
import base64
import json
import sqlalchemy as sqla
from contextlib import closing
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from hashlib import sha1
@@ -360,7 +361,7 @@ def upsert(table_object: sqla.orm.decl_api.DeclarativeMeta,
    statement = insert(table_object).values(row_dict)
    statement = statement.on_conflict_do_update(
        index_elements=conflict_keys, set_=row_dict)
    with Session(engine) as session:
    with closing(Session(engine)) as session:
        session.execute(statement)
        session.commit()

@@ -370,16 +371,17 @@ def convert_frequency_to_wavelength(frequency: astropy.units.Quantity,
    return frequency.to(output_units, equivalencies=u.spectral())


def get_pg_engine(logger: logging.Logger) -> sqla.engine.Engine:
def get_pg_engine(logger: logging.Logger, engine_kwargs: Union[dict, None] = None) -> sqla.engine.Engine:
    """
    Return the SQLAlchemy engine, given the credentials in the external file
    :param logger: the logger to use
    :return: the SQLAlchemy engine
    """
    _kwargs = validate_parameter(engine_kwargs, default={})
    credentials = get_credentials(logger=logger,
                                  credentials_filename=os.path.join('credentials', 'db_credentials.yml'))
    url = f'postgresql://{credentials["DB_USER"]}:{credentials["DB_PASS"]}@{credentials["DB_HOST"]}:{credentials["DB_PORT"]}/{credentials["DB_NAME"]}'
    engine = sqla.create_engine(url)
    engine = sqla.create_engine(url, **_kwargs)
    return engine


+7 −3
Original line number Diff line number Diff line
@@ -7,7 +7,8 @@ from multiprocessing import Pool
from assets.commons import (load_config_file,
                            parse_grid_overrides,
                            cleanup_directory,
                            setup_logger)
                            setup_logger,
                            get_pg_engine)
from stg.stg_radmc_input_generator import main as stg_main
from mdl.mdl_execute_radmc_command import main as execute_radmc_script
from prs.prs_compute_integrated_fluxes_and_ratios import main as prs_main
@@ -69,11 +70,13 @@ def build_model_grid(run_id: str,
    for (nh2, tdust, line, cube_fits_name) in results:
        results_map[f'{str(nh2)}_{str(tdust)}_{line}'] = cube_fits_name

    engine = get_pg_engine(logger=logger)
    for line_pair in line_pairs:
        for tdust, nh2 in product(dust_temperatures, densities):
            prs_main(cube_fits_list=[results_map[f'{str(nh2)}_{str(tdust)}_{line_pair[0]}'],
                                     results_map[f'{str(nh2)}_{str(tdust)}_{line_pair[1]}']],
                     run_id=run_id)
                     run_id=run_id,
                     engine=engine)

    if cleanup_scratches is True:
        scratches_dirs = glob.glob(os.path.join('mdl', 'scratches', '*'))
@@ -81,7 +84,8 @@ def build_model_grid(run_id: str,
            cleanup_directory(directory=scratches, logger=logger)
            os.rmdir(scratches)
    prs_inspection_main(run_id=run_id,
                        is_isothermal=_tdust_model_type == 'isothermal')
                        is_isothermal=_tdust_model_type == 'isothermal',
                        engine=engine)


if __name__ == '__main__':
+4 −2
Original line number Diff line number Diff line
@@ -176,7 +176,8 @@ def populate_line_table(config_lines: dict,
def main(grid_zipfile: str,
         run_id: str,
         override_config: Union[dict, None] = None,
         radmc_input_path: Union[str, None] = None) -> str:
         radmc_input_path: Union[str, None] = None,
         engine: sqlalchemy.engine = None) -> str:
    # This is necessary, because the lines_mode is needed both in the lines.inp and radmc3d.inp files
    # The reason for splitting the main input file from the rest is that some parameters can be changed
    # independently of the grid for the modeling. The mdl hash should depend on all the mdl parameters, not a subset
@@ -195,7 +196,8 @@ def main(grid_zipfile: str,
        radmc_command = f'radmc3d image {" ".join(options_set)}'
        outfile.write(radmc_command)

    engine = get_pg_engine(logger=logger)
    if engine is None:
        engine = get_pg_engine(logger=logger, engine_kwargs={'pool_size': 3})

    # Execute radmc
    logger.debug(f'Executing command: {radmc_command}')
+4 −2
Original line number Diff line number Diff line
@@ -194,12 +194,14 @@ def compute_image_ratios(fits1: str,
def main(cube_fits_list: List[str],
         run_id: str,
         mom0_out_cube1: Union[str, None] = None,
         mom0_out_cube2: Union[str, None] = None) -> str:
         mom0_out_cube2: Union[str, None] = None,
         engine: sqlalchemy.engine = None) -> str:
    assert len(cube_fits_list) == 2
    _mom0_out_cube1 = validate_parameter(mom0_out_cube1, default=cube_fits_list[0].replace('.fits', '_mom0.fits'))
    _mom0_out_cube2 = validate_parameter(mom0_out_cube2, default=cube_fits_list[1].replace('.fits', '_mom0.fits'))
    config_prs = load_config_file(os.path.join('prs', 'config', 'config.yml'))['flux_computation']
    executed_on = datetime.now()
    if engine is None:
        engine = get_pg_engine(logger=logger)
    config_prs.update({
        'cube_fits_list': cube_fits_list,
+9 −5
Original line number Diff line number Diff line
@@ -3,10 +3,12 @@ import numpy as np
import os
import xarray as xr
from typing import Union, Tuple
from contextlib import closing
from itertools import product
from astropy.io import fits
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from sqlalchemy import engine as sqla_engine
from stg.stg_build_db_structure import (GridPars,
                                        GridFiles,
                                        StarsPars,
@@ -92,11 +94,13 @@ def get_density_distribution(


def main(run_id: str,
         is_isothermal: bool):
         is_isothermal: bool,
         engine: sqla_engine = None):
    if engine is None:
        engine = get_pg_engine(logger=logger)
    config = load_config_file(os.path.join('config', 'config.yml'))

    with Session(engine) as session:
    with closing(Session(engine)) as session:
        # grid definition
        dust_temperatures = parse_grid_overrides(par_name='dust_temperature',
                                                 config=config)
@@ -154,5 +158,5 @@ def main(run_id: str,


if __name__ == '__main__':
    # main(run_id='55d05c03-192a-47da-9dcf-c41df1882868', is_isothermal=False)
    main(run_id='ba7fd3ed-5947-4dc6-bcef-38f151f19b77', is_isothermal=False)
    main(run_id='55d05c03-192a-47da-9dcf-c41df1882868', is_isothermal=False)
    # main(run_id='ba7fd3ed-5947-4dc6-bcef-38f151f19b77', is_isothermal=False)
Loading