Commit e59594b4 authored by vertighel's avatar vertighel
Browse files

Align atik/stx camera interfaces and add tests



- start(): rename dt→datetime kwarg in atik (matches stx)
- full/half/small_frame(): return [w, h] in both drivers
- all: merged keys — both dicts now contain ambient, setpoint,
  temperature, cooler, fan, binning, max_range, xystart, xyend,
  xrange, yrange, center, state, description
- atik: track _subframe state to compute xystart/xyend/xrange/yrange
- atik: filter.setter added as no-op (no filter wheel)
- tests/test_camera_stx.py, tests/test_camera_atik.py: 30 unit tests

Co-Authored-By: default avatarClaude Sonnet 4.6 <noreply@anthropic.com>
parent 8593660c
Loading
Loading
Loading
Loading
Loading
+38 −22
Original line number Diff line number Diff line
@@ -46,6 +46,7 @@ class Camera(BaseDevice):
        self._lib = None
        self._handle = None
        self._props = ArtemisProperties()
        self._subframe = [0, 0, 0, 0]

        try:
            self._lib = ctypes.CDLL("/usr/lib/libatikcameras.so")
@@ -66,6 +67,7 @@ class Camera(BaseDevice):
                self._handle = self._lib.ArtemisConnect(0)
                if self._handle:
                    self._lib.ArtemisProperties(self._handle, ctypes.byref(self._props))
                    self._subframe = [0, 0, self._props.nPixelsX, self._props.nPixelsY]
                else:
                    msg = "Atik connected but failed to acquire handle"
                    if msg not in self.error: self.error.append(msg)
@@ -110,7 +112,7 @@ class Camera(BaseDevice):
        if h: self._lib.ArtemisAbortExposure(h)

        
    def start(self, duration, frametype, dt=None):
    def start(self, duration, frametype, datetime=None):
        h = self._check_connection()
        if not h: return
        # Using put internally for consistency
@@ -146,28 +148,27 @@ class Camera(BaseDevice):

    def full_frame(self):
        h = self._check_connection()
        nx, ny = self._props.nPixelsX, self._props.nPixelsY
        if h:
            # Internal use of put is possible but here we call the SDK directly
            self._lib.ArtemisSubframe(h, 0, 0, self._props.nPixelsX, self._props.nPixelsY)
        return self.get('max_range')
            self._lib.ArtemisSubframe(h, 0, 0, nx, ny)
            self._subframe = [0, 0, nx, ny]
        return [nx, ny]

    def half_frame(self):
        h = self._check_connection()
        nx, ny = self._props.nPixelsX, self._props.nPixelsY
        if h:
            m = self.get('max_range')
            w, h_size = m[0] // 2, m[1] // 2
            x, y = w // 2, h_size // 2
            self._lib.ArtemisSubframe(h, x, y, w, h_size)
        return [self._props.nPixelsX // 2, self._props.nPixelsY // 2]
            self._lib.ArtemisSubframe(h, 0, ny // 4, nx, ny // 2)
            self._subframe = [0, ny // 4, nx, ny // 2]
        return [nx, ny // 2]

    def small_frame(self):
        h = self._check_connection()
        nx = self._props.nPixelsX
        if h:
            m = self.get('max_range')
            w, h_size = m[0] // 10, m[1] // 10
            x, y = (m[0] * 9) // 20, (m[1] * 9) // 20
            self._lib.ArtemisSubframe(h, x, y, w, h_size)
        return [self._props.nPixelsX // 10, self._props.nPixelsY // 10]
            self._lib.ArtemisSubframe(h, 0, 1500, nx, 500)
            self._subframe = [0, 1500, nx, 500]
        return [nx, 500]

    # --- Properties ---

@@ -222,6 +223,10 @@ class Camera(BaseDevice):
    def filter(self):
        return 0

    @filter.setter
    def filter(self, n):
        pass

    @property
    def max_range(self):
        return [self._props.nPixelsX, self._props.nPixelsY]
@@ -251,16 +256,27 @@ class Camera(BaseDevice):
        # We scale it to 0-100 to match Noctua standards.
        fan_power = round((level.value / 255.0) * 100) if maxl.value > 0 else 0

        res = {
            "ambient": None,  # Not available on Atik 11000 via standard cooling call
        b_x, b_y = bx.value, by.value
        x_sf, y_sf, w_sf, h_sf = self._subframe
        nx, ny = self._props.nPixelsX, self._props.nPixelsY
        x_start = x_sf // b_x if b_x else 0
        y_start = y_sf // b_y if b_y else 0
        x_end = (x_sf + w_sf) // b_x if b_x else 0
        y_end = (y_sf + h_sf) // b_y if b_y else 0

        return {
            "ambient": None,
            "setpoint": setp.value / 100.0,
            "temperature": self.temperature,
            "cooler": bool(flags.value & 64), # Bit 6 is Cooling ON
            "cooler": bool(flags.value & 64),
            "fan": fan_power,
            "binning": [bx.value, by.value],
            "max_range": self.max_range,
            "binning": [b_x, b_y],
            "max_range": [nx // b_x if b_x else nx, ny // b_y if b_y else ny],
            "xystart": [x_start, y_start],
            "xyend": [x_end, y_end],
            "xrange": [x_start, x_end],
            "yrange": [y_start, y_end],
            "center": [nx // b_x // 2 if b_x else nx // 2, ny // b_y // 2 if b_y else ny // 2],
            "state": self.state,
            "description": self._props.Description.decode()
            "description": self._props.Description.decode(),
        }

        return res
+7 −0
Original line number Diff line number Diff line
@@ -299,6 +299,7 @@ class Camera(STX):
            return [None, None]

        self.set_window(0, 0, int(cam_x), int(cam_y))
        return [int(cam_x), int(cam_y)]

    def half_frame(self):
        """Sets the camera to use a centered 50% sub-frame."""
@@ -320,6 +321,7 @@ class Camera(STX):
        height = int(cam_y) // 2

        self.set_window(start_x, start_y, width, height)
        return [width, height]

    def small_frame(self):
        """Sets the camera to use a centered 10% sub-frame."""
@@ -341,6 +343,7 @@ class Camera(STX):
        height = int(cam_y) // 10

        self.set_window(start_x, start_y, width, height)
        return [width, height]

    @property
    def binning(self):
@@ -492,6 +495,8 @@ class Camera(STX):
                    "xrange":  [None, None],
                    "yrange":  [None, None],
                    "center":  [None, None],
                    "state": None,
                    "description": None,
                    }

        return {"ambient": round(ambient, 1),
@@ -506,6 +511,8 @@ class Camera(STX):
                "xrange": [x_start, x_end],
                "yrange": [y_start, y_end],
                "center": [int(camx) // b_x // 2, int(camy) // b_y // 2],
                "state": self.state,
                "description": self.description,
                }

    @property
+135 −0
Original line number Diff line number Diff line
import ctypes
import os
import tempfile
import unittest
from unittest.mock import patch, MagicMock, ANY

from noctua.devices.atik import Camera


class TestAtikCameraMethods(unittest.TestCase):
    """Unit tests for atik.Camera (devices.cam2)."""

    def setUp(self):
        with patch('ctypes.CDLL'):
            self.camera = Camera("dummy")

        # Replace lib with a controllable mock; simulate a connected camera
        self.mock_lib = MagicMock()
        self.camera._lib = self.mock_lib
        self.camera._handle = MagicMock(name='fake_handle')  # truthy → _check_connection returns it
        self.camera._props.nPixelsX = 4096
        self.camera._props.nPixelsY = 4126
        self.camera._subframe = [0, 0, 4096, 4126]
        self.camera.error = []

    # --- abort ---

    def test_abort(self):
        self.camera.abort()
        self.mock_lib.ArtemisAbortExposure.assert_called_once_with(self.camera._handle)

    # --- start ---

    def test_start_light(self):
        self.camera.start(10.0, 1)
        self.mock_lib.ArtemisSetDarkMode.assert_called_once_with(self.camera._handle, False)
        self.mock_lib.ArtemisStartExposure.assert_called_once()

    def test_start_dark(self):
        self.camera.start(10.0, 0)
        self.mock_lib.ArtemisSetDarkMode.assert_called_once_with(self.camera._handle, True)

    def test_start_bias_is_dark(self):
        self.camera.start(0.0, 2)
        self.mock_lib.ArtemisSetDarkMode.assert_called_once_with(self.camera._handle, True)

    def test_start_accepts_datetime_kwarg(self):
        # datetime is accepted but unused by the Atik SDK path
        self.camera.start(5.0, 1, datetime="2024-06-01T00:00:00.000")
        self.mock_lib.ArtemisStartExposure.assert_called_once()

    # --- full_frame / half_frame / small_frame ---

    def test_full_frame_returns_size(self):
        result = self.camera.full_frame()
        self.assertEqual(result, [4096, 4126])
        self.mock_lib.ArtemisSubframe.assert_called_once_with(
            self.camera._handle, 0, 0, 4096, 4126)
        self.assertEqual(self.camera._subframe, [0, 0, 4096, 4126])

    def test_half_frame_returns_size(self):
        result = self.camera.half_frame()
        self.assertEqual(result, [4096, 4126 // 2])
        self.mock_lib.ArtemisSubframe.assert_called_once_with(
            self.camera._handle, 0, 4126 // 4, 4096, 4126 // 2)
        self.assertEqual(self.camera._subframe, [0, 4126 // 4, 4096, 4126 // 2])

    def test_small_frame_returns_size(self):
        result = self.camera.small_frame()
        self.assertEqual(result, [4096, 500])
        self.mock_lib.ArtemisSubframe.assert_called_once_with(
            self.camera._handle, 0, 1500, 4096, 500)
        self.assertEqual(self.camera._subframe, [0, 1500, 4096, 500])

    # --- state ---

    def test_state(self):
        self.mock_lib.ArtemisCameraState.return_value = 0
        self.assertEqual(self.camera.state, 0)

    # --- binning ---

    def test_binning_get_calls_sdk(self):
        result = self.camera.binning
        self.assertIsInstance(result, list)
        self.assertEqual(len(result), 2)
        self.mock_lib.ArtemisGetBin.assert_called_once_with(self.camera._handle, ANY, ANY)

    def test_binning_set(self):
        self.camera.binning = [2, 2]
        self.mock_lib.ArtemisBin.assert_called_once_with(self.camera._handle, 2, 2)

    # --- filter ---

    def test_filter_get_returns_zero(self):
        self.assertEqual(self.camera.filter, 0)

    def test_filter_set_is_noop(self):
        self.camera.filter = 3
        self.assertEqual(self.mock_lib.method_calls, [])  # no SDK calls

    # --- all ---

    def test_all_has_all_keys(self):
        self.mock_lib.ArtemisCameraState.return_value = 0
        result = self.camera.all
        expected_keys = {
            "ambient", "setpoint", "temperature", "cooler", "fan",
            "binning", "max_range", "xystart", "xyend",
            "xrange", "yrange", "center", "state", "description",
        }
        self.assertEqual(set(result.keys()), expected_keys)

    def test_all_xystart_reflects_subframe(self):
        self.camera._subframe = [512, 256, 4096, 500]
        self.mock_lib.ArtemisCameraState.return_value = 0
        result = self.camera.all
        # binning defaults to 0 from mock → guard gives 0; check keys exist
        self.assertIn("xystart", result)
        self.assertIn("xyend", result)

    # --- download ---

    def test_download_polls_image_ready(self):
        self.mock_lib.ArtemisImageReady.return_value = 1   # ready immediately
        self.mock_lib.ArtemisImageBuffer.return_value = 0  # null ptr → skip write
        with tempfile.TemporaryDirectory() as tmpdir:
            self.camera.download(filepath=os.path.join(tmpdir, "test.fits"))
        self.mock_lib.ArtemisImageReady.assert_called()
        self.mock_lib.ArtemisGetImageData.assert_called()
        self.mock_lib.ArtemisImageBuffer.assert_called()


if __name__ == '__main__':
    unittest.main()
+177 −0
Original line number Diff line number Diff line
import os
import tempfile
import unittest
from unittest.mock import patch, MagicMock

from noctua.devices.stx import Camera


def mock_resp(text):
    """Build a minimal mock requests.Response."""
    r = MagicMock()
    r.text = text if isinstance(text, str) else text.decode()
    r.content = text if isinstance(text, bytes) else text.encode()
    r.raise_for_status.return_value = None
    return r


class TestSTXCameraMethods(unittest.TestCase):
    """Unit tests for stx.Camera (devices.cam)."""

    def setUp(self):
        self.camera = Camera("http://mock-stx")
        self.camera._command_interval = 0  # skip 50ms throttle in tests
        self.camera.error = []

    # --- abort ---

    @patch('noctua.devices.stx.requests.get')
    def test_abort(self, mock_get):
        mock_get.return_value = mock_resp("OK\r\n")
        self.camera.abort()
        args, _ = mock_get.call_args
        self.assertIn("ImagerAbortExposure", args[0])

    # --- start ---

    @patch('noctua.devices.stx.requests.get')
    def test_start_when_idle(self, mock_get):
        mock_get.side_effect = [
            mock_resp("0\r\n"),   # state → 0.0 (idle)
            mock_resp("OK\r\n"),  # StartExposure
        ]
        self.camera.start(10.0, 1)
        self.assertEqual(mock_get.call_count, 2)
        self.assertEqual(self.camera.error, [])

    @patch('noctua.devices.stx.requests.get')
    def test_start_not_idle(self, mock_get):
        mock_get.return_value = mock_resp("2\r\n")  # state = 2 (exposing)
        self.camera.start(10.0, 1)
        # state is queried twice: once in `if self.state` and once in the log message
        for call_args in mock_get.call_args_list:
            self.assertIn("ImagerState", call_args[0][0])
        self.assertIn("Camera not idle", self.camera.error)

    @patch('noctua.devices.stx.requests.get')
    def test_start_passes_datetime(self, mock_get):
        mock_get.side_effect = [
            mock_resp("0\r\n"),
            mock_resp("OK\r\n"),
        ]
        self.camera.start(5.0, 1, datetime="2024-06-01T00:00:00.000")
        _, kwargs = mock_get.call_args
        self.assertIn("DateTime", kwargs.get('params', ''))

    # --- full_frame / half_frame / small_frame ---

    @patch('noctua.devices.stx.requests.get')
    def test_full_frame_returns_size(self, mock_get):
        mock_get.side_effect = [
            mock_resp("4096\r\n4126\r\n"),  # GetSettings → cam_x, cam_y
            mock_resp("0\r\n"),              # state check inside set_window
            mock_resp("OK\r\n"),             # SetSettings
        ]
        result = self.camera.full_frame()
        self.assertEqual(result, [4096, 4126])

    @patch('noctua.devices.stx.requests.get')
    def test_half_frame_returns_size(self, mock_get):
        mock_get.side_effect = [
            mock_resp("4096\r\n4126\r\n"),
            mock_resp("0\r\n"),
            mock_resp("OK\r\n"),
        ]
        result = self.camera.half_frame()
        self.assertEqual(result, [4096 // 2, 4126 // 2])

    @patch('noctua.devices.stx.requests.get')
    def test_small_frame_returns_size(self, mock_get):
        mock_get.side_effect = [
            mock_resp("4096\r\n4126\r\n"),
            mock_resp("0\r\n"),
            mock_resp("OK\r\n"),
        ]
        result = self.camera.small_frame()
        self.assertEqual(result, [4096 // 10, 4126 // 10])

    # --- state ---

    @patch('noctua.devices.stx.requests.get')
    def test_state(self, mock_get):
        mock_get.return_value = mock_resp("0\r\n")
        self.assertEqual(self.camera.state, 0.0)

    # --- binning ---

    @patch('noctua.devices.stx.requests.get')
    def test_binning_get(self, mock_get):
        mock_get.return_value = mock_resp("2\r\n2\r\n")
        self.assertEqual(self.camera.binning, [2, 2])

    @patch('noctua.devices.stx.requests.get')
    def test_binning_set(self, mock_get):
        mock_get.side_effect = [
            mock_resp("0\r\n"),   # state check
            mock_resp("OK\r\n"),  # SetSettings
        ]
        self.camera.binning = [2, 2]
        self.assertEqual(mock_get.call_count, 2)

    # --- filter ---

    @patch('noctua.devices.stx.requests.get')
    def test_filter_get(self, mock_get):
        mock_get.return_value = mock_resp("3\r\n")
        self.assertEqual(self.camera.filter, 3.0)

    @patch('noctua.devices.stx.requests.get')
    def test_filter_set(self, mock_get):
        mock_get.side_effect = [
            mock_resp("0\r\n"),   # is_moving check
            mock_resp("OK\r\n"),  # ChangeFilter
        ]
        self.camera.filter = 2
        self.assertEqual(mock_get.call_count, 2)

    # --- all ---

    @patch('noctua.devices.stx.requests.get')
    def test_all_has_all_keys(self, mock_get):
        # 13 values matching the ImagerGetSettings params list
        settings = (
            "20.5\r\n-10.0\r\n-5.3\r\n"   # ambient, setpoint, temp
            "1\r\n45\r\n"                   # cooler, fan
            "1\r\n1\r\n"                    # binX, binY
            "4096\r\n4126\r\n"              # camX, camY
            "0\r\n0\r\n4096\r\n4126\r\n"   # startX, startY, numX, numY
        )
        mock_get.side_effect = [
            mock_resp(settings),
            mock_resp("0\r\n"),           # state
            mock_resp("STX-16803\r\n"),   # description
        ]
        result = self.camera.all
        expected_keys = {
            "ambient", "setpoint", "temperature", "cooler", "fan",
            "binning", "max_range", "xystart", "xyend",
            "xrange", "yrange", "center", "state", "description",
        }
        self.assertEqual(set(result.keys()), expected_keys)

    # --- download ---

    @patch('noctua.devices.stx.requests.get')
    def test_download_writes_file(self, mock_get):
        fake_content = b"SIMPLE  =                    T"
        mock_get.return_value = mock_resp(fake_content)
        with tempfile.TemporaryDirectory() as tmpdir:
            filepath = os.path.join(tmpdir, "test.fits")
            self.camera.download(filepath=filepath)
            self.assertTrue(os.path.exists(filepath))
            with open(filepath, 'rb') as f:
                self.assertEqual(f.read(), fake_content)


if __name__ == '__main__':
    unittest.main()