Commit e50acd61 authored by Andrea Giannetti's avatar Andrea Giannetti
Browse files

Added mom0 and ratio computation.

parent 7fa305ee
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
flux_computation:
    central_frequency: 230.000
    central_frequency_units: "GHz"
    integration_limits: "all"
 No newline at end of file
+79 −0
Original line number Diff line number Diff line
import os
from astropy.io import fits
from typing import Union


def compute_moment_zero(cube: Union[str, fits.PrimaryHDU],
                        cube_path: Union[str, None] = None,
                        moment_zero_path: Union[str, None] = None,
                        moment_zero_fits_name: Union[str, None] = None,
                        hdu_idx: int = 0):
    _cube_path = cube_path if cube_path is not None else os.path.join('prs', 'fits', 'cubes')
    _moment_zero_path = moment_zero_path if moment_zero_path is not None else os.path.join('prs', 'fits', 'moments')
    _moment_zero_fits_name = moment_zero_fits_name if moment_zero_fits_name is not None else 'test_mom0.fits'
    fitsfile = open_fits_file_duck_typing(fitsfile=cube, fits_path=_cube_path)
    header = fitsfile[hdu_idx].header.copy()
    data = fitsfile[hdu_idx].data
    mom0 = data.sum(axis=0) * abs(header['CDELT3'])
    keywords_to_delete = ['NAXIS3', 'CRPIX3', 'CDELT3', 'CRVAL3', 'CUNIT3', 'CTYPE3']
    header['BUNIT'] += f' {header["CUNIT3"].replace(" ", "")}'
    header['BTYPE'] = 'MOMENT0'
    header['NAXIS'] = 2
    for key in keywords_to_delete:
        del header[key]
    fits.writeto(os.path.join(_moment_zero_path, _moment_zero_fits_name), data=mom0, header=header, overwrite=True)


def open_fits_file_duck_typing(fitsfile: Union[str, fits.PrimaryHDU],
                               fits_path: str = None) -> fits.PrimaryHDU:
    _fits_path = fits_path if fits_path is not None else '.'
    try:
        hdu = fits.open(os.path.join(_fits_path, fitsfile))
    except TypeError:
        hdu = fitsfile
    return hdu


def compute_image_ratios(fits1: str,
                         fits2: str,
                         fits_path: Union[str, None] = None,
                         ratio_fits_path: Union[str, None] = None,
                         ratio_fits_name: Union[str, None] = None,
                         hdu1_idx: int = 0,
                         hdu2_idx: int = 0):
    _fits_path = fits_path if fits_path is not None else os.path.join('prs', 'fits', 'moments')
    _ratio_fits_path = ratio_fits_path if ratio_fits_path is not None else os.path.join('prs', 'fits', 'ratios')
    _ratio_fits_name = ratio_fits_name if ratio_fits_name is not None else 'ratio.fits'
    hdu1 = open_fits_file_duck_typing(fitsfile=fits1,
                                      fits_path=_fits_path)
    hdu2 = open_fits_file_duck_typing(fitsfile=fits2,
                                      fits_path=_fits_path)
    ratio_image_data = hdu1[hdu1_idx].data / hdu2[hdu2_idx].data
    output_header = hdu1[hdu1_idx].header
    output_header['BUNIT'] = ''
    output_header['BTYPE'] = 'INT.RATIO'
    fits.writeto(os.path.join(_ratio_fits_path, _ratio_fits_name),
                 data=ratio_image_data,
                 header=output_header,
                 overwrite=True)


def main(cube_fits1: str,
         cube_fits2: str,
         mom0_out_cube1:str,
         mom0_out_cube2:str,
         ratio_fits_name: Union[str, None] = None):
    compute_moment_zero(cube=cube_fits1,
                               moment_zero_fits_name=mom0_out_cube1)
    compute_moment_zero(cube=cube_fits2,
                               moment_zero_fits_name=mom0_out_cube2)
    compute_image_ratios(fits1=mom0_out_cube1,
                         fits2=mom0_out_cube2,
                         ratio_fits_name=ratio_fits_name)


if __name__ == '__main__':
    main(cube_fits1='test_cube.fits',
         cube_fits2='test_cube.fits',
         mom0_out_cube1='test_cube_mom0.fits',
         mom0_out_cube2='test_cube_mom0.fits')