Commit 545696a2 authored by vertighel's avatar vertighel
Browse files

Started implementing AlliedVision Mako Camera device

parent 91baca41
Loading
Loading
Loading
Loading
+12 −7
Original line number Diff line number Diff line
@@ -6,13 +6,18 @@
# node = MY_NODE_NAME    # You defined it in nodes.ini
# #####################################################

[stage]
module = mercury
class = Stage
node = DUMMY
pos_imaging = 120
pos_spectro = 110
pos_echelle = 100
[teccam]
module = mako
class = Webcam
node = MAKO

# [stage]
# module = mercury
# class = Stage
# node = DUMMY
# pos_imaging = 120
# pos_spectro = 110
# pos_echelle = 100

# [pdu]
# module = netio

noctua/devices/mako.py

0 → 100644
+263 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Driver for Allied Vision Mako cameras using VmbPy.
"""

# System modules
import os
import numpy as np
from datetime import datetime

# Third-party modules
import cv2
from astropy.io import fits
from vmbpy import VmbSystem, VmbCameraError, VmbFeatureError, Frame

# Custom modules
from .basedevice import BaseDevice


class Mako(BaseDevice):
    """
    Base wrapper class for Allied Vision Mako cameras.
    """

    def __init__(self, url):
        """
        Initialize the Mako camera device.

        Parameters
        ----------
        url : str
            Camera IP address or Unique ID.
        """

        super().__init__(url)
        self.id = url
        self.vmb = VmbSystem.get_instance()


    def get(self, feature_name):
        """
        Get a specific camera feature value.

        Parameters
        ----------
        feature_name : str
            The name of the Vimba feature to query.
        """

        with self.vmb as vmb:
            with vmb.get_camera_by_id(self.id) as cam:
                feature = getattr(cam, feature_name)
                return feature.get()


    def put(self, feature_name, value):
        """
        Set a specific camera feature value.

        Parameters
        ----------
        feature_name : str
            The name of the Vimba feature to set.
        value : any
            The value to assign to the feature.
        """

        with self.vmb as vmb:
            with vmb.get_camera_by_id(self.id) as cam:
                feature = getattr(cam, feature_name)
                feature.set(value)


class Webcam(Mako):
    """
    High-level interface for Mako cameras used as technical webcams.
    """

    def __init__(self, url):
        """
        Initialize the Webcam instance.

        Parameters
        ----------
        url : str
            Camera IP address or Unique ID.
        """

        super().__init__(url)
        self._streaming = False
        self._range = [0, 255]


    @property
    def streaming(self):
        """
        Get the current streaming status.

        Returns
        -------
        bool
            True if the camera is streaming, False otherwise.
        """

        return self._streaming


    @streaming.setter
    def streaming(self, b):
        """
        Start or stop camera streaming.

        Parameters
        ----------
        b : bool
            True to start, False to stop.
        """

        if b == self._streaming:
            return

        with self.vmb as vmb:
            with vmb.get_camera_by_id(self.id) as cam:
                if b:
                    # Setup GigE packet size for stable stream
                    try:
                        stream = cam.get_streams()[0]
                        stream.GVSPAdjustPacketSize.run()
                        while not stream.GVSPAdjustPacketSize.is_done():
                            pass
                    except (AttributeError, VmbFeatureError):
                        pass

                    # Start streaming with a dummy handler as we don't need frame processing
                    cam.start_streaming(handler=lambda cam, stream, frame: cam.queue_frame(frame))
                    self._streaming = True
                else:
                    cam.stop_streaming()
                    self._streaming = False


    @property
    def image(self):
        """
        Get a single frame from the camera.

        Returns
        -------
        numpy.ndarray or None
            The image data, or None if streaming is active.
        """

        if self._streaming:
            self.error = ["streaming is True. first stop it"]
            return None

        with self.vmb as vmb:
            with vmb.get_camera_by_id(self.id) as cam:
                frame = cam.get_frame()
                return frame.as_numpy_ndarray()


    def save_image(self, filename="temp.png"):
        """
        Save a single frame as a PNG image.

        Parameters
        ----------
        filename : str
            Output file path.
        """

        data = self.image
        if data is None:
            return self.error

        # Rescale data to 8-bit using the defined range
        rescaled = np.clip(data, self._range[0], self._range[1])
        rescaled = ((rescaled - self._range[0]) / (self._range[1] - self._range[0]) * 255).astype(np.uint8)
        cv2.imwrite(filename, rescaled)
        
        return filename


    def save_fits(self, filename=None):
        """
        Save a single frame as a FITS file.

        Parameters
        ----------
        filename : str, optional
            Output file path.
        """

        data = self.image
        if data is None:
            return self.error

        if filename is None:
            filename = datetime.utcnow().strftime("%Y-%m-%dT%H_%M_%S.fits")

        hdu = fits.PrimaryHDU(data)
        hdu.writeto(filename, overwrite=True)
        
        return filename


    @property
    def range(self):
        """
        Get the dynamic range limits for image conversion.

        Returns
        -------
        list
            [min, max] values.
        """

        return self._range


    @range.setter
    def range(self, r):
        """
        Set the dynamic range limits for image conversion.

        Parameters
        ----------
        r : list
            [min, max] values.
        """

        self._range = r


    @property
    def white(self):
        """
        Get the current white balance auto setting.

        Returns
        -------
        str
            The white balance mode.
        """

        return self.get("BalanceWhiteAuto")


    @white.setter
    def white(self, value):
        """
        Set the white balance auto setting.

        Parameters
        ----------
        value : str
            The mode ('Continuous', 'Off', 'Once').
        """

        self.put("BalanceWhiteAuto", value)