Commit 9b21dc19 authored by vertighel's avatar vertighel
Browse files

Debugged devices alpaca2, astelco2, stx2 (the new ones)

parent 95a95d65
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -46,7 +46,7 @@ class AlpacaDevice(BaseDevice):
        """
        str: The full API endpoint address for the device.
        """
        return f"{self.url}/api/v1/{self._dev}/{self.device_number}"
        return f"{self.url}/{self._dev}/{self.device_number}"

    @check.request_errors
    def get(self, method, params=None, timeout=None):
+105 −35
Original line number Diff line number Diff line
@@ -40,12 +40,13 @@ class OpenTSI(BaseDevice):
        password : str, optional
            The password for authentication, by default "admin".
        """
        
        super().__init__(url)
        self.host, self.port = url.split(":")
        self.port = int(self.port)
        self.username = username
        self.password = password
        self.timeout = 5
        self.timeout = 3
        self._cmd_id_counter = 0
        self._lock = threading.Lock()

@@ -88,8 +89,8 @@ class OpenTSI(BaseDevice):
            is a dict with 'status' and 'data' keys. Returns None if the
            connection or authentication fails.
        """
        
        tn = telnetlib.Telnet(self.host, self.port, timeout=self.timeout)
        tn.read_until(b"OpenTPL server", timeout=self.timeout)

        auth_cmd = f'auth plain "{self.username}" "{self.password}"\n'
        tn.write(auth_cmd.encode('utf-8'))
@@ -109,7 +110,7 @@ class OpenTSI(BaseDevice):
        completed_commands = 0
        while completed_commands < len(commands) and (time.time() - start_time) < self.timeout:
            try:
                line_bytes = tn.read_until(b'\n', timeout=1)
                line_bytes = tn.read_until(b'\n', timeout=self.timeout)
                if not line_bytes:
                    continue

@@ -254,13 +255,21 @@ class Telescope(OpenTSI):
        bool or None: Reports if any telescope axis is currently moving.

        Checks Bit 0 of TELESCOPE.MOTION_STATE.
        
           Bit 0 One or more axes are moving
           Bit 1 Trajectories are being executed
           Bit 2 Movement is blocked (e.g. by limits)
           Bit 3 Telescope is on target position (i.e. stopped or, if tracking, following the target)
           Bit 4 Movement is restricted by given jerk/acceleration/velocity limits
           Bit 5 Telescope is “unparked”, i.e. moved on startup position by READY=1 or PARK=0
           Bit 6 Telescope is “parked”, i.e. moved on park position by READY=0 or PARK=1
        """

        res = self.get("TELESCOPE.MOTION_STATE")
        if self.error:
            return None
        try:
            return bool(int(res) & 1)
            return int(res) >> 0 & 1 == 0
        except (TypeError, ValueError):
            return None

@@ -422,10 +431,15 @@ class Telescope(OpenTSI):
            Target azimuth in degrees.
        """

        self.put("POINTING.SETUP.DEROTATOR.SYNCMODE", 2)
        self.put("OBJECT.HORIZONTAL.ALT", alt)
        self.put("OBJECT.HORIZONTAL.AZ", az)
        self.put("POINTING.TRACK", 2)  # Go and stay there
        cmd_ids = self._get_cmd_id(), self._get_cmd_id(), self._get_cmd_id(), self._get_cmd_id()
        commands = [
            (cmd_ids[0], f"SET POINTING.SETUP.DEROTATOR.SYNCMODE=2"),
            (cmd_ids[1], f"SET OBJECT.HORIZONTAL.ALT={alt}"),
            (cmd_ids[2], f"SET OBJECT.HORIZONTAL.AZ={az}"),
            (cmd_ids[3], f"SET POINTING.TRACK=2"),  # Go and stay there
        ]
        res = self._send(commands)


    @property
    def radec(self):
@@ -444,6 +458,7 @@ class Telescope(OpenTSI):
            (cmd_id2, "GET POSITION.EQUATORIAL.DEC_J2000")
        ]
        res = self._send(commands)
        
        if self.error or not res:
            return [None, None]
        return [res[cmd_id1]["data"], res[cmd_id2]["data"]]
@@ -471,11 +486,14 @@ class Telescope(OpenTSI):
        dec : float
            Target Declination in degrees.
        """

        self.put("POINTING.SETUP.DEROTATOR.SYNCMODE", 2)
        self.put("OBJECT.EQUATORIAL.RA", ra)
        self.put("OBJECT.EQUATORIAL.DEC", dec)
        self.put("POINTING.TRACK", 1)  # Go and track
        cmd_ids = self._get_cmd_id(), self._get_cmd_id(), self._get_cmd_id(), self._get_cmd_id()
        commands = [
            (cmd_ids[0], f"SET POINTING.SETUP.DEROTATOR.SYNCMODE=2"),
            (cmd_ids[1], f"SET OBJECT.EQUATORIAL.RA={ra}"),
            (cmd_ids[2], f"SET OBJECT.EQUATORIAL.DEC={dec}"),
            (cmd_ids[3], f"SET POINTING.TRACK=1"),  # Go and track
        ]        
        res = self._send(commands)

    @property
    def offset(self):
@@ -496,6 +514,7 @@ class Telescope(OpenTSI):
            (cmd_id2, "GET POSITION.INSTRUMENTAL.AZ.OFFSET")
        ]
        res = self._send(commands)
        
        if self.error or not res:
            return [None, None]
        return [res[cmd_id1]["data"], res[cmd_id2]["data"]]
@@ -511,11 +530,16 @@ class Telescope(OpenTSI):
            A list or tuple containing [zd_offset, az_offset] in degrees.
        """

        if abs(a[0]) > 0.9 or abs(a[1]) > 0.9:
            log.error("Offset > 0.9 deg is too large. Aborting.")
        if abs(a[0]) > 0.9 or abs(a[1]) > 0.9: # 54 arcmin
            log.error("Offset > 0.9 deg is too large. Maybe arcsec instead of deg?")
            return
        self.put("POSITION.INSTRUMENTAL.ZD.OFFSET", a[0])
        self.put("POSITION.INSTRUMENTAL.AZ.OFFSET", a[1])
        
        cmd_id1, cmd_id2 = self._get_cmd_id(), self._get_cmd_id()        
        commands = [
            (cmd_id1, f"SET POSITION.INSTRUMENTAL.ZD.OFFSET={a[0]}"),
            (cmd_id2, f"SET POSITION.INSTRUMENTAL.AZ.OFFSET={a[1]}")
        ]
        res = self._send(commands)

    @property
    def coordinates(self):
@@ -556,27 +580,12 @@ class Focuser(OpenTSI):
    """
    Interface for controlling a telescope focuser.
    """
    def __init__(self, url, index=0, **kwargs):
        """
        Initializes the Focuser interface.

        Parameters
        ----------
        url : str
            The device URL in "host:port" format.
        index : int, optional
            The device index for the focuser (e.g., for hexapods),
            by default 0.
        """

        super().__init__(url, **kwargs)
        self.index = index

    @property
    def is_moving(self):
        """bool or None: Reports if the focuser is currently moving."""

        res = self.get(f"POSITION.INSTRUMENTAL.FOCUS[{self.index}].MOTION_STATE")
        res = self.get(f"POSITION.INSTRUMENTAL.FOCUS.MOTION_STATE")
        if self.error:
            return None
        try:
@@ -594,7 +603,7 @@ class Focuser(OpenTSI):
        The driver converts the OpenTSI value (in mm) to microns.
        """

        res = self.get(f"POSITION.INSTRUMENTAL.FOCUS[{self.index}].REALPOS")
        res = self.get(f"POSITION.INSTRUMENTAL.FOCUS.REALPOS")
        if self.error:
            return None
        try:
@@ -614,7 +623,29 @@ class Focuser(OpenTSI):
        """

        pos_mm = s / 1000
        self.put(f"POSITION.INSTRUMENTAL.FOCUS[{self.index}].TARGETPOS", pos_mm)
        self.put(f"POSITION.INSTRUMENTAL.FOCUS.TARGETPOS", pos_mm)

    @property
    def offset(self):
        """float or None: The current focus offset in degrees."""

        res = self.get(f"POSITION.INSTRUMENTAL.FOCUS.OFFSET")
        if self.error:
            return None
        return res

    @offset.setter
    def offset(self, s):
        """
        Moves the focuser to a new absolute position.

        Parameters
        ----------
        s : float
            The target position in degrees.
        """

        self.put(f"POSITION.INSTRUMENTAL.FOCUS.OFFSET", s)


class Rotator(OpenTSI):
@@ -670,12 +701,50 @@ class Rotator(OpenTSI):

        self.put(f"POSITION.INSTRUMENTAL.DEROTATOR[{self.index}].TARGETPOS", s)

    @property
    def offset(self):
        """float or None: The current rotator offset in degrees."""

        res = self.get(f"POSITION.INSTRUMENTAL.DEROTATOR[{self.index}].OFFSET")
        if self.error:
            return None
        return res

    @offset.setter
    def offset(self, s):
        """
        Moves the rotator to a new offset position.

        Parameters
        ----------
        s : float
            The offset position in degrees.
        """

        self.put(f"POSITION.INSTRUMENTAL.DEROTATOR[{self.index}].OFFSET", s)
        

class Sensor(OpenTSI):
    """
    Interface for reading environmental sensors.
    """
    
    def __init__(self, url, temp_id, hum_id, **kwargs):
        """
        Initializes the Sensor interface.

        Parameters
        ----------
        url : str
            The device URL in "host:port" format.
        outlet1 / outlet2 : int
            The device index for the outlets.
        """

        super().__init__(url, **kwargs)
        self.id = temp_id
        self.hum_id = hum_id

    @property
    def temperature(self):
        """
@@ -694,6 +763,7 @@ class Sensor(OpenTSI):
            (cmd_id3, "GET AUXILIARY.SENSOR[4].VALUE"),
        ]
        res = self._send(commands)

        if self.error or not res:
            return [None, None, None]
        return [res[cmd_id1]["data"], res[cmd_id2]["data"], res[cmd_id3]["data"]]
+34 −22
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@ Interface with a SBIG STX camera device

# System modules
import time
from datetime import datetime
from datetime import datetime as dt
from urllib.parse import urlencode

# Third-party modules
@@ -44,7 +44,8 @@ class STX(BaseDevice):

    def _wait_if_needed(self):
        """
        Ensures the 50ms command interval is respected between commands.
        Ensures the 50ms command interval is respected
        between commands.
        """
        elapsed = time.time() - self._last_command_time
        if elapsed < self._command_interval:
@@ -130,6 +131,7 @@ class STX(BaseDevice):
        str or list of str or None
            The response text from the camera, or None on error.
        """
        
        self._wait_if_needed()
        if params is None:
            params = {}
@@ -156,7 +158,7 @@ class Camera(STX):
        """Aborts the current exposure."""
        self.put("ImagerAbortExposure")

    def start(self, duration, frametype, dt=None):
    def start(self, duration, frametype, datetime=None):
        """
        Starts an exposure if the camera is idle.

@@ -166,10 +168,11 @@ class Camera(STX):
            Exposure time in seconds.
        frametype : int
            The type of frame (0=Dark, 1=Light, 2=Bias, 3=Flat).
        dt : str, optional
        datetime : str, optional
            ISO-formatted datetime string for the FITS header.
            If None, the current UTC time is used.
        """
        
        if self.state != 0:
            log.error(
                f"Cannot start exposure, camera is not idle. State: {
@@ -177,14 +180,15 @@ class Camera(STX):
            self.error.append("Camera not idle")
            return

        if dt is None:
            dt = datetime.utcnow().isoformat(timespec='milliseconds')
        if datetime is None:
            datetime = dt.utcnow().isoformat(timespec='milliseconds')

        params = {"Duration": duration,
                  "FrameType": frametype,
                  "DateTime": dt}
                  "DateTime": datetime}
        self.put("ImagerStartExposure", params=params)

        
    def download(self, filepath=temp_fits):
        """
        Downloads the last completed image from the camera buffer.
@@ -194,10 +198,18 @@ class Camera(STX):
        filepath : str, optional
            The local path to save the FITS file to.
        """
        
        self._wait_if_needed()
        
        log.debug(f"Getting original FITS")
        res = requests.get(f"{self.addr}/Imager.FIT")
        log.debug(f"Got original FITS")
        
        with open(filepath, 'wb') as f:
            log.debug(f"Writing on disk")
            f.write(res.content)
            log.debug(f"Written")

            
    def set_window(self, start_x, start_y, width, height):
        """
@@ -215,6 +227,7 @@ class Camera(STX):
        height : int
            The height of the sub-frame in pixels.
        """
        
        if self.state != 0:
            log.error(
                f"Cannot set window, camera is not idle. State: {
@@ -264,7 +277,6 @@ class Camera(STX):
        if self.error:
            return [None, None]

        cam_x, cam_y = res
        start_x = int(cam_x) * 9 // 20
        start_y = int(cam_y) * 9 // 20
        width = int(cam_x) // 10
@@ -307,6 +319,19 @@ class Camera(STX):

        return res

    @filter.setter
    def filter(self, n):

        if self.is_moving != 0:
            log.error(
                f"Cannot change filter, filter wheel is moving. State: {
                    self.is_moving}")
            self.error.append("Filter wheel busy")
            return

        params = {"NewPosition": n}
        self.put("ChangeFilter", params=params)

    @property
    def cooler(self):
        """bool: The current state of the CCD cooler (True=On, False=Off)."""
@@ -331,19 +356,6 @@ class Camera(STX):
        params = {"CoolerState": "1" if b else "0"}
        self.put("ImagerSetSettings", params=params)

    @filter.setter
    def filter(self, n):

        if self.is_moving != 0:
            log.error(
                f"Cannot change filter, filter wheel is moving. State: {
                    self.is_moving}")
            self.error.append("Filter wheel busy")
            return

        params = {"NewPosition": n}
        self.put("ChangeFilter", params=params)

    @property
    def temperature(self):
        """float: The current CCD temperature in degrees Celsius."""
@@ -470,7 +482,7 @@ class Camera(STX):
        """list of int: The maximum sensor dimensions [X, Y] for current binning."""

        params = ["CameraXSize", "CameraYSize"]
        b_x, b_y = self.get("ImagerGetSettings", params=params)
        res = self.get("ImagerGetSettings", params=params)
        if self.error:
            return [None, None]