Loading noctua/templates/snapshot.py 0 → 100644 +263 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- # System modules from time import sleep # Third-party modules from astropy.time import Time # Other templates from ..config.constants import (camera_state, filter_name, filter_number, filter_state, frame_number, frame_type, image_state, temp_fits, viewer_fits_path) from .. import devices as _dev from ..devices import dom, tel from ..utils.coordinates import apply_offset from ..utils.logger import log from .basetemplate import BaseTemplate from .fillheader import Template as FillHeader from .lampsoff import Template as LampsOff class Template(BaseTemplate): def __init__(self): super().__init__() self.name = "snapshot" self.description = "Observes a field with the imaging or spectroscopy camera" # self.box = 300 # self.recenter = False # #self.domeslewing = None # if not hasattr(self, "domeslewing"): # log.warning("domeslewing attribute undefined, setting to False") # self.domeslewing = False def content(self, params): ######################## ##### Params check ##### ######################## try: camera_name = params.get("camera") or "cam" cam = getattr(_dev, camera_name) fits_file = viewer_fits_path(getattr(cam, '_viewer_key', None)) objname = params.get("object") or "test" repeat = params.get("repeat") or 1 imagetype = params.get("imagetype") or "Light" binning = params.get("binning") or cam.binning filt = params.get("filter") or ( filter_name.get(cam.filter) if hasattr(cam, 'filter') else None) xystart = params.get("xystart") or cam.xystart xyend = params.get("xyend") or cam.xyend exptime = params["exptime"] self.domeslewing = params.get("domeslewing") or False self.recenter = params.get("recenter") or False except KeyError as e: msg = f"Parameter {e} not found" log.error(msg) self.error.append(msg) return ######################################### ##### Switch off lamps if not Flat ##### ######################################### if imagetype != "Flat": log.info("Not a 'Flat' imagetype: switching off lamps") lao = LampsOff() lao.run({}) if imagetype != "Light": log.info("Not a 'Light' imagetype: switching off recentering") self.recenter = False ######################## ##### Camera setup ##### ######################## # Bringing back the camera to Idle state. if cam.state != 0: log.warning( f"Resetting camera from state {camera_state[cam.state]}.") cam.abort() sleep(0.1) log.info(f"Camera is {camera_state[cam.state]}") # Changing binning if not the same as the current one if binning != cam.binning: log.info(f"Changing binning to {binning}") cam.binning = [binning, binning] sleep(0.1) log.info(f"Camera is {camera_state[cam.state]}") #################################### ##### Filter setup, if imaging ##### #################################### # Changing filter if not the same as the current one (only if camera has a filter wheel) if filt and hasattr(cam, 'filter'): # On bad ctrl-c, filter can be 15 (?!) log.warning(f"Filter is number {cam.filter}") log.warning( f"Filter is {filter_name[cam.filter]}, we have selected {filt}") if filter_number[filt] != cam.filter: msg = f"Changing filter to {filt}, " msg += f"Which is number {filter_number[filt]}" log.info(msg) cam.filter = filter_number[filt] sleep(0.2) status = 1 while status != 0: self.check_pause_or_abort() try: status = int(cam.is_moving) except ValueError as e: msg = f"Error trying to contact the camera: {e}" log.error(msg) self.error.append(msg) if status == 2: msg = f"Filter {filter_state[status]}" log.error(msg) self.error.append(msg) return else: log.warning(f"Filter {filter_state[status]}") sleep(0.75) # Changing windowing if xystart and xyend: log.info( f"Changing windowing to xystart:{xystart}, xyend:{xyend}") cam.set_window(xystart[0], xystart[1], xyend[0]-xystart[0], xyend[1]-xystart[1]) else: msg = f"No windowing: full frame in bininng {binning}" log.warning(msg) cam.full_frame() ######################### ##### Exposure loop ##### ######################### log.info(f"Starting {repeat} {imagetype} frames of {exptime}s") # Looping on the number of exposures for exposure in range(repeat): if self.check_pause_or_abort(): return # RA, DEC, ALT, AZ, UTC are asked to cabinet # simultaneously in order to avoid latency. log.info(f"Asking cabinet time and coordinates") basic_cabinet = tel.coordinates gps = Time(tel.coordinates["utc"], format="unix") log.info(f"Starting Exposure {exposure + 1} of {repeat}") ########################## ##### Start exposure ##### ########################## log.debug(frame_number[imagetype]) try: cam.start(exptime, frame_number[imagetype], datetime=gps.isot) except Exception as e: msg = f"An error occurred : {e}" log.error(msg) self.error.append(msg) return # Some debug time counter now = Time.now() # Checking if the exposure is still running status = 1 while status != 0: if self.check_pause_or_abort(): log.debug("check_pause_or_abort returned true. I'm here") return try: status = int(cam.state) except TypeError as e: msg = f"Error trying to read camera status: {e}" log.error(msg) self.error.append(msg) if status == 5: # i.e. error in camera msg = f"Camera {camera_state[status]}" log.error(msg) self.error.append(msg) return else: elapsed = (Time.now() - now).sec.item() msg = f"Camera { camera_state[status]}, { elapsed:.1f}/{exptime}s " msg += f"of exposure {exposure}/{repeat - 1}" log.warning(msg) sleep(0.75) log.debug("check_pause_or_abort before the download") if self.check_pause_or_abort(): log.debug("check_pause_or_abort returned true. I'm here") return ################################ ##### Download from camera ##### ################################ log.info(f"Downloading frame as {temp_fits}") # Checking if the image is ready to be downloaded while cam.ready != 1: log.debug(f"Downloading... (cam state {image_state[cam.ready]})") sleep(0.5) if self.check_pause_or_abort(): log.warning(f"Buffer still {image_state[cam.ready]}") return log.debug(f"Downloading... (cam state {image_state[cam.ready]})") cam.download() log.info(f"Elapsed {(Time.now() - now).sec.item():.2f}") ################################## ##### Update header and save ##### ################################## fih_params = { "object": objname, "camera": camera_name, "imagetyp": imagetype, "fits_file": fits_file, } fih_params.update(basic_cabinet) # radec, altaz, utc, lst fih = FillHeader() fih.run(fih_params) log.debug(f"observation {fih.filename}") self.filename = fih.filename self.filenames.append(fih.filename) log.debug(f"filenames from obs tpl {self.filenames}") return Loading
noctua/templates/snapshot.py 0 → 100644 +263 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- # System modules from time import sleep # Third-party modules from astropy.time import Time # Other templates from ..config.constants import (camera_state, filter_name, filter_number, filter_state, frame_number, frame_type, image_state, temp_fits, viewer_fits_path) from .. import devices as _dev from ..devices import dom, tel from ..utils.coordinates import apply_offset from ..utils.logger import log from .basetemplate import BaseTemplate from .fillheader import Template as FillHeader from .lampsoff import Template as LampsOff class Template(BaseTemplate): def __init__(self): super().__init__() self.name = "snapshot" self.description = "Observes a field with the imaging or spectroscopy camera" # self.box = 300 # self.recenter = False # #self.domeslewing = None # if not hasattr(self, "domeslewing"): # log.warning("domeslewing attribute undefined, setting to False") # self.domeslewing = False def content(self, params): ######################## ##### Params check ##### ######################## try: camera_name = params.get("camera") or "cam" cam = getattr(_dev, camera_name) fits_file = viewer_fits_path(getattr(cam, '_viewer_key', None)) objname = params.get("object") or "test" repeat = params.get("repeat") or 1 imagetype = params.get("imagetype") or "Light" binning = params.get("binning") or cam.binning filt = params.get("filter") or ( filter_name.get(cam.filter) if hasattr(cam, 'filter') else None) xystart = params.get("xystart") or cam.xystart xyend = params.get("xyend") or cam.xyend exptime = params["exptime"] self.domeslewing = params.get("domeslewing") or False self.recenter = params.get("recenter") or False except KeyError as e: msg = f"Parameter {e} not found" log.error(msg) self.error.append(msg) return ######################################### ##### Switch off lamps if not Flat ##### ######################################### if imagetype != "Flat": log.info("Not a 'Flat' imagetype: switching off lamps") lao = LampsOff() lao.run({}) if imagetype != "Light": log.info("Not a 'Light' imagetype: switching off recentering") self.recenter = False ######################## ##### Camera setup ##### ######################## # Bringing back the camera to Idle state. if cam.state != 0: log.warning( f"Resetting camera from state {camera_state[cam.state]}.") cam.abort() sleep(0.1) log.info(f"Camera is {camera_state[cam.state]}") # Changing binning if not the same as the current one if binning != cam.binning: log.info(f"Changing binning to {binning}") cam.binning = [binning, binning] sleep(0.1) log.info(f"Camera is {camera_state[cam.state]}") #################################### ##### Filter setup, if imaging ##### #################################### # Changing filter if not the same as the current one (only if camera has a filter wheel) if filt and hasattr(cam, 'filter'): # On bad ctrl-c, filter can be 15 (?!) log.warning(f"Filter is number {cam.filter}") log.warning( f"Filter is {filter_name[cam.filter]}, we have selected {filt}") if filter_number[filt] != cam.filter: msg = f"Changing filter to {filt}, " msg += f"Which is number {filter_number[filt]}" log.info(msg) cam.filter = filter_number[filt] sleep(0.2) status = 1 while status != 0: self.check_pause_or_abort() try: status = int(cam.is_moving) except ValueError as e: msg = f"Error trying to contact the camera: {e}" log.error(msg) self.error.append(msg) if status == 2: msg = f"Filter {filter_state[status]}" log.error(msg) self.error.append(msg) return else: log.warning(f"Filter {filter_state[status]}") sleep(0.75) # Changing windowing if xystart and xyend: log.info( f"Changing windowing to xystart:{xystart}, xyend:{xyend}") cam.set_window(xystart[0], xystart[1], xyend[0]-xystart[0], xyend[1]-xystart[1]) else: msg = f"No windowing: full frame in bininng {binning}" log.warning(msg) cam.full_frame() ######################### ##### Exposure loop ##### ######################### log.info(f"Starting {repeat} {imagetype} frames of {exptime}s") # Looping on the number of exposures for exposure in range(repeat): if self.check_pause_or_abort(): return # RA, DEC, ALT, AZ, UTC are asked to cabinet # simultaneously in order to avoid latency. log.info(f"Asking cabinet time and coordinates") basic_cabinet = tel.coordinates gps = Time(tel.coordinates["utc"], format="unix") log.info(f"Starting Exposure {exposure + 1} of {repeat}") ########################## ##### Start exposure ##### ########################## log.debug(frame_number[imagetype]) try: cam.start(exptime, frame_number[imagetype], datetime=gps.isot) except Exception as e: msg = f"An error occurred : {e}" log.error(msg) self.error.append(msg) return # Some debug time counter now = Time.now() # Checking if the exposure is still running status = 1 while status != 0: if self.check_pause_or_abort(): log.debug("check_pause_or_abort returned true. I'm here") return try: status = int(cam.state) except TypeError as e: msg = f"Error trying to read camera status: {e}" log.error(msg) self.error.append(msg) if status == 5: # i.e. error in camera msg = f"Camera {camera_state[status]}" log.error(msg) self.error.append(msg) return else: elapsed = (Time.now() - now).sec.item() msg = f"Camera { camera_state[status]}, { elapsed:.1f}/{exptime}s " msg += f"of exposure {exposure}/{repeat - 1}" log.warning(msg) sleep(0.75) log.debug("check_pause_or_abort before the download") if self.check_pause_or_abort(): log.debug("check_pause_or_abort returned true. I'm here") return ################################ ##### Download from camera ##### ################################ log.info(f"Downloading frame as {temp_fits}") # Checking if the image is ready to be downloaded while cam.ready != 1: log.debug(f"Downloading... (cam state {image_state[cam.ready]})") sleep(0.5) if self.check_pause_or_abort(): log.warning(f"Buffer still {image_state[cam.ready]}") return log.debug(f"Downloading... (cam state {image_state[cam.ready]})") cam.download() log.info(f"Elapsed {(Time.now() - now).sec.item():.2f}") ################################## ##### Update header and save ##### ################################## fih_params = { "object": objname, "camera": camera_name, "imagetyp": imagetype, "fits_file": fits_file, } fih_params.update(basic_cabinet) # radec, altaz, utc, lst fih = FillHeader() fih.run(fih_params) log.debug(f"observation {fih.filename}") self.filename = fih.filename self.filenames.append(fih.filename) log.debug(f"filenames from obs tpl {self.filenames}") return