Commit 3b82b60c authored by vertighel's avatar vertighel
Browse files

Refubrished device stx.py

parent df0b97cc
Loading
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -61,7 +61,7 @@ node = ASCOM_REMOTE
outlet = 3

[cam]
module = stx
module = stx3
class = Camera
node = STX

+15 −1
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ Interface with a SBIG STX camera device
"""

# System modules
import time
from datetime import datetime
from urllib.parse import urlencode

@@ -28,6 +29,17 @@ class STX(BaseDevice):
        self.url = url
        self.addr = self.url
        self.timeout = 3
        self._last_command_time = 0
        self._command_interval = 0.05 # 50 milliseconds

    def _wait_if_needed(self):
        """
        Ensures the 50ms command interval is respected between commands.
        """
        elapsed = time.time() - self._last_command_time
        if elapsed < self._command_interval:
            time.sleep(self._command_interval - elapsed)
        self._last_command_time = time.time()

    @property
    def connection(self):
@@ -48,6 +60,7 @@ class STX(BaseDevice):
    def get(self, method, params=[]):
        '''Send a HTTP GET request to the device address.'''

        self._wait_if_needed() # Wait before sending command
        res = requests.get(f"{self.addr}/{method}.cgi",
                           params="&".join(params),
                           timeout=self.timeout, verify=False)
@@ -72,6 +85,7 @@ class STX(BaseDevice):
    def put(self, method, params={}):
        '''Send a HTTP GET request to the device address.'''
        
        self._wait_if_needed() # Wait before sending command
        res = requests.get(f"{self.addr}/{method}.cgi",
                           params=urlencode(params),
                           timeout=self.timeout)

noctua/devices/stx2.py

0 → 100644
+529 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Interface with a SBIG STX camera device
"""

# System modules
import time
from datetime import datetime
from urllib.parse import urlencode

# Third-party modules
import requests

# Other templates
from ..config.constants import temp_fits
from ..utils import check
from ..utils.logger import log
from .basedevice import BaseDevice


class STX(BaseDevice):
    """
    Base wrapper class for SBIG STX cameras.
    Handles low-level communication and timing constraints.
    """

    def __init__(self, url):
        """
        Initializes the STX device interface.

        Parameters
        ----------
        url : str
            The IP address or hostname of the camera.
        """
        super().__init__(url)
        self.url = url
        self.addr = self.url
        self.timeout = 3
        self._last_command_time = 0
        self._command_interval = 0.05 # 50 milliseconds

    def _wait_if_needed(self):
        """
        Ensures the 50ms command interval is respected between commands.
        """
        elapsed = time.time() - self._last_command_time
        if elapsed < self._command_interval:
            time.sleep(self._command_interval - elapsed)
        self._last_command_time = time.time()

    @property
    def connection(self):
        """
        Checks the connection to the camera.

        Returns
        -------
        bool
            True if the camera responds to a version query, False otherwise.
        """
        try:
            self._wait_if_needed()
            res = requests.get(f"{self.addr}/VersionNumbers.cgi",
                               timeout=self.timeout)
            res.raise_for_status()
            return True
        except Exception:
            self.error = ["Camera not connected"]
            return False

    @check.request_errors
    def get(self, method, params=None):
        """
        Sends a HTTP GET request to the device.

        Parameters
        ----------
        method : str
            The API method name (e.g., "ImagerGetSettings").
        params : list of str, optional
            A list of parameter names to query.

        Returns
        -------
        str or list of str or float or list of float or None or list of None
            A single value or a list of values from the camera.
            Type conversion to float is attempted. Returns None on error.
        """
        self._wait_if_needed()
        if params is None:
            params = []

        res = requests.get(f"{self.addr}/{method}.cgi",
                           params="&".join(params),
                           timeout=self.timeout, verify=False)
        res.raise_for_status()

        values = res.text.split("\r\n")
        if not values[-1]:
            values = values[:-1]

        processed_values = []
        for v in values:
            try:
                processed_values.append(float(v))
            except (ValueError, TypeError):
                processed_values.append(v)

        if len(processed_values) == 1:
            return processed_values[0]
        return processed_values

    @check.request_errors
    def put(self, method, params=None):
        """
        Sends a command (as a HTTP GET request) to the device.

        Parameters
        ----------
        method : str
            The API method name (e.g., "ImagerSetSettings").
        params : dict, optional
            A dictionary of parameter names and their values.

        Returns
        -------
        str or list of str or None
            The response text from the camera, or None on error.
        """
        self._wait_if_needed()
        if params is None:
            params = {}

        res = requests.get(f"{self.addr}/{method}.cgi",
                           params=urlencode(params),
                           timeout=self.timeout)
        res.raise_for_status()

        text = res.text.split("\r\n")
        if not text[-1]:
            text = text[:-1]
        if len(text) == 1:
            return text[0]
        return text


class Camera(STX):
    """
    High-level interface for the SBIG STX camera imaging CCD.
    """

    def abort(self):
        """Aborts the current exposure."""
        self.put("ImagerAbortExposure")

    def start(self, duration, frametype, dt=None):
        """
        Starts an exposure if the camera is idle.

        Parameters
        ----------
        duration : float
            Exposure time in seconds.
        frametype : int
            The type of frame (0=Dark, 1=Light, 2=Bias, 3=Flat).
        dt : str, optional
            ISO-formatted datetime string for the FITS header.
            If None, the current UTC time is used.
        """
        if self.state != 0:
            log.error(f"Cannot start exposure, camera is not idle. State: {self.state}")
            self.error.append("Camera not idle")
            return

        if dt is None:
            dt = datetime.utcnow().isoformat(timespec='milliseconds')

        params = {"Duration": duration,
                  "FrameType": frametype,
                  "DateTime": dt}
        self.put("ImagerStartExposure", params=params)

    def download(self, filepath=temp_fits):
        """
        Downloads the last completed image from the camera buffer.

        Parameters
        ----------
        filepath : str, optional
            The local path to save the FITS file to.
        """
        self._wait_if_needed()
        res = requests.get(f"{self.addr}/Imager.FIT")
        with open(filepath, 'wb') as f:
            f.write(res.content)

    def set_window(self, start_x, start_y, width, height):
        """
        Sets the imaging sub-frame in a single, safe command.
        Coordinates are for a 1x1 binned frame.

        Parameters
        ----------
        start_x : int
            The starting X pixel.
        start_y : int
            The starting Y pixel.
        width : int
            The width of the sub-frame in pixels.
        height : int
            The height of the sub-frame in pixels.
        """
        if self.state != 0:
            log.error(f"Cannot set window, camera is not idle. State: {self.state}")
            self.error.append("Camera not idle")
            return

        params = {
            "StartX": start_x,
            "StartY": start_y,
            "NumX": width,
            "NumY": height
        }
        
        self.put("ImagerSetSettings", params=params)

    def full_frame(self):
        """Sets the camera to use the full sensor area."""
        
        params = ["CameraXSize", "CameraYSize"]
        cam_x, cam_y = self.get("ImagerGetSettings", params=params)
        if self.error: return [None, None]
        
        self.set_window(0, 0, int(cam_x), int(cam_y))

    def half_frame(self):
        """Sets the camera to use a centered 50% sub-frame."""
        
        params = ["CameraXSize", "CameraYSize"]
        cam_x, cam_y = self.get("ImagerGetSettings", params=params)
        if self.error: return [None, None]

        start_x = int(cam_x) // 4
        start_y = int(cam_y) // 4
        width = int(cam_x) // 2
        height = int(cam_y) // 2
        
        self.set_window(start_x, start_y, width, height)

    def small_frame(self):
        """Sets the camera to use a centered 10% sub-frame."""
        
        params = ["CameraXSize", "CameraYSize"]
        cam_x, cam_y = self.get("ImagerGetSettings", params=params)
        if self.error: return [None, None]
        
        cam_x, cam_y = res
        start_x = int(cam_x) * 9 // 20
        start_y = int(cam_y) * 9 // 20
        width = int(cam_x) // 10
        height = int(cam_y) // 10
        
        self.set_window(start_x, start_y, width, height)

    @property
    def binning(self):
        """list of int: The current [X, Y] binning factor."""
        
        params = ["BinX", "BinY"]
        binx, biny = self.get("ImagerGetSettings", params=params)
        if self.error: return [None, None]
        
        return [int(binx), int(biny)]

    @binning.setter
    def binning(self, b):
        
        if self.state != 0:
            log.error(f"Cannot change binning, camera is not idle. State: {self.state}")
            self.error.append("Camera not idle")
            return
        
        params = {"BinX": b[0], "BinY": b[1]}
        self.put("ImagerSetSettings", params=params)

    @property
    def filter(self):
        """int: The currently selected filter position (1-8)."""
        
        params = ["CurrentFilter"]
        res = self.get("GetFilterSetting", params=params)
        if self.error: return None
        
        return res

    @property
    def cooler(self):
        """bool: The current state of the CCD cooler (True=On, False=Off)."""
        
        params = ["CoolerState"]
        res = self.get("ImagerGetSettings", params=params)
        if self.error: return None
        
        return bool(res)

    @cooler.setter
    def cooler(self, b):
        
        if self.state != 0:
            log.error(f"Cannot change cooler state, camera is not idle. State: {self.state}")
            self.error.append("Camera not idle")
            return
        
        params = {"CoolerState": "1" if b else "0"}
        self.put("ImagerSetSettings", params=params)

    @filter.setter
    def filter(self, n):
        
        if self.is_moving != 0:
            log.error(f"Cannot change filter, filter wheel is moving. State: {self.is_moving}")
            self.error.append("Filter wheel busy")
            return
        
        params = {"NewPosition": n}
        self.put("ChangeFilter", params=params)

    @property
    def temperature(self):
        """float: The current CCD temperature in degrees Celsius."""
        
        params = ["CCDTemperature"]
        res = self.get("ImagerGetSettings", params=params)
        if self.error: return None
        
        return round(res, 1)

    @temperature.setter
    def temperature(self, t):
        
        if self.state != 0:
            log.error(f"Cannot change temperature, camera is not idle. State: {self.state}")
            self.error.append("Camera not idle")
            return
        
        params = {"CCDTemperatureSetpoint": t}
        self.put("ImagerSetSettings", params=params)

    @property
    def all(self):
        """dict: A comprehensive dictionary of the current camera state."""
        
        params = ["AmbientTemperature", "CCDTemperatureSetpoint", "CCDTemperature",
                  "CoolerState", "CoolerPower", "BinX", "BinY", "CameraXSize",
                  "CameraYSize", "StartX", "StartY", "NumX", "NumY"]
        res = self.get("ImagerGetSettings", params=params)
        
        if self.error or not res or len(res) != len(params):
            return {}

        ambient, setpoint, temp, cool, fan, binx, biny, camx, camy, startx, starty, numx, numy = res
        b_x, b_y = int(binx), int(biny)
        x_start, y_start = int(startx) // b_x, int(starty) // b_y
        x_end = (int(startx) + int(numx)) // b_x
        y_end = (int(starty) + int(numy)) // b_y

        return {"ambient": round(ambient, 1),
                "setpoint": round(setpoint, 1),
                "temperature": round(temp, 1),
                "cooler": bool(cool),
                "fan": round(fan, 0),
                "binning": [b_x, b_y],
                "max_range": [int(camx) // b_x, int(camy) // b_y],
                "xystart": [x_start, y_start],
                "xyend": [x_end, y_end],
                "xrange": [x_start, x_end],
                "yrange": [y_start, y_end],
                "center": [int(camx) // b_x // 2, int(camy) // b_y // 2]}

    @property
    def ambient(self):
        """float: The ambient temperature in degrees Celsius."""
        
        params = ["AmbientTemperature"]
        res = self.get("ImagerGetSettings", params=params)        
        if self.error: return None
        
        return round(res, 1)

    @property
    def center(self):
        """list of int: The sensor's [X, Y] center coordinates for current binning."""
        
        params = ["CameraXSize", "CameraYSize", "BinX", "BinY"]
        camx, camy, binx, biny = self.get("ImagerGetSettings", params=params)
        if self.error: return [None, None]
        
        return [int(camx) // int(binx) // 2, int(camy) // int(biny) // 2]

    @property
    def description(self):
        """str: The camera model description string."""
        
        res = self.get("Description")
        if self.error: return None
        
        return res

    @property
    def fan(self):
        """int: The current cooler power level in percent."""
        
        params = ["CoolerPower"]
        res = self.get("ImagerGetSettings", params=params)
        if self.error: return None
        
        return round(res)

    @property
    def is_moving(self):
        """int: The current state of the filter wheel.
        0=Idle, 1=Moving, 2=Error.
        """
        
        res = self.get("FilterState")
        if self.error: return None
        
        return res

    @property
    def max_range(self):
        """list of int: The maximum sensor dimensions [X, Y] for current binning."""
        
        params = ["CameraXSize", "CameraYSize"]
        b_x, b_y = self.get("ImagerGetSettings", params=params)
        if self.error: return [None, None]

        b_x, b_y = self.binning
        return [int(res[0]) // b_x, int(res[1]) // b_y]

    @property
    def setpoint(self):
        """float: The current CCD temperature setpoint in degrees Celsius."""
        
        params = ["CCDTemperatureSetpoint"]
        res = self.get("ImagerGetSettings", params=params)
        if self.error: return None
        
        return res

    @property
    def state(self):
        """int: The current state of the imaging CCD.
        0=Idle, 2=Exposing, 3=Reading, 5=Error.
        """
        
        res = self.get("ImagerState")
        if self.error: return None
        
        return res

    @property
    def ready(self):
        """int: The status of the image buffer.
        1 if an image is ready for download, 0 otherwise.
        """
        
        res = self.get("ImagerImageReady")        
        if self.error: return None
        
        return res

    @property
    def version(self):
        """list of str: Camera firmware and API version numbers."""
        
        res = self.get("VersionNumbers")
        if self.error: return None
        
        return res

    @property
    def xrange(self):
        """list of int: The current [start, end] X-axis window for current binning."""
        
        params = ["StartX", "NumX", "BinX"]
        startx, numx, binx = self.get("ImagerGetSettings", params=params)
        if self.error: return [None, None]
        
        x_start = int(startx) // int(binx)
        x_end = (int(startx) + int(numx)) // int(binx)
        return [x_start, x_end]

    @property
    def yrange(self):
        """list of int: The current [start, end] Y-axis window for current binning."""
        
        params = ["StartY", "NumY", "BinY"]
        starty, numy, biny = self.get("ImagerGetSettings", params=params)
        if self.error: return [None, None]

        y_start = int(starty) // int(biny)
        y_end = (int(starty) + int(numy)) // int(biny)
        return [y_start, y_end]

    @property
    def xystart(self):
        """list of int: The current [X, Y] start coordinates for current binning."""
        
        params = ["StartX", "StartY", "BinX", "BinY"]
        startx, starty, binx, biny = self.get("ImagerGetSettings", params=params)
        if self.error: return [None, None]
        
        return [int(startx) // int(binx), int(starty) // int(biny)]

    @property
    def xyend(self):
        """list of int: The current [X, Y] end coordinates for current binning."""
        
        params = ["StartX", "StartY", "NumX", "NumY", "BinX", "BinY"]
        startx, starty, numx, numy, binx, biny = self.get("ImagerGetSettings", params=params)
        if self.error: return [None, None]
        
        x_end = (int(startx) + int(numx)) // int(binx)
        y_end = (int(starty) + int(numy)) // int(biny)
        return [x_end, y_end]