Loading astrort/testing/test_utils/test_wrap.py +39 −1 Original line number Diff line number Diff line Loading @@ -7,8 +7,11 @@ # ***************************************************************************** import pytest import logging import numpy as np from astrort.utils.wrap import load_yaml_conf, randomise_pointing_sim, get_point_source_info from astrort.utils.wrap import * from astrort.configure.logging import set_logger from rtasci.lib.RTACtoolsSimulation import RTACtoolsSimulation @pytest.mark.test_conf_file def test_load_yaml_conf(test_conf_file): Loading Loading @@ -42,4 +45,39 @@ def test_get_point_source_info(test_conf_file): assert type(pointing['source_ra']) == type(np.float64(1)) assert type(pointing['source_dec']) == type(np.float64(1)) @pytest.mark.test_conf_file def test_write_simulation_info(test_conf_file): conf = load_yaml_conf(test_conf_file) conf['simulator']['pointing'] = {'ra': 1, 'dec': 1} pointing = get_point_source_info(conf['simulator']) datfile = join(conf['simulator']['output'], 'tmp.dat') sim = RTACtoolsSimulation() clock = 1 write_simulation_info(sim, conf['simulator'], pointing, datfile, clock) assert isfile(datfile) del sim @pytest.mark.test_tmp_folder @pytest.mark.test_conf_file def test_merge_simulation_info(test_conf_file, test_tmp_folder): conf = load_yaml_conf(test_conf_file) conf['simulator']['pointing'] = {'ra': 1, 'dec': 1} pointing = get_point_source_info(conf['simulator']) sim = RTACtoolsSimulation() clock = 1 for i in range(5): sim.seed = i datfile = join(conf['simulator']['output'], f'job_{i}.dat') write_simulation_info(sim, conf['simulator'], pointing, datfile, clock) assert isfile(datfile) log = set_logger(logging.CRITICAL, join(test_tmp_folder, 'test_set_logger.log')) merge_simulation_info(conf['simulator'], log) assert isfile(join(conf['simulator']['output'], 'merged_sim_data.dat')) del sim def test_write_mapping_info(): return No newline at end of file astrort/utils/wrap.py +24 −1 Original line number Diff line number Diff line Loading @@ -8,12 +8,12 @@ import yaml import numpy as np import pandas as pd import astropy.units as u from os.path import dirname, abspath, join, basename, isfile from astrort.utils.utils import * from astrort.configure.check_configuration import CheckConfiguration from rtasci.lib.RTAManageXml import ManageXml from rtasci.lib.RTAUtils import check_energy_thresholds from astropy.coordinates import SkyCoord def load_yaml_conf(yamlfile): Loading Loading @@ -89,4 +89,27 @@ def write_simulation_info(simulator, configuration, pointing, datfile, clock): f.write('name seed start stop duration source_ra source_dec point_ra point_dec offset irf computation_time\n') with open(datfile, 'a') as f: f.write(f'{name} {seed} {tstart} {tstop} {duration} {source_ra} {source_dec} {point_ra} {point_dec} {offset} {irf} {clock}\n') def merge_simulation_info(configuration, log): folder = configuration['output'] datfiles = [join(folder, f) for f in listdir(folder) if '.dat' in f and 'job' in f] merger = join(folder, 'merged_sim_data.dat') # check merger file if isfile(merger): log.warning(f"Merger output already exists, overwrite {merger}") with open(merger, 'w+') as f: f.write('name seed start stop duration source_ra source_dec point_ra point_dec offset irf computation_time\n') # collect data for i, datfile in enumerate(datfiles): log.info(f"Collect data from {datfile}") data = pd.read_csv(join(datfile), sep=' ') if i == 0: table = data else: table = pd.concat([table, data], ignore_index=True) log.info(f"Lines in data: {len(table)}") # write merger file table.to_csv(merger, index=False, header=True, sep=' ', na_rep=np.nan) def write_mapping_info(): return No newline at end of file Loading
astrort/testing/test_utils/test_wrap.py +39 −1 Original line number Diff line number Diff line Loading @@ -7,8 +7,11 @@ # ***************************************************************************** import pytest import logging import numpy as np from astrort.utils.wrap import load_yaml_conf, randomise_pointing_sim, get_point_source_info from astrort.utils.wrap import * from astrort.configure.logging import set_logger from rtasci.lib.RTACtoolsSimulation import RTACtoolsSimulation @pytest.mark.test_conf_file def test_load_yaml_conf(test_conf_file): Loading Loading @@ -42,4 +45,39 @@ def test_get_point_source_info(test_conf_file): assert type(pointing['source_ra']) == type(np.float64(1)) assert type(pointing['source_dec']) == type(np.float64(1)) @pytest.mark.test_conf_file def test_write_simulation_info(test_conf_file): conf = load_yaml_conf(test_conf_file) conf['simulator']['pointing'] = {'ra': 1, 'dec': 1} pointing = get_point_source_info(conf['simulator']) datfile = join(conf['simulator']['output'], 'tmp.dat') sim = RTACtoolsSimulation() clock = 1 write_simulation_info(sim, conf['simulator'], pointing, datfile, clock) assert isfile(datfile) del sim @pytest.mark.test_tmp_folder @pytest.mark.test_conf_file def test_merge_simulation_info(test_conf_file, test_tmp_folder): conf = load_yaml_conf(test_conf_file) conf['simulator']['pointing'] = {'ra': 1, 'dec': 1} pointing = get_point_source_info(conf['simulator']) sim = RTACtoolsSimulation() clock = 1 for i in range(5): sim.seed = i datfile = join(conf['simulator']['output'], f'job_{i}.dat') write_simulation_info(sim, conf['simulator'], pointing, datfile, clock) assert isfile(datfile) log = set_logger(logging.CRITICAL, join(test_tmp_folder, 'test_set_logger.log')) merge_simulation_info(conf['simulator'], log) assert isfile(join(conf['simulator']['output'], 'merged_sim_data.dat')) del sim def test_write_mapping_info(): return No newline at end of file
astrort/utils/wrap.py +24 −1 Original line number Diff line number Diff line Loading @@ -8,12 +8,12 @@ import yaml import numpy as np import pandas as pd import astropy.units as u from os.path import dirname, abspath, join, basename, isfile from astrort.utils.utils import * from astrort.configure.check_configuration import CheckConfiguration from rtasci.lib.RTAManageXml import ManageXml from rtasci.lib.RTAUtils import check_energy_thresholds from astropy.coordinates import SkyCoord def load_yaml_conf(yamlfile): Loading Loading @@ -89,4 +89,27 @@ def write_simulation_info(simulator, configuration, pointing, datfile, clock): f.write('name seed start stop duration source_ra source_dec point_ra point_dec offset irf computation_time\n') with open(datfile, 'a') as f: f.write(f'{name} {seed} {tstart} {tstop} {duration} {source_ra} {source_dec} {point_ra} {point_dec} {offset} {irf} {clock}\n') def merge_simulation_info(configuration, log): folder = configuration['output'] datfiles = [join(folder, f) for f in listdir(folder) if '.dat' in f and 'job' in f] merger = join(folder, 'merged_sim_data.dat') # check merger file if isfile(merger): log.warning(f"Merger output already exists, overwrite {merger}") with open(merger, 'w+') as f: f.write('name seed start stop duration source_ra source_dec point_ra point_dec offset irf computation_time\n') # collect data for i, datfile in enumerate(datfiles): log.info(f"Collect data from {datfile}") data = pd.read_csv(join(datfile), sep=' ') if i == 0: table = data else: table = pd.concat([table, data], ignore_index=True) log.info(f"Lines in data: {len(table)}") # write merger file table.to_csv(merger, index=False, header=True, sep=' ', na_rep=np.nan) def write_mapping_info(): return No newline at end of file