Loading noctua/devices/mako.py +148 −115 Original line number Diff line number Diff line Loading @@ -2,18 +2,18 @@ # -*- coding: utf-8 -*- """ Driver for Allied Vision Mako cameras using VmbPy. Driver for Allied Vision Mako cameras using VmbPy with persistent context. """ # System modules import os import threading import numpy as np from datetime import datetime # Third-party modules import cv2 from astropy.io import fits from vmbpy import VmbSystem, VmbCameraError, VmbFeatureError, Frame from vmbpy import VmbSystem, FrameStatus # Custom modules from .basedevice import BaseDevice Loading @@ -21,12 +21,12 @@ from .basedevice import BaseDevice class Mako(BaseDevice): """ Base wrapper class for Allied Vision Mako cameras. Base wrapper class for Allied Vision Mako cameras with persistent connection. """ def __init__(self, url): """ Initialize the Mako camera device. Initialize the Mako camera parameters. Parameters ---------- Loading @@ -37,6 +37,40 @@ class Mako(BaseDevice): super().__init__(url) self.id = url self.vmb = VmbSystem.get_instance() self._cam = None self._streaming = False def _check_connection(self): """ Internal method to manage persistent Vimba and Camera context. Returns ------- vmbpy.Camera The opened camera instance. """ if self._cam is None: # Manually enter VmbSystem and Camera contexts self.vmb.__enter__() try: self._cam = self.vmb.get_camera_by_id(self.id) self._cam.__enter__() # Setup GigE packet size once at connection try: stream = self._cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() while not stream.GVSPAdjustPacketSize.is_done(): pass except Exception: pass except Exception as e: self._cam = None raise ConnectionError(f"Could not connect to Mako camera {self.id}: {e}") return self._cam def get(self, feature_name): Loading @@ -46,11 +80,10 @@ class Mako(BaseDevice): Parameters ---------- feature_name : str The name of the Vimba feature to query. Vimba feature name. """ with self.vmb as vmb: with vmb.get_camera_by_id(self.id) as cam: cam = self._check_connection() feature = getattr(cam, feature_name) return feature.get() Loading @@ -62,46 +95,62 @@ class Mako(BaseDevice): Parameters ---------- feature_name : str The name of the Vimba feature to set. Vimba feature name. value : any The value to assign to the feature. Value to set. """ with self.vmb as vmb: with vmb.get_camera_by_id(self.id) as cam: cam = self._check_connection() feature = getattr(cam, feature_name) feature.set(value) def __del__(self): """ Ensure clean shutdown of camera and Vimba system. """ if self._cam: if self._streaming: self._cam.stop_streaming() self._cam.__exit__(None, None, None) if self.vmb: self.vmb.__exit__(None, None, None) class Webcam(Mako): """ High-level interface for Mako cameras used as technical webcams. High-level interface for Mako cameras with streaming and capture support. """ def __init__(self, url): """ Initialize the Webcam instance. Parameters ---------- url : str Camera IP address or Unique ID. """ super().__init__(url) self._streaming = False self._last_frame = None self._lock = threading.Lock() self._range = [0, 255] def _frame_handler(self, cam, stream, frame): """ Callback to update the latest frame during streaming. """ if frame.get_status() == FrameStatus.Complete: with self._lock: # Copy the frame data to allow the SDK to reuse the buffer self._last_frame = frame.as_numpy_ndarray().copy() cam.queue_frame(frame) @property def streaming(self): """ Get the current streaming status. Returns ------- bool True if the camera is streaming, False otherwise. """ return self._streaming Loading @@ -111,30 +160,14 @@ class Webcam(Mako): def streaming(self, b): """ Start or stop camera streaming. Parameters ---------- b : bool True to start, False to stop. """ cam = self._check_connection() if b == self._streaming: return with self.vmb as vmb: with vmb.get_camera_by_id(self.id) as cam: if b: # Setup GigE packet size for stable stream try: stream = cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() while not stream.GVSPAdjustPacketSize.is_done(): pass except (AttributeError, VmbFeatureError): pass # Start streaming with a dummy handler as we don't need frame processing cam.start_streaming(handler=lambda cam, stream, frame: cam.queue_frame(frame)) cam.start_streaming(handler=self._frame_handler, buffer_count=5) self._streaming = True else: cam.stop_streaming() Loading @@ -144,39 +177,31 @@ class Webcam(Mako): @property def image(self): """ Get a single frame from the camera. Get the latest image. Works both during streaming or single capture. Returns ------- numpy.ndarray or None The image data, or None if streaming is active. numpy.ndarray The image data. """ if self._streaming: self.error = ["streaming is True. first stop it"] return None with self.vmb as vmb: with vmb.get_camera_by_id(self.id) as cam: with self._lock: return self._last_frame else: cam = self._check_connection() frame = cam.get_frame() return frame.as_numpy_ndarray() def save_image(self, filename="temp.png"): """ Save a single frame as a PNG image. Parameters ---------- filename : str Output file path. Save the current frame as a PNG image using software stretching range. """ data = self.image if data is None: return self.error # Rescale data to 8-bit using the defined range if data is not None: # Rescale data to 8-bit using the defined range [min, max] rescaled = np.clip(data, self._range[0], self._range[1]) rescaled = ((rescaled - self._range[0]) / (self._range[1] - self._range[0]) * 255).astype(np.uint8) cv2.imwrite(filename, rescaled) Loading @@ -186,21 +211,13 @@ class Webcam(Mako): def save_fits(self, filename=None): """ Save a single frame as a FITS file. Parameters ---------- filename : str, optional Output file path. Save the current frame as a FITS file. """ data = self.image if data is None: return self.error if data is not None: if filename is None: filename = datetime.utcnow().strftime("%Y-%m-%dT%H_%M_%S.fits") hdu = fits.PrimaryHDU(data) hdu.writeto(filename, overwrite=True) Loading @@ -210,12 +227,7 @@ class Webcam(Mako): @property def range(self): """ Get the dynamic range limits for image conversion. Returns ------- list [min, max] values. Get/Set software dynamic range [min, max] for PNG conversion (0-255). """ return self._range Loading @@ -224,40 +236,61 @@ class Webcam(Mako): @range.setter def range(self, r): """ Set the dynamic range limits for image conversion. Parameters ---------- r : list [min, max] values. Set the dynamic range limits. """ self._range = r @property def white(self): def autowhite(self): """ Get the current white balance auto setting. Returns ------- str The white balance mode. Get/Set white balance auto status ('Continuous', 'Off', 'Once'). """ return self.get("BalanceWhiteAuto") @white.setter def white(self, value): @autowhite.setter def autowhite(self, value): """ Set the white balance auto setting. Parameters ---------- value : str The mode ('Continuous', 'Off', 'Once'). Set white balance auto status. """ self.put("BalanceWhiteAuto", value) @property def autoexpose(self): """ Get/Set exposure auto status ('Continuous', 'Off', 'Once'). """ return self.get("ExposureAuto").as_tuple() @autoexpose.setter def autoexpose(self, value): """ Set exposure auto status. ('Continuous', 'Off', 'Once') """ self.put("ExposureAuto", value) @property def exptime(self): """ Get/Set exposure time in microseconds. """ return self.get("ExposureTime") @exptime.setter def exptime(self, value): """ Set exposure time in microseconds. Only applicable if autoexpose is 'Off'. """ self.put("ExposureTime", float(value)) Loading
noctua/devices/mako.py +148 −115 Original line number Diff line number Diff line Loading @@ -2,18 +2,18 @@ # -*- coding: utf-8 -*- """ Driver for Allied Vision Mako cameras using VmbPy. Driver for Allied Vision Mako cameras using VmbPy with persistent context. """ # System modules import os import threading import numpy as np from datetime import datetime # Third-party modules import cv2 from astropy.io import fits from vmbpy import VmbSystem, VmbCameraError, VmbFeatureError, Frame from vmbpy import VmbSystem, FrameStatus # Custom modules from .basedevice import BaseDevice Loading @@ -21,12 +21,12 @@ from .basedevice import BaseDevice class Mako(BaseDevice): """ Base wrapper class for Allied Vision Mako cameras. Base wrapper class for Allied Vision Mako cameras with persistent connection. """ def __init__(self, url): """ Initialize the Mako camera device. Initialize the Mako camera parameters. Parameters ---------- Loading @@ -37,6 +37,40 @@ class Mako(BaseDevice): super().__init__(url) self.id = url self.vmb = VmbSystem.get_instance() self._cam = None self._streaming = False def _check_connection(self): """ Internal method to manage persistent Vimba and Camera context. Returns ------- vmbpy.Camera The opened camera instance. """ if self._cam is None: # Manually enter VmbSystem and Camera contexts self.vmb.__enter__() try: self._cam = self.vmb.get_camera_by_id(self.id) self._cam.__enter__() # Setup GigE packet size once at connection try: stream = self._cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() while not stream.GVSPAdjustPacketSize.is_done(): pass except Exception: pass except Exception as e: self._cam = None raise ConnectionError(f"Could not connect to Mako camera {self.id}: {e}") return self._cam def get(self, feature_name): Loading @@ -46,11 +80,10 @@ class Mako(BaseDevice): Parameters ---------- feature_name : str The name of the Vimba feature to query. Vimba feature name. """ with self.vmb as vmb: with vmb.get_camera_by_id(self.id) as cam: cam = self._check_connection() feature = getattr(cam, feature_name) return feature.get() Loading @@ -62,46 +95,62 @@ class Mako(BaseDevice): Parameters ---------- feature_name : str The name of the Vimba feature to set. Vimba feature name. value : any The value to assign to the feature. Value to set. """ with self.vmb as vmb: with vmb.get_camera_by_id(self.id) as cam: cam = self._check_connection() feature = getattr(cam, feature_name) feature.set(value) def __del__(self): """ Ensure clean shutdown of camera and Vimba system. """ if self._cam: if self._streaming: self._cam.stop_streaming() self._cam.__exit__(None, None, None) if self.vmb: self.vmb.__exit__(None, None, None) class Webcam(Mako): """ High-level interface for Mako cameras used as technical webcams. High-level interface for Mako cameras with streaming and capture support. """ def __init__(self, url): """ Initialize the Webcam instance. Parameters ---------- url : str Camera IP address or Unique ID. """ super().__init__(url) self._streaming = False self._last_frame = None self._lock = threading.Lock() self._range = [0, 255] def _frame_handler(self, cam, stream, frame): """ Callback to update the latest frame during streaming. """ if frame.get_status() == FrameStatus.Complete: with self._lock: # Copy the frame data to allow the SDK to reuse the buffer self._last_frame = frame.as_numpy_ndarray().copy() cam.queue_frame(frame) @property def streaming(self): """ Get the current streaming status. Returns ------- bool True if the camera is streaming, False otherwise. """ return self._streaming Loading @@ -111,30 +160,14 @@ class Webcam(Mako): def streaming(self, b): """ Start or stop camera streaming. Parameters ---------- b : bool True to start, False to stop. """ cam = self._check_connection() if b == self._streaming: return with self.vmb as vmb: with vmb.get_camera_by_id(self.id) as cam: if b: # Setup GigE packet size for stable stream try: stream = cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() while not stream.GVSPAdjustPacketSize.is_done(): pass except (AttributeError, VmbFeatureError): pass # Start streaming with a dummy handler as we don't need frame processing cam.start_streaming(handler=lambda cam, stream, frame: cam.queue_frame(frame)) cam.start_streaming(handler=self._frame_handler, buffer_count=5) self._streaming = True else: cam.stop_streaming() Loading @@ -144,39 +177,31 @@ class Webcam(Mako): @property def image(self): """ Get a single frame from the camera. Get the latest image. Works both during streaming or single capture. Returns ------- numpy.ndarray or None The image data, or None if streaming is active. numpy.ndarray The image data. """ if self._streaming: self.error = ["streaming is True. first stop it"] return None with self.vmb as vmb: with vmb.get_camera_by_id(self.id) as cam: with self._lock: return self._last_frame else: cam = self._check_connection() frame = cam.get_frame() return frame.as_numpy_ndarray() def save_image(self, filename="temp.png"): """ Save a single frame as a PNG image. Parameters ---------- filename : str Output file path. Save the current frame as a PNG image using software stretching range. """ data = self.image if data is None: return self.error # Rescale data to 8-bit using the defined range if data is not None: # Rescale data to 8-bit using the defined range [min, max] rescaled = np.clip(data, self._range[0], self._range[1]) rescaled = ((rescaled - self._range[0]) / (self._range[1] - self._range[0]) * 255).astype(np.uint8) cv2.imwrite(filename, rescaled) Loading @@ -186,21 +211,13 @@ class Webcam(Mako): def save_fits(self, filename=None): """ Save a single frame as a FITS file. Parameters ---------- filename : str, optional Output file path. Save the current frame as a FITS file. """ data = self.image if data is None: return self.error if data is not None: if filename is None: filename = datetime.utcnow().strftime("%Y-%m-%dT%H_%M_%S.fits") hdu = fits.PrimaryHDU(data) hdu.writeto(filename, overwrite=True) Loading @@ -210,12 +227,7 @@ class Webcam(Mako): @property def range(self): """ Get the dynamic range limits for image conversion. Returns ------- list [min, max] values. Get/Set software dynamic range [min, max] for PNG conversion (0-255). """ return self._range Loading @@ -224,40 +236,61 @@ class Webcam(Mako): @range.setter def range(self, r): """ Set the dynamic range limits for image conversion. Parameters ---------- r : list [min, max] values. Set the dynamic range limits. """ self._range = r @property def white(self): def autowhite(self): """ Get the current white balance auto setting. Returns ------- str The white balance mode. Get/Set white balance auto status ('Continuous', 'Off', 'Once'). """ return self.get("BalanceWhiteAuto") @white.setter def white(self, value): @autowhite.setter def autowhite(self, value): """ Set the white balance auto setting. Parameters ---------- value : str The mode ('Continuous', 'Off', 'Once'). Set white balance auto status. """ self.put("BalanceWhiteAuto", value) @property def autoexpose(self): """ Get/Set exposure auto status ('Continuous', 'Off', 'Once'). """ return self.get("ExposureAuto").as_tuple() @autoexpose.setter def autoexpose(self, value): """ Set exposure auto status. ('Continuous', 'Off', 'Once') """ self.put("ExposureAuto", value) @property def exptime(self): """ Get/Set exposure time in microseconds. """ return self.get("ExposureTime") @exptime.setter def exptime(self, value): """ Set exposure time in microseconds. Only applicable if autoexpose is 'Off'. """ self.put("ExposureTime", float(value))