Skip to content
utils.py 10.7 KiB
Newer Older
Alice Donini's avatar
Alice Donini committed
import argparse
import os
import pandas as pd
import numpy as np
import yaml
from operator import attrgetter
from astropy.coordinates import SkyCoord
from glob import glob
from argparse import HelpFormatter


class SortingHelpFormatter(HelpFormatter):
    def add_arguments(self, actions):
        actions = sorted(actions, key=attrgetter('option_strings'))
        super(SortingHelpFormatter, self).add_arguments(actions)


def parse_arguments(description, add_run=False, add_irf=False, add_job=False, add_dl3=False):
    parser = argparse.ArgumentParser(description=description, formatter_class=SortingHelpFormatter)

    parser.add_argument('--prod', '-p',
                        dest='prod', required=False, type=str, default='v0.9.4',
                        help='Prod to use (default: %(default)s)')
    parser.add_argument('--outdir', '-o',
                        dest='outdir', required=False, type=str, default=None,
                        help='Directory to store the output')
    parser.add_argument('--config', '-c',
                        type=str, default=None, dest='config',
                        help='Specify a personal config file for the analysis')
    parser.add_argument('--config-analysis',
                        type=str, default=None, dest='config_analysis',
                        help='Specify a config file which describes analysis profile to use')
    parser.add_argument('--verbose', '-v',
                        action='store_true', dest='verbose',
                        help='Increase output verbosity')

    if add_run:
        parser.add_argument('--source_name', '-n', required=True,
                            default=None, dest='source_name', type=str,
                            help='Name of the source')
        parser.add_argument('--tcuname',
                            default=None, dest='tcuname', type=str,
                            help='Apply run selection based on TCU source name')
        parser.add_argument('--runlist', '-rl',
                            default=None, dest='runlist', type=str,
                            help='File with a list of run and the associated night to be analysed')
        parser.add_argument('--distance', '-dis',
                            type=float, dest='distance', default=-1,
                            help='Max distance in degrees between the target position and the run pointing \
                                  position for the run selection, negative value means no selection using \
                                  this parameter (default: %(default)s).')
        parser.add_argument('--ra',
                            type=float, dest='ra', default=-1,
                            help='RA coordinate of the target. To add if you want to use custom position')
        parser.add_argument('--dec',
                            type=float, dest='dec', default=-91,
                            help='Dec coordinate of the target. To add if you want to use custom position')

    if add_job:
        parser.add_argument('--submit',
                            default=False, dest='submit', action='store_true', required=False,
                            help='Submit the cmd to slurm on site')
        parser.add_argument('--dry',
                            default=False, required=False, action='store_true', dest='dry',
                            help='Make a dry run, no true submission')
        parser.add_argument('--globber', '-g',
                            dest='globber', action='store_true', required=False, default=False,
                            help='If True, overwrites existing output file without asking')

    if add_irf:
        parser.add_argument('--gh_cut',
                            required=False, type=float, dest='gh_cut',
                            help='Fixed selection cut for gh_score (gammaness)')
        parser.add_argument('--theta_cut',
                            required=False, type=float, dest='theta_cut',
                            help='Fixed selection cut for theta')
        parser.add_argument('--obs_time',
                            required=False, type=float, dest='obs_time',
                            help='Observation time for IRF in hours')

    if add_dl3:
        parser.add_argument('--cut_file', '-cut',
                            default=None, dest='cut_file', type=str,
                            help='Cut file')
        parser.add_argument('--gh_cut',
                            required=False, type=float, dest='gh_cut',
                            help='Fixed selection cut for gh_score (gammaness)')
        parser.add_argument('--theta_cut',
                            required=False, type=float, dest='theta_cut',
                            help='Fixed selection cut for theta')
    args = parser.parse_args()
    return args


def get_db(database_filename):
    """
    Parameters
    ----------
    database_filename : name of database file
    
    ---------
    Returns the DB
    """

    db_file = os.environ.get('CONFIG_FOLDER') + '/' + database_filename
    database = pd.read_csv(db_file, index_col=0, parse_dates=True)
    return database


def get_config(config_filename):
    """
    Parameters
    ----------
    config_filename : name of the configuration file
    
    ---------
    Returns the configuration file
    """

    config_file = os.environ.get('CONFIG_FOLDER') + '/' + config_filename
    with open(config_file) as f:
        config_analysis = yaml.load(f, Loader=yaml.FullLoader)
    return config_analysis


def get_runs_database(args, database):

    # make the run list into a np array
    databaseRuns = np.array(database.index)

    # Apply selection of data if argument is specified. Based on tcuname, run or coordinates
    selection = database

    #if args.tcuname[0] == 'all' and args.night[0] == 'all' and args.runlist[0] == 'none':
    #    raise RuntimeError("Cannont make a run selection. Either tcuname, night or runlist or distance is needed")

    if args.tcuname is not None:
        selection = database.loc[database['Target']].isin([args.tcuname])
        if args.verbose:
            print("Selection of runs based on the TCU name", args.tcuname, ". Only run with the name in the TCU are kept")
            print(selection.index)

    # if args.night[0] != 'all':
    #     selection = database.loc[database['day'].isin(args.night)]
    #     if args.verbose:
    #         print("selection of night", args.night[0])
    #         print(selection.index)
    
    if args.distance > 0:
        if args.ra == -1 or args.dec == -91:
            raise RuntimeError("Cannont make a run selection. Ra and Dec value of the source are not given.")
        selection = database.loc[((database['RA_Obs'] - args.ra)**2 + (database['Dec_Obs'] - args.dec)**2) < args.distance**2]
        if args.verbose:
            print("Selection based on angular distance ", args.distance, "°")
            print(selection.index)

    databaseRuns = np.array(selection.index)
    if args.runlist is not None:
        rl = np.loadtxt(args.runlist, unpack=True, dtype=int)
        #databaseRuns = np.array([a for a in rl if a in databaseRuns])
        databaseRuns = np.array([a for a in rl])

    if args.verbose:
        print("Final run selection", databaseRuns)
    return databaseRuns


def create_DL1_list(args, config, runs, night):
    """
    create list with all the DL1 run path to analyze
    
    Parameters
        ----------
        runs: List of run numbers
        night: night to analyze
    """

    version = config['dl1_data']['version']
    cleaning = config['dl1_data']['cleaning']
    folder = config['data_folder'] + '/DL1/' + night + '/' + version + '/' + cleaning + '/dl1*'  # Current format of LST data path
    if args.verbose:
        print("Looking for files here :", folder)

    filepath_glob = glob(folder)

    # initializa data frame
    filelist = pd.DataFrame(columns=['path', 'night'])

    # Create a list of files with matching run numbers
    newEntry = {}
    for filename in filepath_glob:
        for run in runs:
            if (f"Run{run:>05}.h5" in filename):
                newEntry['path'] = filename
                newEntry['night'] = night
                filelist.loc[run] = newEntry['path'], newEntry['night']
    return filelist


def create_DL2_list(args, config, runs, night):
    """
        create list with all the DL2 run path to analyze
    
    Parameters
    ----------
    runs: List of run numbers
    """
    
    version = config['dl2_data']['version']
    cleaning = config['dl2_data']['cleaning']
    folder = config['data_folder'] + '/DL2/' + night + '/' + version + '/' + cleaning + '/dl2*'  # Current format of LST data path
    if args.verbose:
        print("looking for files here :", folder)

    filepath_glob = glob(folder)

    filelist = pd.DataFrame(columns=['path'])

    # Create a list of files with matching run numbers
    newEntry = {}
    for filename in filepath_glob:
        for run in runs:
            if (f"Run{run:>05}.h5" in filename):
                newEntry['path'] = filename
                filelist.loc[run] = newEntry
    return filelist


def manage_submission(args, config, cmd, run, level="3"):
    """
    Parameters
    ----------
    config: personal configuration file with analysis settings
    cmd : command to be run
    run : run number
    """

    print("Submission of the jobs to the slurm farm.")

    os.makedirs(config['jobmanager'], exist_ok=True)
    template = open(os.environ.get('CODE_DIR') + "/SubmitTemplate_dl" + level + ".sh", "r").readlines()

    scriptname = config['jobmanager'] + "/Script_dl" + level + "_" + str(run) + ".sh"
    logfile = config['jobmanager'] + "/Slurm_dl" + level + "_" + str(run) + ".out"
    script = open(scriptname, "w")
    for t in template:
        script.write(t.replace("jobname", "Job_DL" + level + "_" + f"Run{run:>05}").replace("logfile", logfile))

    script.write("\n")
    script.write(cmd)
    script.close()
    os.system("chmod +x " + scriptname)
    return scriptname


def get_coordinates(args):
    """
    returns the name and the Ra/Dec of the source
    """

    if args.source_name is None:
        raise ValueError("Please provide the name of the analysed source by using --source_name")

    if args.verbose:
        print("Search Coordinates for ", args.source_name, " using Astropy and based on the target name.")
    try:
        c = SkyCoord.from_name(args.source_name)
        ra = c.ra.value
        dec = c.dec.value
    except:
        print("Cannot resolve target name", args.source_name, "using Astropy. Switch to the Ra and Dec provided.")
        if args.ra >= 0 and args.ra < 360 and args.dec >= -90 and args.dec < 90:
            ra = args.ra
            dec = args.dec
            print("Using user provided RA and Dec.")
        else:
            print("Please provide RA and Dec values by using --ra and --dec")
            exit(0)
    return ra, dec