Commit 91baca41 authored by vertighel's avatar vertighel
Browse files

Added devices for PDU (netio.py class Switch) and PI stage (mercury class...

Added devices for PDU (netio.py class Switch) and PI stage (mercury class Stage). Missing: add in __init__ a hardware limit configuration with an external hard stop, and eventually a software limit
parent 2ac5d164
Loading
Loading
Loading
Loading
Loading
+108 −91
Original line number Diff line number Diff line
@@ -6,107 +6,109 @@
# node = MY_NODE_NAME    # You defined it in nodes.ini
# #####################################################

[stage]
module = mercury
class = Stage
node = DUMMY
pos_imaging = 120
pos_spectro = 110
pos_echelle = 100

# [pdu]
# module = netio
# class = Switch
# node = PDU
# outlet = 1

# [tel]
# module = alpaca
# module = astelco
# class = Telescope
# node = ASCOM_REMOTE
# node = CABINET

# [foc]
# module = alpaca
# module = astelco
# class = Focuser
# node = ASCOM_REMOTE
# node = CABINET

# [rot]
# module = alpaca
# module = astelco
# class = Rotator
# node = CABINET

# [tel_temp]
# module = astelco
# class = Sensor
# node = CABINET
# outlet1 = 1
# outlet2 = 2

# [dom]
# module = alpaca
# class = Dome
# node = ASCOM_REMOTE

# [lamp]
# module = alpaca
# class = Switch
# node = ASCOM_REMOTE
# outlet = 2

[tel]
module = astelco
class = Telescope
node = CABINET

[foc]
module = astelco
class = Focuser
node = CABINET

[rot]
module = astelco
class = Rotator
node = CABINET

[tel_temp]
module = astelco
class = Sensor
node = CABINET
outlet1 = 1
outlet2 = 2

[dom]
module = alpaca
class = Dome
node = ASCOM_REMOTE

[lamp]
module = alpaca
class = Switch
node = ASCOM_REMOTE
outlet = 2

[light]
module = alpaca
class = Switch
node = ASCOM_REMOTE
outlet = 3

[cam]
module = stx
class = Camera
node = STX

[cab]
module = siemens
class = Switch
node = SIEMENS_SWITCH
outlet = 0

[sof]
module = domotics
class = Switch
node = DOMOTICS
outlet = 3

[fork]
module = domotics
class = Sensor
node = DOMOTICS
outlet1 = 1
outlet2 = 2

[rec]
module = domotics
class = Sensor
node = DOMOTICS
outlet1 = 5
outlet2 = 4

[rack]
module = domotics
class = Sensor
node = DOMOTICS
outlet1 = 7
outlet2 = 6

[ipcam]
module = ipcam
class = Webcam
node = IPCAM

[met]
module = meteo
class = Meteo
node = METEO
# [light]
# module = alpaca
# class = Switch
# node = ASCOM_REMOTE
# outlet = 3

# [cam]
# module = stx
# class = Camera
# node = STX

# [cab]
# module = siemens
# class = Switch
# node = SIEMENS_SWITCH
# outlet = 0

# [ipcam]
# module = ipcam
# class = Webcam
# node = IPCAM

# [met]
# module = meteo
# class = Meteo
# node = METEO




# [sof]
# module = domotics
# class = Switch
# node = DOMOTICS
# outlet = 3

# [fork]
# module = domotics
# class = Sensor
# node = DOMOTICS
# outlet1 = 1
# outlet2 = 2

# [rec]
# module = domotics
# class = Sensor
# node = DOMOTICS
# outlet1 = 5
# outlet2 = 4

# [rack]
# module = domotics
# class = Sensor
# node = DOMOTICS
# outlet1 = 7
# outlet2 = 6

# # [lamp]
# # module = alpaca
@@ -125,3 +127,18 @@ node = METEO
# # class = Dome
# # node = LUCA_FINI

# # [tel]
# # module = alpaca
# # class = Telescope
# # node = ASCOM_REMOTE

# # [foc]
# # module = alpaca
# # class = Focuser
# # node = ASCOM_REMOTE

# # [rot]
# # module = alpaca
# # class = Rotator
# # node = ASCOM_REMOTE
+10 −2
Original line number Diff line number Diff line
@@ -19,8 +19,12 @@ this_module = sys.modules[__name__]
nodes = configparser.ConfigParser()
devs = configparser.ConfigParser()

try:
    nodes.read(Path(__file__).parent.parent / 'config' / 'nodes.ini')
    devs.read(Path(__file__).parent.parent / 'config' / 'devices.ini')
except configparser.DuplicateSectionError as e:
    print(f"Init error: duplicate section in config file: {e}")
    sys.exit(1)


def dynamic_import(this, dev):
@@ -49,6 +53,10 @@ def dynamic_import(this, dev):
    elif cls.__name__ == "Switch":
        outlet = int(devs.get(dev, 'outlet'))
        instance = cls(url, outlet)
    elif cls.__name__ == "Stage":
        # extract all entries starting with pos_ and strip the prefix
        pos_dict = {k[4:]: float(v) for k, v in devs.items(dev) if k.startswith('pos_')}
        instance = cls(url, named_positions=pos_dict)
    else:
        instance = cls(url)

+318 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Driver for Physik Instrumente (PI) Mercury controller using PIPython.
"""

# System modules
import time

# Third-party modules
from pipython.pidevice.gcscommands import GCSCommands
from pipython.pidevice.gcsmessages import GCSMessages
from pipython.pidevice.interfaces.piserial import PISerial
from pipython.pidevice.interfaces.pisocket import PISocket

# Custom modules
from .basedevice import BaseDevice


class Mercury(BaseDevice):
    """
    Base wrapper class for Physik Instrumente Mercury controllers.
    """

    def __init__(self, url):
        """
        Initialize the Mercury device parameters.

        Parameters
        ----------
        url : str
            Serial port path (e.g., "/dev/ttyUSB0").
        """

        super().__init__(url)
        self.host, self.port = url.split(":")
        self.port = int(self.port)
        self.baudrate = 115200
        self.axis = "1"
        self._gateway = None
        self.pidevice = None


    # def _check_connection(self):
    #     """
    #     Internal method to manage the context manager state.

    #     Returns
    #     -------
    #     GCSCommands
    #         The active PI GCS commands instance.
    #     """

    #     if self.pidevice is None:
    #         self._gateway = PISerial(port=self.port, baudrate=self.baudrate)
    #         # Internal management of the library context manager
    #         gateway_instance = self._gateway.__enter__()
    #         messages = GCSMessages(gateway_instance)
    #         self.pidevice = GCSCommands(messages)

    #     return self.pidevice

    # def __del__(self):
    #     """
    #     Ensure the internal context manager is closed on destruction.
    #     """

    #     if self._gateway is not None:
    #         self._gateway.__exit__(None, None, None)


    def _check_connection(self):
        """
        Internal method to manage the context managers state.

        Returns
        -------
        GCSCommands
            The active PI GCS commands instance.
        """

        if self.pidevice is None:
            # Open Gateway (Socket)
            self._gateway = PISocket(host=self.host, port=self.port)
            self._gateway.__enter__() 
        
            messages = GCSMessages(self._gateway)
        
            # 3. Creation and opening of GCSCommands
            self.pidevice = GCSCommands(messages)
            self.pidevice.__enter__() # Second context manager

        return self.pidevice

    
    def __del__(self):
        """
        Ensure the internal context managers are closed on destruction.
        """
        
        # Close GCSCommands
        if self.pidevice is not None:
            try:
                self.pidevice.__exit__(None, None, None)
            except:
                pass
        # Close Socket Gateway
        if self._gateway is not None:
            try:
                self._gateway.__exit__(None, None, None)
            except:
                pass


    def get(self, command):
        """
        Get information from the controller using GCS query commands.

        Parameters
        ----------
        command : str
            The GCS query command name without 'q'.
        """

        pidevice = self._check_connection()
        method = getattr(pidevice, f"q{command}")
        if (command == "ERR"):
            return method()
        else:
            return method(self.axis)[self.axis]


    def put(self, command, *args):
        """
        Send a command to the controller.

        Parameters
        ----------
        command : str
            The GCS command name.
        """

        pidevice = self._check_connection()
        method = getattr(pidevice, command)
        return method(self.axis, *args)

    def wait(self):
        """
        Wait until the axis stops moving using high-level helper.
        """

        pidevice = self._check_connection()
        
        time.sleep(0.2)
        
        moving = True
        while moving:
            status_dict = pidevice.IsMoving(self.axis)
            moving = status_dict[self.axis]
            
            err = pidevice.qERR()
            if err != 0:
                pass
                
            time.sleep(0.2)

    def init(self):
        """
        Perform stage initialization sequence with explicit error clearing.
        """

        pidevice = self._check_connection()

        # Clear any previous errors
        pidevice.qERR()
        # Servo on
        self.put("SVO", True)

        # Find negative limit
        self.put("FNL")
        self.wait()

        # Find positive limit
        self.put("FPL")
        self.wait()
            

class Stage(Mercury):
    """
    High-level interface for a PI linear stage.
    """
    
    def __init__(self, url, named_positions=None):
        """
        Initialize the stage with optional named positions.

        Parameters
        ----------
        url : str
            Serial port path.
        named_positions : dict, optional
            Dictionary of named positions {name: value in mm}.
        """

        super().__init__(url)
        self.named_positions = named_positions or {}
        self.tolerance = 0.05
        
    @property
    def is_moving(self):
        """
        Check if the stage is currently in motion.

        Returns
        -------
        bool
            True if moving, False otherwise.
        """

        pidevice = self._check_connection()
        return pidevice.IsMoving(self.axis)[self.axis]


    @property
    def is_init(self):
        """
        Check if the stage has been referenced.

        Returns
        -------
        bool
            True if referenced, False otherwise.
        """

        res = self.get("FRF")
        return bool(res)

    
    @property
    def position(self):
        """
        Get the current stage position in millimeters.

        Returns
        -------
        float
            Current position in mm.
        """

        res = self.get("POS")
        return res


    @position.setter
    def position(self, pos):
        """
        Move the stage to an absolute position in millimeters.

        Parameters
        ----------
        pos : float
            Target absolute position in mm.
        """

        self.put("MOV", pos)

        
    def move_rel(self, dist):
        """
        Move the stage by a relative distance.

        Parameters
        ----------
        dist : float
            Distance to move in mm.
        """

        self.put("MVR", dist)

    @property
    def named(self):
        """
        Get the name of the current position if within tolerance.

        Returns
        -------
        str or None
            The name of the position if matched, else None.
        """

        current = self.position
        for name, pos in self.named_positions.items():
            if abs(current - pos) < self.tolerance:
                return name
        return None


    @named.setter
    def named(self, value):
        """
        Move the stage to a named position.

        Parameters
        ----------
        value : str
            The name of the target position.

        Raises
        ------
        KeyError
            If the position name is not in the configuration.
        """

        if value not in self.named_positions:
            raise KeyError(f"Invalid named position: {value} != {self.named_positions}")
        
        self.position = self.named_positions[value]
+174 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# System modules
from urllib.parse import urlencode

# Third-party modules
import requests

# Other templates
from ..utils import check
from ..utils.logger import log


class PowerPDU:
    # admin:admin
    # curl -XGET  http://192.168.1.78/netio.json 
    # curl -XPOST http://192.168.1.78/netio.json -d '{"Outputs":[ {"ID":1,"Action":1} ] }' -u netio:netio
    # see https://www.netio-products.com/en/application-notes/an21-json-https-protocol-to-control-netio-110230v-power-sockets-3x-rest-api
    
    def __init__(self, url, id):
        self.url = url
        self.addr = self.url + "/netio.json"
        self.timeout = 3
        self.error = None
        self.id = id
        requests.packages.urllib3.disable_warnings()  # For verify=False

    @check.request_errors
    def get(self, param=None, id=None):
        
        res = requests.get(self.addr,
                           timeout=self.timeout,
                           verify=False)
                           # params=base_params,

        res.raise_for_status()
        resj = res.json()
        outputs = resj['Outputs']
        return [d for d in outputs if d['ID']==id]

        # if resj["status"] != "OK":
        #     log.error(f'DOMOTICS: {resj["status"]}')
        #     if "message" in resj:
        #         log.error(resj["message"])
        # else:
        #     try:
        #         if not param:
        #             return resj
        #         else:
        #             if param == "ServerTime":
        #                 value = resj["ServerTime"]
        #             else:
        #                 value = resj["result"][0][param]
        #             return value
        #     except AttributeError as e:
        #         return res
        #     except KeyError as e:
        #         return res

    @check.request_errors
    def put(self, params={}, id=None):

        data = {"Outputs":[ {"ID":id, "Action": params} ] }
        res = requests.post(self.addr,
                            json=data,
                           timeout=self.timeout,
                           verify=False)

        res.raise_for_status()
        resj = res.json()
        return resj

    @property
    def clock(self):
        
        res = requests.get(self.addr,           
                           timeout=self.timeout,
                           verify=False)        
        res.raise_for_status()
        resj = res.json()
        return resj['Agent']['Time']

    @property
    def all(self):
        
        res = requests.get(self.addr,           
                           timeout=self.timeout,
                           verify=False)        
        res.raise_for_status()
        resj = res.json()
        return resj


class Switch(PowerPDU):
    def __init__(self, url, id):
        super().__init__(url, id)

    def reboot(self):

        while self.state:
            self.state = False

        while not self.state:
            self.state = True

        return self.state

    @property
    def state(self):
        resj = self.get(id=self.id)
        try:
            res = resj[0]['State']
            self._state = True if res == 1 else False
            return self._state
        except IndexError as e:
            log.error("No valid response")
            return None

    @state.setter
    def state(self, s):
        switchcmd = 1 if s else 0
        res = self.put(switchcmd, id=self.id)
        self._state = self.state

#     @property
#     def all(self):
#         res = self.get()
#         return res


# class Switch(Sonoff):
#     def __init__(self, url, id):
#         super().__init__(url, id)

#     def reboot(self):

#         while self.state:
#             self.state = False

#         while not self.state:
#             self.state = True

#         return self.state

#     @property
#     def state(self):
#         res = self.get("Status", id=self.id)
#         self._state = True if res == 'On' else False
#         return self._state

#     @state.setter
#     def state(self, s):
#         switchcmd = 'On' if s else 'Off'
#         params = {"param": "switchlight", "switchcmd": switchcmd}
#         res = self.put(params, id=self.id)
#         self._state = self.state


# class Sensor(Sonoff):
#     def __init__(self, url, temp_id, hum_id):
#         super().__init__(url, temp_id)
#         self.id = temp_id  # recycle for last_update
#         self.hum_id = hum_id

#     @property
#     def temperature(self):
#         res = self.get("Temp", id=self.id)
#         return res

#     @property
#     def humidity(self):
#         res = self.get("Humidity", id=self.hum_id)
#         return res
+2 −1
Original line number Diff line number Diff line
@@ -20,7 +20,8 @@ def build_url(itn):
            prot = itn["protocol"] + "://"
    else:
        prot = ""
    ip = itn["ip"]
        
    ip = itn["ip"] if itn.get("ip") else ""
    endp = itn.get("endpoint") or ""
    port = ":" + itn["port"] if itn.get("port") else ""