Loading noctua/api/camera.py +8 −5 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- Loading Loading @@ -70,7 +71,8 @@ class Filter(BaseResource): async def get(self): """Retrieve the current filter.""" raw = await self.run_blocking(lambda: self.dev.filter) raw = await self.run_blocking(lambda: getattr(self.dev, 'filter', 0)) #raw = await self.run_blocking(lambda: self.dev.filter) res = constants.filter_name.get(raw, "Undef.") return self.make_response(res, raw_data=raw) Loading @@ -80,7 +82,8 @@ class FilterMovement(BaseResource): async def get(self): """Check if the filter wheel is moving.""" raw = await self.run_blocking(lambda: self.dev.is_moving) raw = await self.run_blocking(lambda: getattr(self.dev, 'is_moving', 0)) # raw = await self.run_blocking(lambda: self.dev.is_moving) res = constants.filter_state.get(raw, "Off") return self.make_response(res, raw_data=raw) Loading Loading @@ -258,8 +261,8 @@ class CoolerWarmup(BaseResource): async def post(self): """Start the warm up the CCD.""" return self.make_response("Warmup sequence initiated") res = await self.run_blocking(lambda: self.dev.put('cooler', False)) return self.make_response(res) async def delete(self): """Stop the warm up of the CCD.""" Loading noctua/api/stage.py 0 → 100644 +88 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- '''REST API for PI Mercury Linear Stage operations''' # Custom modules from .baseresource import BaseResource class Position(BaseResource): """Manage the absolute position of the stage in mm.""" async def get(self): """Retrieve the current stage position.""" res = await self.run_blocking(lambda: self.dev.position) return self.make_response(res) async def put(self): """Move the stage to an absolute position.""" target = await self.get_payload() # expected float def action(): self.dev.position = target return self.dev.position res = await self.run_blocking(action) return self.make_response(res) class Named(BaseResource): """Manage named positions (e.g., 'imaging', 'spectroscopy').""" async def get(self): """Get the name of the current position if matched.""" res = await self.run_blocking(lambda: self.dev.named) return self.make_response(res) async def put(self): """Move the stage to a named position.""" target_name = await self.get_payload() # expected string def action(): self.dev.named = target_name return self.dev.named try: res = await self.run_blocking(action) return self.make_response(res) except KeyError as e: return self.make_response(None, errors=[str(e)], status_code=400) class Initialization(BaseResource): """Handle stage homing and status.""" async def get(self): """Check if the stage is initialized and referenced.""" res = await self.run_blocking(lambda: self.dev.is_init) return self.make_response(res) async def post(self): """Start the initialization (FNL/FPL) sequence.""" await self.run_blocking(self.dev.init) res = await self.run_blocking(lambda: self.dev.is_init) return self.make_response(res) class Movement(BaseResource): """Monitor stage motion status.""" async def get(self): """Check if the stage is currently moving.""" res = await self.run_blocking(lambda: self.dev.is_moving) return self.make_response(res) class Limits(BaseResource): """Manage software movement limits.""" async def get(self): """Retrieve the software limits [min, max].""" res = await self.run_blocking(lambda: self.dev.limits) return self.make_response(res) async def put(self): """Set new software limits.""" target_limits = await self.get_payload() # expected [float, float] def action(): self.dev.limits = target_limits return self.dev.limits res = await self.run_blocking(action) return self.make_response(res) noctua/config/api.ini +25 −0 Original line number Diff line number Diff line Loading @@ -248,6 +248,30 @@ device = cam # resource = Status # device = cam ############## # stage ############## [/stage/position] resource = Position device = stage [/stage/named] resource = Named device = stage [/stage/status] resource = Initialization device = stage [/stage/movement] resource = Movement device = stage [/stage/limits] resource = Limits device = stage ############## # webcam ############## Loading Loading @@ -283,3 +307,4 @@ device = ipcam # [/environment/internal/reception] # resource = Temperature # device = rec noctua/config/devices.ini +16 −5 Original line number Diff line number Diff line Loading @@ -19,6 +19,22 @@ pos_imaging = 120 pos_spectro = 110 pos_echelle = 100 [cam] module = stx class = Camera node = STX [cam2] module = atik class = Camera node = DUMMY [pdu_cam] module = netio class = Switch node = PDU outlet = 3 [pdu_teccam] module = netio class = Switch Loading Loading @@ -90,11 +106,6 @@ outlet = 7 # node = ASCOM_REMOTE # outlet = 3 # [cam] # module = stx # class = Camera # node = STX # [cab] # module = siemens # class = Switch Loading noctua/devices/atik.py 0 → 100644 +260 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Interface with a Atik camera device using the SDK SDK Downloaded from https://downloads.atik-cameras.com/AtikCamerasSDK_2025_11_11_Master_2111.zip sudo cp ../lib/Linux/atik.rules /usr/lib/udev/rules.d/ sudo udevadm control --reload-rules && sudo udevadm trigger sudo cp ../lib/Linux/64/NoFlyCapture/libatikcameras.so /usr/lib/ sudo cp ../include/Atik* /usr/include/ """ # System modules import ctypes import time import numpy as np from datetime import datetime # Third-party modules from astropy.io import fits # Custom modules from .basedevice import BaseDevice from ..utils.logger import log from ..config.constants import temp_fits class ArtemisProperties(ctypes.Structure): _fields_ = [ ("Protocol", ctypes.c_int), ("nPixelsX", ctypes.c_int), ("nPixelsY", ctypes.c_int), ("PixelMicronsX", ctypes.c_float), ("PixelMicronsY", ctypes.c_float), ("ccdflags", ctypes.c_int), ("cameraflags", ctypes.c_int), ("Description", ctypes.c_char * 40), ("Manufacturer", ctypes.c_char * 40) ] class Camera(BaseDevice): def __init__(self, url): super().__init__(url) self._lib = None self._handle = None self._props = ArtemisProperties() try: self._lib = ctypes.CDLL("libatikcameras.so") self._lib.ArtemisConnect.restype = ctypes.c_void_p self._lib.ArtemisImageBuffer.restype = ctypes.c_void_p self._lib.ArtemisExposureTimeRemaining.restype = ctypes.c_float except Exception as e: log.error(f"Atik SDK load failed: {e}") def _check_connection(self): """Internal method to manage the persistent camera handle.""" if self._handle is None and self._lib: count = self._lib.ArtemisDeviceCount() log.debug(f"SDK reported {count} devices.") if count > 0: self._handle = self._lib.ArtemisConnect(0) if self._handle: self._lib.ArtemisProperties(self._handle, ctypes.byref(self._props)) else: msg = "Atik connected but failed to acquire handle" if msg not in self.error: self.error.append(msg) else: # FIX: Aggiungiamo l'errore qui se la camera non c'è msg = "No Atik devices detected on USB" if msg not in self.error: self.error.append(msg) log.error(msg) return self._handle def __del__(self): """Safely disconnect from the camera and unload library.""" if self._handle: try: self._lib.ArtemisDisconnect(self._handle) log.info("Atik camera disconnected.") except: pass # --- Generic Get/Put for compatibility --- def get(self, key): """Generic getter wrapper.""" return getattr(self, key) def put(self, key, value): """Generic setter wrapper.""" setattr(self, key, value) @property def connection(self): if not self._lib: return False h = self._check_connection() return bool(self._lib.ArtemisIsConnected(h)) if h else False # --- Exposure Methods --- def start(self, duration, frametype, dt=None): h = self._check_connection() if not h: return # Using put internally for consistency is_dark = True if frametype in [0, 2, "Dark", "Bias"] else False self._lib.ArtemisSetDarkMode(h, is_dark) self._lib.ArtemisStartExposure(h, ctypes.c_float(duration)) def abort(self): h = self._check_connection() if h: self._lib.ArtemisAbortExposure(h) def download(self, filepath=temp_fits): h = self._check_connection() if not h: return while not self._lib.ArtemisImageReady(h): time.sleep(0.1) x, y, w, h_img, bx, by = [ctypes.c_int() for _ in range(6)] self._lib.ArtemisGetImageData(h, ctypes.byref(x), ctypes.byref(y), ctypes.byref(w), ctypes.byref(h_img), ctypes.byref(bx), ctypes.byref(by)) buf_ptr = self._lib.ArtemisImageBuffer(h) if buf_ptr: size = w.value * h_img.value buffer = (ctypes.c_uint16 * size).from_address(buf_ptr) data = np.frombuffer(buffer, dtype=np.uint16).reshape(h_img.value, w.value) fits.PrimaryHDU(data).writeto(filepath, overwrite=True) # --- Windowing Methods --- def full_frame(self): h = self._check_connection() if h: # Internal use of put is possible but here we call the SDK directly self._lib.ArtemisSubframe(h, 0, 0, self._props.nPixelsX, self._props.nPixelsY) return self.get('max_range') def half_frame(self): h = self._check_connection() if h: m = self.get('max_range') w, h_size = m[0] // 2, m[1] // 2 x, y = w // 2, h_size // 2 self._lib.ArtemisSubframe(h, x, y, w, h_size) return [self._props.nPixelsX // 2, self._props.nPixelsY // 2] def small_frame(self): h = self._check_connection() if h: m = self.get('max_range') w, h_size = m[0] // 10, m[1] // 10 x, y = (m[0] * 9) // 20, (m[1] * 9) // 20 self._lib.ArtemisSubframe(h, x, y, w, h_size) return [self._props.nPixelsX // 10, self._props.nPixelsY // 10] # --- Properties --- @property def state(self): h = self._check_connection() return self._lib.ArtemisCameraState(h) if h else -1 @property def temperature(self): h = self._check_connection() if not h: return None temp = ctypes.c_int() self._lib.ArtemisTemperatureSensorInfo(h, 1, ctypes.byref(temp)) return temp.value / 100.0 @temperature.setter def temperature(self, t): h = self._check_connection() if h: self._lib.ArtemisSetCooling(h, int(t * 100)) @property def cooler(self): h = self._check_connection() if not h: return False flags = ctypes.c_int() self._lib.ArtemisCoolingInfo(h, ctypes.byref(flags), ctypes.byref(ctypes.c_int()), ctypes.byref(ctypes.c_int()), ctypes.byref(ctypes.c_int()), ctypes.byref(ctypes.c_int())) return bool(flags.value & 64) @cooler.setter def cooler(self, b): h = self._check_connection() if not h: return if b: self.put('temperature', -10.0) else: self._lib.ArtemisCoolerWarmUp(h) @property def binning(self): h = self._check_connection() bx, by = ctypes.c_int(), ctypes.c_int() self._lib.ArtemisGetBin(h, ctypes.byref(bx), ctypes.byref(by)) return [bx.value, by.value] @binning.setter def binning(self, b): h = self._check_connection() if h: self._lib.ArtemisBin(h, b[0], b[1]) @property def filter(self): return 0 @property def max_range(self): return [self._props.nPixelsX, self._props.nPixelsY] @property def all(self): """ Get a dictionary of all relevant camera settings and status. Matches the structure of the STX driver. """ h = self._check_connection() if not h: return {} # Get Cooling Information from SDK # ArtemisCoolingInfo returns: flags, level, minlvl, maxlvl, setpoint flags, level, minl, maxl, setp = [ctypes.c_int() for _ in range(5)] self._lib.ArtemisCoolingInfo(h, ctypes.byref(flags), ctypes.byref(level), ctypes.byref(minl), ctypes.byref(maxl), ctypes.byref(setp)) # Get Binning bx, by = ctypes.c_int(), ctypes.c_int() self._lib.ArtemisGetBin(h, ctypes.byref(bx), ctypes.byref(by)) # Calculate fan/cooler power percentage (level is usually 0-255) # We scale it to 0-100 to match Noctua standards. fan_power = round((level.value / 255.0) * 100) if maxl.value > 0 else 0 res = { "ambient": None, # Not available on Atik 11000 via standard cooling call "setpoint": setp.value / 100.0, "temperature": self.temperature, "cooler": bool(flags.value & 64), # Bit 6 is Cooling ON "fan": fan_power, "binning": [bx.value, by.value], "max_range": self.max_range, "state": self.state, "description": self._props.Description.decode() } return res Loading
noctua/api/camera.py +8 −5 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- Loading Loading @@ -70,7 +71,8 @@ class Filter(BaseResource): async def get(self): """Retrieve the current filter.""" raw = await self.run_blocking(lambda: self.dev.filter) raw = await self.run_blocking(lambda: getattr(self.dev, 'filter', 0)) #raw = await self.run_blocking(lambda: self.dev.filter) res = constants.filter_name.get(raw, "Undef.") return self.make_response(res, raw_data=raw) Loading @@ -80,7 +82,8 @@ class FilterMovement(BaseResource): async def get(self): """Check if the filter wheel is moving.""" raw = await self.run_blocking(lambda: self.dev.is_moving) raw = await self.run_blocking(lambda: getattr(self.dev, 'is_moving', 0)) # raw = await self.run_blocking(lambda: self.dev.is_moving) res = constants.filter_state.get(raw, "Off") return self.make_response(res, raw_data=raw) Loading Loading @@ -258,8 +261,8 @@ class CoolerWarmup(BaseResource): async def post(self): """Start the warm up the CCD.""" return self.make_response("Warmup sequence initiated") res = await self.run_blocking(lambda: self.dev.put('cooler', False)) return self.make_response(res) async def delete(self): """Stop the warm up of the CCD.""" Loading
noctua/api/stage.py 0 → 100644 +88 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- '''REST API for PI Mercury Linear Stage operations''' # Custom modules from .baseresource import BaseResource class Position(BaseResource): """Manage the absolute position of the stage in mm.""" async def get(self): """Retrieve the current stage position.""" res = await self.run_blocking(lambda: self.dev.position) return self.make_response(res) async def put(self): """Move the stage to an absolute position.""" target = await self.get_payload() # expected float def action(): self.dev.position = target return self.dev.position res = await self.run_blocking(action) return self.make_response(res) class Named(BaseResource): """Manage named positions (e.g., 'imaging', 'spectroscopy').""" async def get(self): """Get the name of the current position if matched.""" res = await self.run_blocking(lambda: self.dev.named) return self.make_response(res) async def put(self): """Move the stage to a named position.""" target_name = await self.get_payload() # expected string def action(): self.dev.named = target_name return self.dev.named try: res = await self.run_blocking(action) return self.make_response(res) except KeyError as e: return self.make_response(None, errors=[str(e)], status_code=400) class Initialization(BaseResource): """Handle stage homing and status.""" async def get(self): """Check if the stage is initialized and referenced.""" res = await self.run_blocking(lambda: self.dev.is_init) return self.make_response(res) async def post(self): """Start the initialization (FNL/FPL) sequence.""" await self.run_blocking(self.dev.init) res = await self.run_blocking(lambda: self.dev.is_init) return self.make_response(res) class Movement(BaseResource): """Monitor stage motion status.""" async def get(self): """Check if the stage is currently moving.""" res = await self.run_blocking(lambda: self.dev.is_moving) return self.make_response(res) class Limits(BaseResource): """Manage software movement limits.""" async def get(self): """Retrieve the software limits [min, max].""" res = await self.run_blocking(lambda: self.dev.limits) return self.make_response(res) async def put(self): """Set new software limits.""" target_limits = await self.get_payload() # expected [float, float] def action(): self.dev.limits = target_limits return self.dev.limits res = await self.run_blocking(action) return self.make_response(res)
noctua/config/api.ini +25 −0 Original line number Diff line number Diff line Loading @@ -248,6 +248,30 @@ device = cam # resource = Status # device = cam ############## # stage ############## [/stage/position] resource = Position device = stage [/stage/named] resource = Named device = stage [/stage/status] resource = Initialization device = stage [/stage/movement] resource = Movement device = stage [/stage/limits] resource = Limits device = stage ############## # webcam ############## Loading Loading @@ -283,3 +307,4 @@ device = ipcam # [/environment/internal/reception] # resource = Temperature # device = rec
noctua/config/devices.ini +16 −5 Original line number Diff line number Diff line Loading @@ -19,6 +19,22 @@ pos_imaging = 120 pos_spectro = 110 pos_echelle = 100 [cam] module = stx class = Camera node = STX [cam2] module = atik class = Camera node = DUMMY [pdu_cam] module = netio class = Switch node = PDU outlet = 3 [pdu_teccam] module = netio class = Switch Loading Loading @@ -90,11 +106,6 @@ outlet = 7 # node = ASCOM_REMOTE # outlet = 3 # [cam] # module = stx # class = Camera # node = STX # [cab] # module = siemens # class = Switch Loading
noctua/devices/atik.py 0 → 100644 +260 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Interface with a Atik camera device using the SDK SDK Downloaded from https://downloads.atik-cameras.com/AtikCamerasSDK_2025_11_11_Master_2111.zip sudo cp ../lib/Linux/atik.rules /usr/lib/udev/rules.d/ sudo udevadm control --reload-rules && sudo udevadm trigger sudo cp ../lib/Linux/64/NoFlyCapture/libatikcameras.so /usr/lib/ sudo cp ../include/Atik* /usr/include/ """ # System modules import ctypes import time import numpy as np from datetime import datetime # Third-party modules from astropy.io import fits # Custom modules from .basedevice import BaseDevice from ..utils.logger import log from ..config.constants import temp_fits class ArtemisProperties(ctypes.Structure): _fields_ = [ ("Protocol", ctypes.c_int), ("nPixelsX", ctypes.c_int), ("nPixelsY", ctypes.c_int), ("PixelMicronsX", ctypes.c_float), ("PixelMicronsY", ctypes.c_float), ("ccdflags", ctypes.c_int), ("cameraflags", ctypes.c_int), ("Description", ctypes.c_char * 40), ("Manufacturer", ctypes.c_char * 40) ] class Camera(BaseDevice): def __init__(self, url): super().__init__(url) self._lib = None self._handle = None self._props = ArtemisProperties() try: self._lib = ctypes.CDLL("libatikcameras.so") self._lib.ArtemisConnect.restype = ctypes.c_void_p self._lib.ArtemisImageBuffer.restype = ctypes.c_void_p self._lib.ArtemisExposureTimeRemaining.restype = ctypes.c_float except Exception as e: log.error(f"Atik SDK load failed: {e}") def _check_connection(self): """Internal method to manage the persistent camera handle.""" if self._handle is None and self._lib: count = self._lib.ArtemisDeviceCount() log.debug(f"SDK reported {count} devices.") if count > 0: self._handle = self._lib.ArtemisConnect(0) if self._handle: self._lib.ArtemisProperties(self._handle, ctypes.byref(self._props)) else: msg = "Atik connected but failed to acquire handle" if msg not in self.error: self.error.append(msg) else: # FIX: Aggiungiamo l'errore qui se la camera non c'è msg = "No Atik devices detected on USB" if msg not in self.error: self.error.append(msg) log.error(msg) return self._handle def __del__(self): """Safely disconnect from the camera and unload library.""" if self._handle: try: self._lib.ArtemisDisconnect(self._handle) log.info("Atik camera disconnected.") except: pass # --- Generic Get/Put for compatibility --- def get(self, key): """Generic getter wrapper.""" return getattr(self, key) def put(self, key, value): """Generic setter wrapper.""" setattr(self, key, value) @property def connection(self): if not self._lib: return False h = self._check_connection() return bool(self._lib.ArtemisIsConnected(h)) if h else False # --- Exposure Methods --- def start(self, duration, frametype, dt=None): h = self._check_connection() if not h: return # Using put internally for consistency is_dark = True if frametype in [0, 2, "Dark", "Bias"] else False self._lib.ArtemisSetDarkMode(h, is_dark) self._lib.ArtemisStartExposure(h, ctypes.c_float(duration)) def abort(self): h = self._check_connection() if h: self._lib.ArtemisAbortExposure(h) def download(self, filepath=temp_fits): h = self._check_connection() if not h: return while not self._lib.ArtemisImageReady(h): time.sleep(0.1) x, y, w, h_img, bx, by = [ctypes.c_int() for _ in range(6)] self._lib.ArtemisGetImageData(h, ctypes.byref(x), ctypes.byref(y), ctypes.byref(w), ctypes.byref(h_img), ctypes.byref(bx), ctypes.byref(by)) buf_ptr = self._lib.ArtemisImageBuffer(h) if buf_ptr: size = w.value * h_img.value buffer = (ctypes.c_uint16 * size).from_address(buf_ptr) data = np.frombuffer(buffer, dtype=np.uint16).reshape(h_img.value, w.value) fits.PrimaryHDU(data).writeto(filepath, overwrite=True) # --- Windowing Methods --- def full_frame(self): h = self._check_connection() if h: # Internal use of put is possible but here we call the SDK directly self._lib.ArtemisSubframe(h, 0, 0, self._props.nPixelsX, self._props.nPixelsY) return self.get('max_range') def half_frame(self): h = self._check_connection() if h: m = self.get('max_range') w, h_size = m[0] // 2, m[1] // 2 x, y = w // 2, h_size // 2 self._lib.ArtemisSubframe(h, x, y, w, h_size) return [self._props.nPixelsX // 2, self._props.nPixelsY // 2] def small_frame(self): h = self._check_connection() if h: m = self.get('max_range') w, h_size = m[0] // 10, m[1] // 10 x, y = (m[0] * 9) // 20, (m[1] * 9) // 20 self._lib.ArtemisSubframe(h, x, y, w, h_size) return [self._props.nPixelsX // 10, self._props.nPixelsY // 10] # --- Properties --- @property def state(self): h = self._check_connection() return self._lib.ArtemisCameraState(h) if h else -1 @property def temperature(self): h = self._check_connection() if not h: return None temp = ctypes.c_int() self._lib.ArtemisTemperatureSensorInfo(h, 1, ctypes.byref(temp)) return temp.value / 100.0 @temperature.setter def temperature(self, t): h = self._check_connection() if h: self._lib.ArtemisSetCooling(h, int(t * 100)) @property def cooler(self): h = self._check_connection() if not h: return False flags = ctypes.c_int() self._lib.ArtemisCoolingInfo(h, ctypes.byref(flags), ctypes.byref(ctypes.c_int()), ctypes.byref(ctypes.c_int()), ctypes.byref(ctypes.c_int()), ctypes.byref(ctypes.c_int())) return bool(flags.value & 64) @cooler.setter def cooler(self, b): h = self._check_connection() if not h: return if b: self.put('temperature', -10.0) else: self._lib.ArtemisCoolerWarmUp(h) @property def binning(self): h = self._check_connection() bx, by = ctypes.c_int(), ctypes.c_int() self._lib.ArtemisGetBin(h, ctypes.byref(bx), ctypes.byref(by)) return [bx.value, by.value] @binning.setter def binning(self, b): h = self._check_connection() if h: self._lib.ArtemisBin(h, b[0], b[1]) @property def filter(self): return 0 @property def max_range(self): return [self._props.nPixelsX, self._props.nPixelsY] @property def all(self): """ Get a dictionary of all relevant camera settings and status. Matches the structure of the STX driver. """ h = self._check_connection() if not h: return {} # Get Cooling Information from SDK # ArtemisCoolingInfo returns: flags, level, minlvl, maxlvl, setpoint flags, level, minl, maxl, setp = [ctypes.c_int() for _ in range(5)] self._lib.ArtemisCoolingInfo(h, ctypes.byref(flags), ctypes.byref(level), ctypes.byref(minl), ctypes.byref(maxl), ctypes.byref(setp)) # Get Binning bx, by = ctypes.c_int(), ctypes.c_int() self._lib.ArtemisGetBin(h, ctypes.byref(bx), ctypes.byref(by)) # Calculate fan/cooler power percentage (level is usually 0-255) # We scale it to 0-100 to match Noctua standards. fan_power = round((level.value / 255.0) * 100) if maxl.value > 0 else 0 res = { "ambient": None, # Not available on Atik 11000 via standard cooling call "setpoint": setp.value / 100.0, "temperature": self.temperature, "cooler": bool(flags.value & 64), # Bit 6 is Cooling ON "fan": fan_power, "binning": [bx.value, by.value], "max_range": self.max_range, "state": self.state, "description": self._props.Description.decode() } return res