Commit c0375db2 authored by vertighel's avatar vertighel
Browse files

dev

parent 0c5d735f
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -11,10 +11,10 @@ good-names=i,
            gp, # from gnuplotlib

[MASTER]
init-hook='
    import os, sys; 
    from pylint.config import find_pylintrc; 
    sys.path.append(os.path.dirname(find_pylintrc()))
# init-hook='
#     import os, sys; 
#     from pylint.config import find_pylintrc; 
#     sys.path.append(os.path.dirname(find_pylintrc()))
    '

# To find modules when running pylint on the 'noctua' package, it's usually better to ensure

noctua/devices/lx200.py

deleted100644 → 0
+0 −403
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Interface with an LX200-based device
"""

# System modules
import socket

# Custom modules
from .basedevice import BaseDevice
from ..utils import check
from ..utils.logger import log


class LX200(BaseDevice):
    '''Base wrapper class for LX200 stuff'''

    def __init__(self, url):
        '''Constructor.'''
        
        super().__init__(url)
        self.url = url.split(":")[0] or url
        self.port = url.split(":")[1] or 22
        self.timeout = 3
        self.connection = None

    @check.socket_errors
    def _connect(self):
        '''Setup a connection to the telescope, if needed.'''


    @check.socket_errors
    def _send(self, get_or_set, message=None):
        '''
        Send messages to the telescope to set or get properties, if it is
        useful to have a common metod. If not, implement directly on get/put
        '''
        
    def get(self, message):
        '''Getter method'''

        return self._send("get", message)

    def put(self, key, val):
        '''Setter method'''
        
        return self._send("set", message)


class Telescope(LX200):
    '''Implementation of the Telescope commands mocking an Alpaca
       Telescope wrapper.'''
         
    @property
    def tracking(self):
        '''Tracking status'''
        message = "POINTING.TRACK"
        res = self.get(message)
        try:
            self._tracking = True if int(res) else False
        except TypeError as e:            
            self._tracking = None
        return self._tracking

    @tracking.setter
    def tracking(self, b):
        '''(Re)Start/Stop tracking of telescope in function of currently configured target'''
        ''' 1 slew to currently configured target and start tracking'''
        ''' 0 stop tracking'''
        message = "POINTING.TRACK"
        track = 1 if b else 0
        self.put(message, track)
        return self.tracking

    def track(self):
        '''More direct way to start tracking'''
        self.tracking = True
        return self._tracking

    def abort(self):
        '''Direct way to stop tracking'''
        '''Check if need to add other abort'''
        
        self.tracking = False
        return self._tracking

    @property
    def is_moving(self):
        '''Check the motion state of the telescope'''
        
        message = "TELESCOPE.MOTION_STATE"
        res = self.get(message)
        return res

    @property
    def temperature(self):
        '''Mirrors temperature'''
        message = ["AUXILIARY.SENSOR[2].VALUE",
                   "AUXILIARY.SENSOR[3].VALUE",
                   "AUXILIARY.SENSOR[4].VALUE"]
        res = self.get(message)
        return res

    @property
    def status(self):
        '''need to check exit and error from get function'''
        message = "TELESCOPE.STATUS.LIST"
        res = self.get(message)
        return res

    @property
    def state(self):

        message = "TELESCOPE.STATUS.GLOBAL"
        res = self.get(message)
        return res

    def clear(self, n):

        res = self.put("TELESCOPE.STATUS.CLEAR_ERROR", n)
        return res

    @property
    def clock(self):
        """
        in UNIX time
        """

        message = "POSITION.LOCAL.UTC"
        res = self.get(message)
        return res

    @property
    def cover(self):

        message = "AUXILIARY.COVER.REALPOS"
        res = self.get(message)
        self._cover = res
        return self._cover

    @property
    def open(self):

        res = self.cover
        if res == 1.0:
            self._open = True
        elif res == 0.0:
            self._open = False
        else:
            self._open = None
        return self._open

    @open.setter
    def open(self, b):
        pos = 1.0 if b else 0.0
        res = self.put("AUXILIARY.COVER.TARGETPOS", pos)
        self._open = self.open

    @property
    def park(self):
        '''Get if the telescope is in parked position'''
        
        message = "TELESCOPE.READY_STATE"
        res = self.get(message)
        try:
            self._park = False if int(res) else True
        except TypeError as e:
            self._park = None
        return self._park

    @park.setter
    def park(self, b, timeout=90):
        '''Init the telescope homing the axes,
        or put the telescope in park position'''
        
        message = "TELESCOPE.READY"
        pos = 0.0 if b else 1.0
        res = self.put(message, pos)
        self._park = self.park

    @property
    def altaz(self):

        message = ["POSITION.HORIZONTAL.ALT",
                   "POSITION.HORIZONTAL.AZ"]
        res = self.get(message)
        altaz = res
        self._altaz = altaz  # [alt, az]
        return self._altaz

    @altaz.setter
    def altaz(self, a):
        
        #self.tracking = False
        keys = ["POINTING.SETUP.DEROTATOR.SYNCMODE",
                "OBJECT.HORIZONTAL.ALT",
                "OBJECT.HORIZONTAL.AZ",
                "POINTING.TRACK"]
        derot = 2 # from astelos log file "true orientation"
        track = 2 # 2: go and stay there
        values = [derot, a[0], a[1], track]
        res = self.put(keys, values)
        self._altaz = self.altaz

    @property
    def targetradec(self):
        '''Get target Right Ascension / Declination'''

        message = ["POSITION.EQUATORIAL.RA_J2000",
                   "POSITION.EQUATORIAL.DEC_J2000"]
        res = self.get(message)
        targetradec = res
        self._targetradec = targetradec
        return self._targetradec

    @targetradec.setter
    def targetradec(self, a):
        '''Set target Right Ascension / Declination'''

        keys = ["POINTING.SETUP.DEROTATOR.SYNCMODE",
                "OBJECT.EQUATORIAL.RA",
                "OBJECT.EQUATORIAL.DEC",
                "POINTING.TRACK"]
        derot = 2 # from astelos log file "true orientation"
        track = 1 # 1: go and track
        values = [derot, a[0], a[1], track]
        res = self.put(keys, values)
        self._targetradec = self.targetradec

    @property
    def radec(self):
        '''Get Right Ascension / Declination'''

        message = ["POSITION.EQUATORIAL.RA_J2000",
                   "POSITION.EQUATORIAL.DEC_J2000"]
        res = self.get(message)
        radec = res
        self._radec = radec
        return self._radec

    @radec.setter
    def radec(self, a):
        '''Set Right Ascension / Declination'''

        keys = ["POINTING.SETUP.DEROTATOR.SYNCMODE",
                "OBJECT.EQUATORIAL.RA",
                "OBJECT.EQUATORIAL.DEC",
                "POINTING.TRACK"]
        derot = 2 # from astelos log file "true orientation"
        track = 1 # 1: go and track
        values = [derot, a[0], a[1], track]
        res = self.put(keys, values)
        self._radec = self.radec

    @property
    def offset(self):
        '''Get Telescope offset'''

        message = ["POSITION.INSTRUMENTAL.ZD.OFFSET",
                   "POSITION.INSTRUMENTAL.AZ.OFFSET"]
        res = self.get(message)
        offset = res
        self._offset = offset
        return self._offset

    @offset.setter
    def offset(self, a):
        '''Set Telescope offset'''
        
        keys = ["POSITION.INSTRUMENTAL.ZD.OFFSET",
                "POSITION.INSTRUMENTAL.AZ.OFFSET"]
        values = [a[0], a[1]]
        if abs(values[0]) > 0.9:  # 54 arcmin
            log.error(f"zd {a[0]} too large. Maybe arcsec instead of deg?")
            return
        if abs(values[1]) > 0.9:  # 54 arcmin
            log.error(f"az {a[1]} too large. Maybe arcsec instead of deg?")
            return
        res = self.put(keys, values)
        self._offset = self.offset

    @property
    def coordinates(self):
        """
        Ask simultaneously
        RA2000, DEC2000, ALT, AZ, LST, UTC.
        Answer is given as a dict of decimal numbers.
        UTC is given as unix time.
        """

        message = ["POSITION.EQUATORIAL.RA_J2000",
                   "POSITION.EQUATORIAL.DEC_J2000",
                   "POSITION.HORIZONTAL.ALT",
                   "POSITION.HORIZONTAL.AZ",
                   # "OBJECT.HORIZONTAL.ALT",
                   # "OBJECT.HORIZONTAL.AZ",
                   "POSITION.LOCAL.SIDEREAL_TIME",
                   "POSITION.LOCAL.UTC"]

        res = self.get(message)

        # print(telnet_dict)

        simple_dict = {
            "radec": [res[0],
                      res[1]],
            "altaz": [res[2],
                      res[3]],
            "lst": res[4],
            "utc": res[5],
        } if res else None

        return simple_dict


class Focuser(LX200):
    '''Implementation of the Focuser commands mocking my Alpaca Telescope
       wrapper.'''

    @property
    def is_moving(self):
        '''TBD with the right telnet commands'''
        #log.warning("TBD with the right telnet commands")
        
    @property
    def position(self):
        '''Get Relative focuser position from telnet'''

        message = "POSITION.INSTRUMENTAL.FOCUS.OFFSET"
        pos = self.get(message)
        try:
            res = float(pos)*1000
        except TypeError as e:
            res =  None            
        self._position = res
        return self._position

    @position.setter
    def position(self, s):  # 0-34500=micron?
        '''Set Relative focuser position from telnet'''

        message = "POSITION.INSTRUMENTAL.FOCUS.OFFSET"
        pos = s/1000
        res = self.put(message, pos)
        self._position = self.position

    @property
    def absolute(self):
        '''Get Absolute focuser position from telnet'''

        message = "POSITION.INSTRUMENTAL.FOCUS.REALPOS"
        res = self.get(message)
        self._absolute = res
        return self._absolute

    @absolute.setter
    def absolute(self, s):  # 0-34500=micron?
        '''Set Absolute focuser position from telnet'''

        message = "POSITION.INSTRUMENTAL.FOCUS.OFFSET"
        pos = s/1000
        res = self.put(message, pos)
        self._absolute = self.absolute


class Rotator(LX200):
    '''Implementation of the Rotator commands mocking my Alpaca Telescope
       wrapper.'''

    @property
    def is_moving(self):
        '''TBD with the right telnet commands'''
        #log.warning("TBD with the right telnet commands")

    @property
    def position(self):
        '''Get Relative rotator position from telnet'''

        message = "POSITION.INSTRUMENTAL.DEROTATOR[2].OFFSET"
        res = self.get(message)
        self._position = res
        return self._position

    @position.setter
    def position(self, s):  # 0-270 deg?
        '''Set Relative rotator position from telnet'''
         
        message = "POSITION.INSTRUMENTAL.DEROTATOR[2].OFFSET"
        res = self.put(message, s)
        self._position = self.position

    @property
    def absolute(self):
        '''Get Absolute rotator position from telnet'''
        
        message = "POSITION.INSTRUMENTAL.DEROTATOR[2].REALPOS"
        res = self.get(message)
        self._absolute = res
        return self._absolute

tests/__init__.py

0 → 100644
+3 −0
Original line number Diff line number Diff line
'''
Unit tests for the package
'''
+159 −0
Original line number Diff line number Diff line
      
# tests/test_device_access.py
import unittest
from unittest.mock import patch, MagicMock, mock_open
import configparser # To mock config loading

# We need to ensure 'noctua.devices' can be imported.
# If 'noctua' is not yet installed, and tests are run from project root:
# Make sure the project root is in PYTHONPATH or use `python -m unittest ...`

class TestDevicePropertyAccess(unittest.TestCase):

    @patch('noctua.devices.configparser.ConfigParser')
    @patch('noctua.devices.importlib.import_module') # Patch import_module used in devices/__init__.py
    def setUp(self, mock_import_module, mock_config_parser_constructor):
        """
        Set up mock configurations to allow 'noctua.devices' to "load"
        dummy devices without real hardware or full config files.
        """
        # Mock ConfigParser instances
        self.mock_nodes_config = MagicMock(spec=configparser.ConfigParser)
        self.mock_devs_config = MagicMock(spec=configparser.ConfigParser)

        # Configure the constructor to return our mocks
        mock_config_parser_constructor.side_effect = [
            self.mock_nodes_config,  # First call to ConfigParser() in devices/__init__
            self.mock_devs_config    # Second call
        ]

        # --- Mock nodes.ini content ---
        # Simulating [DUMMY_NODE] ip=localhost port=1234
        self.mock_nodes_config.items.return_value = [('ip', 'localhost'), ('port', '1234')]

        # --- Mock devices.ini content ---
        # Simulate one dome and one camera
        self.mock_devs_config.sections.return_value = ['dom', 'cam', 'tel']
        
        # Values for test_dom
        def devs_get_dom(section, option):
            if section == 'dom':
                if option == 'module': return 'alpaca' # Or whatever module 'dom' usually is
                if option == 'class': return 'Dome'
                if option == 'node': return 'DUMMY_NODE'
            raise configparser.NoOptionError(option, section)

        # Values for test_cam
        def devs_get_cam(section, option):
            if section == 'cam':
                if option == 'module': return 'stx' # Or your camera module
                if option == 'class': return 'Camera'
                if option == 'node': return 'DUMMY_NODE'
                if option == 'outlet': return '0' # if it's a switch-like camera power
            raise configparser.NoOptionError(option, section)

        # Values for test_cam
        def devs_get_tel(section, option):
            if section == 'tel':
                if option == 'module': return 'astelco' # Or your camera module
                if option == 'class': return 'Telescope'
                if option == 'node': return 'DUMMY_NODE'
            raise configparser.NoOptionError(option, section)

        # Side effect for devs.get based on section
        def devs_get_side_effect(section, option):
            if section == 'dom':
                return devs_get_dom(section, option)
            elif section == 'cam':
                return devs_get_cam(section, option)
            elif section == 'tel':
                return devs_get_tel(section, option)
            raise configparser.NoSectionError(section)

        self.mock_devs_config.get.side_effect = devs_get_side_effect

        # --- Mock the dynamically imported modules and classes ---
        self.mock_dome_module = MagicMock()
        self.mock_dome_class = MagicMock()
        self.mock_dome_instance = MagicMock(spec_set=['azimuth', 'error']) # spec_set for strictness
        self.mock_dome_instance.azimuth = 123.45
        self.mock_dome_instance.error = []
        self.mock_dome_class.return_value = self.mock_dome_instance
        self.mock_dome_module.Dome = self.mock_dome_class # Class name must match devices.ini

        self.mock_camera_module = MagicMock()
        self.mock_camera_class = MagicMock()
        self.mock_camera_instance = MagicMock(spec_set=['binning', 'error'])
        self.mock_camera_instance.binning = [1,1]
        self.mock_camera_instance.error = []
        self.mock_camera_class.return_value = self.mock_camera_instance
        self.mock_camera_module.Camera = self.mock_camera_class

        self.mock_telescope_module = MagicMock()
        self.mock_telescope_class = MagicMock()
        self.mock_telescope_instance = MagicMock(spec_set=['altaz', 'error'])
        self.mock_telescope_instance.altaz = [1,1]
        self.mock_telescope_instance.error = []
        self.mock_telescope_class.return_value = self.mock_telescope_instance
        self.mock_telescope_module.Telescope = self.mock_telescope_class

        def import_module_side_effect(name, package=None):
            # The name will be like 'noctua.devices.alpaca'
            if 'alpaca' in name: # Or your actual dome module name
                return self.mock_dome_module
            elif 'stx' in name: # Or your actual camera module name
                return self.mock_camera_module
            elif 'astelco' in name: # Or your actual telescope module name
                return self.mock_camera_module
            raise ImportError(f"Mocked import_module cannot find {name}")
        
        mock_import_module.side_effect = import_module_side_effect
        
        # Now, import noctua.devices. This will trigger its __init__.py
        # We need to ensure that if devices is already imported, it's reloaded with mocks.
        # This can be tricky. It's often better to ensure tests run in a clean environment
        # or explicitly reload. For simplicity, we assume it's the first import or
        # the mocks effectively override.
        
        # If 'noctua.devices' might already be loaded by another test:
        import sys
        if 'noctua.devices' in sys.modules:
            del sys.modules['noctua.devices'] # Force a reload
            if 'noctua.devices.alpaca' in sys.modules: del sys.modules['noctua.devices.alpaca']
            if 'noctua.devices.stx' in sys.modules: del sys.modules['noctua.devices.stx']
            if 'noctua.devices.astelco' in sys.modules: del sys.modules['noctua.devices.astelco']
            # Add other device modules if necessary

        from noctua import devices
        self.devices = devices


    def test_dome_azimuth_access(self):
        """Test accessing devices.dom.azimuth."""
        # devices.test_dom is our mock_dome_instance
        self.assertEqual(self.devices.dom.azimuth, 123.45)
        self.assertEqual(self.devices.dom.error, [])

    def test_telescope_altaz_access(self):
        """Test accessing devices.test_dom.altaz."""
        self.assertEqual(self.devices.tel.altaz, [45.0, 180.0])
        self.assertEqual(self.devices.tel.error, [])

    def test_camera_binning_access(self):
        """Test accessing devices.test_cam.binning."""
        self.assertEqual(self.devices.cam.binning, [1,1])
        self.assertEqual(self.devices.cam.error, [])

    def test_non_existent_device_attribute_error(self):
        """Test accessing a non-existent device raises AttributeError."""
        with self.assertRaises(AttributeError):
            _ = self.devices.non_existent_device

# Note: This setUp is quite involved because it mocks the dynamic
# loading.  An alternative for simpler unit tests is to *not* rely on
# `from noctua import devices` and instead directly instantiate and
# test the device classes (e.g., `alpaca.Dome`) by mocking their
# specific dependencies (e.g., `requests.get`).  The test above is
# more of an "integration unit test" for the devices package loading.

    
+30 −0
Original line number Diff line number Diff line
import unittest
from unittest.mock import patch, MagicMock

# Assuming your project structure allows this import when tests are run
# from the project root (e.g., python -m unittest discover)
from noctua.devices import alpaca # Or directly import specific classes like Dome, Switch
from noctua.devices.__init__ import dynamic_import # To load devices based on config for integration-like unit tests

# You'll need a way to configure device instances for testing,
# or use the dynamic import mechanism if your test environment
# has access to dummy config files.

# For pure unit tests, directly instantiate:
# MOCK_URL = "http://mock-alpaca-server"
# test_switch = alpaca.Switch(MOCK_URL, switch_id=1, device_name="TestAlpacaSwitch")

class TestAlpacaSwitch(unittest.TestCase):
    def setUp(self):
        # This method is run before each test
        self.mock_url = "http://mock-alpaca-server.test"
        # device_name is added if you adopted the BaseDevice change. If not, remove it.
        self.switch = alpaca.Switch(self.mock_url, switch_id=0, device_name="TestAlpacaSwitch0")
        # Clear any errors from previous test runs if the instance is reused (not typical in setUp)
        if hasattr(self.switch, 'error'):
            self.switch.error = []

    # ... test methods ...

if __name__ == '__main__':
    unittest.main()
Loading