Commit 3e411508 authored by vertighel's avatar vertighel
Browse files

Added atik device, updated api/camera.py, updated pyproject and installation script

parent b286e256
Loading
Loading
Loading
Loading
Loading
+8 −5
Original line number Diff line number Diff line

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

@@ -70,7 +71,8 @@ class Filter(BaseResource):
    async def get(self):
        """Retrieve the current filter."""
        
        raw = await self.run_blocking(lambda: self.dev.filter)
        raw = await self.run_blocking(lambda: getattr(self.dev, 'filter', 0))        
        #raw = await self.run_blocking(lambda: self.dev.filter)
        res = constants.filter_name.get(raw, "Undef.")
        return self.make_response(res, raw_data=raw)

@@ -80,7 +82,8 @@ class FilterMovement(BaseResource):
    async def get(self):
        """Check if the filter wheel is moving."""

        raw = await self.run_blocking(lambda: self.dev.is_moving)
        raw = await self.run_blocking(lambda: getattr(self.dev, 'is_moving', 0))
        # raw = await self.run_blocking(lambda: self.dev.is_moving)
        res = constants.filter_state.get(raw, "Off")
        return self.make_response(res, raw_data=raw)

@@ -258,8 +261,8 @@ class CoolerWarmup(BaseResource):

    async def post(self):
        """Start the warm up the CCD."""
        
        return self.make_response("Warmup sequence initiated")
        res = await self.run_blocking(lambda: self.dev.put('cooler', False))
        return self.make_response(res)

    async def delete(self):
        """Stop the warm up of the CCD."""
+5 −0
Original line number Diff line number Diff line
@@ -24,6 +24,11 @@ module = stx
class = Camera
node = STX

[cam2]
module = atik
class = Camera
node = DUMMY

[pdu_cam]
module = netio
class = Switch

noctua/devices/atik.py

0 → 100644
+260 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Interface with a Atik camera device using the SDK
SDK Downloaded from https://downloads.atik-cameras.com/AtikCamerasSDK_2025_11_11_Master_2111.zip

sudo cp ../lib/Linux/atik.rules /usr/lib/udev/rules.d/
sudo udevadm control --reload-rules && sudo udevadm trigger
sudo cp ../lib/Linux/64/NoFlyCapture/libatikcameras.so /usr/lib/
sudo cp ../include/Atik* /usr/include/

"""

# System modules
import ctypes
import time
import numpy as np
from datetime import datetime

# Third-party modules
from astropy.io import fits

# Custom modules
from .basedevice import BaseDevice
from ..utils.logger import log
from ..config.constants import temp_fits


class ArtemisProperties(ctypes.Structure):
    _fields_ = [
        ("Protocol", ctypes.c_int),
        ("nPixelsX", ctypes.c_int),
        ("nPixelsY", ctypes.c_int),
        ("PixelMicronsX", ctypes.c_float),
        ("PixelMicronsY", ctypes.c_float),
        ("ccdflags", ctypes.c_int),
        ("cameraflags", ctypes.c_int),
        ("Description", ctypes.c_char * 40),
        ("Manufacturer", ctypes.c_char * 40)
    ]


class Camera(BaseDevice):
    def __init__(self, url):
        super().__init__(url)
        self._lib = None
        self._handle = None
        self._props = ArtemisProperties()

        try:
            self._lib = ctypes.CDLL("libatikcameras.so")
            self._lib.ArtemisConnect.restype = ctypes.c_void_p
            self._lib.ArtemisImageBuffer.restype = ctypes.c_void_p
            self._lib.ArtemisExposureTimeRemaining.restype = ctypes.c_float
        except Exception as e:
            log.error(f"Atik SDK load failed: {e}")
            
    def _check_connection(self):
        """Internal method to manage the persistent camera handle."""
        
        if self._handle is None and self._lib:
            count = self._lib.ArtemisDeviceCount()
            log.debug(f"SDK reported {count} devices.")
            
            if count > 0:
                self._handle = self._lib.ArtemisConnect(0)
                if self._handle:
                    self._lib.ArtemisProperties(self._handle, ctypes.byref(self._props))
                else:
                    msg = "Atik connected but failed to acquire handle"
                    if msg not in self.error: self.error.append(msg)
            else:
                # FIX: Aggiungiamo l'errore qui se la camera non c'è
                msg = "No Atik devices detected on USB"
                if msg not in self.error: 
                    self.error.append(msg)
                    log.error(msg)
                    
        return self._handle
    
    def __del__(self):
        """Safely disconnect from the camera and unload library."""
        if self._handle:
            try:
                self._lib.ArtemisDisconnect(self._handle)
                log.info("Atik camera disconnected.")
            except:
                pass

    # --- Generic Get/Put for compatibility ---

    def get(self, key):
        """Generic getter wrapper."""
        return getattr(self, key)

    def put(self, key, value):
        """Generic setter wrapper."""
        setattr(self, key, value)

    @property
    def connection(self):
        if not self._lib: return False
        h = self._check_connection()
        return bool(self._lib.ArtemisIsConnected(h)) if h else False

    # --- Exposure Methods ---

    def start(self, duration, frametype, dt=None):
        h = self._check_connection()
        if not h: return
        # Using put internally for consistency
        is_dark = True if frametype in [0, 2, "Dark", "Bias"] else False
        self._lib.ArtemisSetDarkMode(h, is_dark)
        self._lib.ArtemisStartExposure(h, ctypes.c_float(duration))

    def abort(self):
        h = self._check_connection()
        if h: self._lib.ArtemisAbortExposure(h)

    def download(self, filepath=temp_fits):
        h = self._check_connection()
        if not h: return
        while not self._lib.ArtemisImageReady(h):
            time.sleep(0.1)
        
        x, y, w, h_img, bx, by = [ctypes.c_int() for _ in range(6)]
        self._lib.ArtemisGetImageData(h, ctypes.byref(x), ctypes.byref(y),
                                      ctypes.byref(w), ctypes.byref(h_img),
                                      ctypes.byref(bx), ctypes.byref(by))

        buf_ptr = self._lib.ArtemisImageBuffer(h)
        if buf_ptr:
            size = w.value * h_img.value
            buffer = (ctypes.c_uint16 * size).from_address(buf_ptr)
            data = np.frombuffer(buffer, dtype=np.uint16).reshape(h_img.value, w.value)
            fits.PrimaryHDU(data).writeto(filepath, overwrite=True)

    # --- Windowing Methods ---

    def full_frame(self):
        h = self._check_connection()
        if h:
            # Internal use of put is possible but here we call the SDK directly
            self._lib.ArtemisSubframe(h, 0, 0, self._props.nPixelsX, self._props.nPixelsY)
        return self.get('max_range')

    def half_frame(self):
        h = self._check_connection()
        if h:
            m = self.get('max_range')
            w, h_size = m[0] // 2, m[1] // 2
            x, y = w // 2, h_size // 2
            self._lib.ArtemisSubframe(h, x, y, w, h_size)
        return [self._props.nPixelsX // 2, self._props.nPixelsY // 2]

    def small_frame(self):
        h = self._check_connection()
        if h:
            m = self.get('max_range')
            w, h_size = m[0] // 10, m[1] // 10
            x, y = (m[0] * 9) // 20, (m[1] * 9) // 20
            self._lib.ArtemisSubframe(h, x, y, w, h_size)
        return [self._props.nPixelsX // 10, self._props.nPixelsY // 10]

    # --- Properties ---

    @property
    def state(self):
        h = self._check_connection()
        return self._lib.ArtemisCameraState(h) if h else -1

    @property
    def temperature(self):
        h = self._check_connection()
        if not h: return None
        temp = ctypes.c_int()
        self._lib.ArtemisTemperatureSensorInfo(h, 1, ctypes.byref(temp))
        return temp.value / 100.0

    @temperature.setter
    def temperature(self, t):
        h = self._check_connection()
        if h: self._lib.ArtemisSetCooling(h, int(t * 100))

    @property
    def cooler(self):
        h = self._check_connection()
        if not h: return False
        flags = ctypes.c_int()
        self._lib.ArtemisCoolingInfo(h, ctypes.byref(flags), ctypes.byref(ctypes.c_int()), 
                                     ctypes.byref(ctypes.c_int()), ctypes.byref(ctypes.c_int()), 
                                     ctypes.byref(ctypes.c_int()))
        return bool(flags.value & 64)

    @cooler.setter
    def cooler(self, b):
        h = self._check_connection()
        if not h: return
        if b: self.put('temperature', -10.0)
        else: self._lib.ArtemisCoolerWarmUp(h)

    @property
    def binning(self):
        h = self._check_connection()
        bx, by = ctypes.c_int(), ctypes.c_int()
        self._lib.ArtemisGetBin(h, ctypes.byref(bx), ctypes.byref(by))
        return [bx.value, by.value]

    @binning.setter
    def binning(self, b):
        h = self._check_connection()
        if h: self._lib.ArtemisBin(h, b[0], b[1])

    @property
    def filter(self):
        return 0

    @property
    def max_range(self):
        return [self._props.nPixelsX, self._props.nPixelsY]
    
    @property
    def all(self):
        """
        Get a dictionary of all relevant camera settings and status.
        Matches the structure of the STX driver.
        """
        h = self._check_connection()
        if not h:
            return {}

        # Get Cooling Information from SDK
        # ArtemisCoolingInfo returns: flags, level, minlvl, maxlvl, setpoint
        flags, level, minl, maxl, setp = [ctypes.c_int() for _ in range(5)]
        self._lib.ArtemisCoolingInfo(h, ctypes.byref(flags), ctypes.byref(level), 
                                     ctypes.byref(minl), ctypes.byref(maxl), 
                                     ctypes.byref(setp))

        # Get Binning
        bx, by = ctypes.c_int(), ctypes.c_int()
        self._lib.ArtemisGetBin(h, ctypes.byref(bx), ctypes.byref(by))

        # Calculate fan/cooler power percentage (level is usually 0-255)
        # We scale it to 0-100 to match Noctua standards.
        fan_power = round((level.value / 255.0) * 100) if maxl.value > 0 else 0

        res = {
            "ambient": None,  # Not available on Atik 11000 via standard cooling call
            "setpoint": setp.value / 100.0,
            "temperature": self.temperature,
            "cooler": bool(flags.value & 64), # Bit 6 is Cooling ON
            "fan": fan_power,
            "binning": [bx.value, by.value],
            "max_range": self.max_range,
            "state": self.state,
            "description": self._props.Description.decode()
        }

        return res
+40 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
import argparse
import sys
from noctua.utils import install_vimba_x, install_atik_sdk
from noctua.utils.logger import log

def main():
    parser = argparse.ArgumentParser(description="Noctua System Installer for external SDKs")
    
    # Subcommands or direct flags
    parser.add_argument("component", nargs="?", choices=["vimba", "atik", "all"], default="all",
                        help="Select which component to install (default: all)")
    
    parser.add_argument("--force", action="store_true", help="Force reinstallation of the selected component")
    parser.add_argument("--force-all", action="store_true", help="Force reinstallation of everything")

    args = parser.parse_args()

    # Determine force state
    force_vmb = args.force_all or (args.force and args.component in ["vimba", "all"])
    force_atik = args.force_all or (args.force and args.component in ["atik", "all"])

    results = []

    if args.component in ["all", "vimba"]:
        res = install_vimba_x.install(force=force_vmb)
        results.append(("Vimba X", res))

    if args.component in ["all", "atik"]:
        res = install_atik_sdk.install(force=force_atik)
        results.append(("Atik SDK", res))

    # Summary
    print("\n--- Installation Summary ---")
    for name, success in results:
        status = "OK" if success else "FAILED"
        print(f"{name}: {status}")

if __name__ == "__main__":
    main()
+68 −0
Original line number Diff line number Diff line
import os
import subprocess
import zipfile
import shutil
from pathlib import Path
from noctua.utils.logger import log

SDK_URL = "https://downloads.atik-cameras.com/AtikCamerasSDK_2025_11_11_Master"
ZIP_NAME = "AtikSDK.zip"

def is_installed():
    """Check if Atik SDK is already installed in the system."""
    lib_exists = Path("/usr/lib/libatikcameras.so").exists()
    headers_exists = Path("/usr/include/AtikCameras.h").exists()
    rules_exists = Path("/usr/lib/udev/rules.d/atik.rules").exists()
    return lib_exists and headers_exists and rules_exists

def install(force=False):
    """Download and install Atik SDK."""
    if is_installed() and not force:
        log.info("Atik SDK is already installed. Use --force to reinstall.")
        return True

    log.info("Starting Atik SDK installation...")

    # 1. Download if zip is not present locally
    if not Path(ZIP_NAME).exists():
        log.info(f"Downloading Atik SDK from {SDK_URL}...")
        try:
            import requests
            r = requests.get(SDK_URL, stream=True)
            with open(ZIP_NAME, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192): f.write(chunk)
        except Exception as e:
            log.error(f"Download failed: {e}")
            return False
    else:
        log.info(f"Using local file {ZIP_NAME}")

    # 2. Extract
    tmp_dir = "atik_tmp"
    with zipfile.ZipFile(ZIP_NAME, 'r') as zip_ref:
        zip_ref.extractall(tmp_dir)
    
    # 3. System commands (needs sudo)
    try:
        sdk_path = next(Path(tmp_dir).iterdir()) # Entry folder inside zip
        
        lib_src = sdk_path / "lib/Linux/64/NoFlyCapture/libatikcameras.so"
        rules_src = sdk_path / "lib/Linux/atik.rules"
        
        subprocess.run(["sudo", "cp", str(lib_src), "/usr/lib/"], check=True)
        subprocess.run(["sudo", "cp", str(rules_src), "/usr/lib/udev/rules.d/"], check=True)
        # Copy headers
        for h in (sdk_path / "include").glob("Atik*"):
            subprocess.run(["sudo", "cp", str(h), "/usr/include/"], check=True)
            
        subprocess.run(["sudo", "udevadm", "control", "--reload-rules"], check=True)
        subprocess.run(["sudo", "udevadm", "trigger"], check=True)
        subprocess.run(["sudo", "ldconfig"], check=True)
        
        log.info("Atik SDK installed successfully.")
    except Exception as e:
        log.error(f"Installation failed: {e}")
        return False
    finally:
        if Path(tmp_dir).exists(): shutil.rmtree(tmp_dir)
    return True
Loading