Loading noctua/devices/mako.py +78 −155 Original line number Diff line number Diff line Loading @@ -2,16 +2,18 @@ # -*- coding: utf-8 -*- """ Driver for Allied Vision Mako cameras using VmbPy with persistent context. Driver for Allied Vision Mako cameras using VmbPy. Zero-dependency version (no OpenCV, no Pillow). """ # System modules import threading import numpy as np import zlib import struct from datetime import datetime # Third-party modules import cv2 from astropy.io import fits from vmbpy import VmbSystem, FrameStatus Loading @@ -25,91 +27,39 @@ class Mako(BaseDevice): """ def __init__(self, url): """ Initialize the Mako camera parameters. Parameters ---------- url : str Camera IP address or Unique ID. """ super().__init__(url) self.id = url self.vmb = VmbSystem.get_instance() self._cam = None self._streaming = False def _check_connection(self): """ Internal method to manage persistent Vimba and Camera context. Returns ------- vmbpy.Camera The opened camera instance. """ if self._cam is None: self.vmb.__enter__() try: self._cam = self.vmb.get_camera_by_id(self.id) self._cam.__enter__() # Setup GigE packet size try: stream = self._cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() while not stream.GVSPAdjustPacketSize.is_done(): pass except Exception: pass while not stream.GVSPAdjustPacketSize.is_done(): pass except: pass except Exception as e: self._cam = None raise ConnectionError(f"Could not connect to Mako camera {self.id}: {e}") raise ConnectionError(f"Connection failed: {e}") return self._cam def get(self, feature_name): """ Get a specific camera feature object. Parameters ---------- feature_name : str Vimba feature name. """ cam = self._check_connection() return getattr(cam, feature_name) return getattr(cam, feature_name).get() def put(self, feature_name, value): """ Set a specific camera feature value. Parameters ---------- feature_name : str Vimba feature name. value : any Value to set. """ cam = self._check_connection() feature = getattr(cam, feature_name) feature.set(value) getattr(cam, feature_name).set(value) def __del__(self): """ Ensure clean shutdown of camera and Vimba system. """ if self._cam: if self._streaming: self._cam.stop_streaming() self._cam.__exit__(None, None, None) if self.vmb: self.vmb.__exit__(None, None, None) Loading @@ -117,157 +67,130 @@ class Mako(BaseDevice): class Webcam(Mako): """ High-level interface for Mako cameras with streaming and capture support. High-level interface for Mako cameras using only provided stack. """ def __init__(self, url): """ Initialize the Webcam instance. """ super().__init__(url) self._last_frame = None self._lock = threading.Lock() self._range = [0, 255] self._show = False def _frame_handler(self, cam, stream, frame): """ Callback to update the latest frame during streaming. """ if frame.get_status() == FrameStatus.Complete: with self._lock: # Copy the frame data for persistent access self._last_frame = frame.as_numpy_ndarray().copy() cam.queue_frame(frame) @property def streaming(self): """ Get the current streaming status. """ return self._streaming @streaming.setter def streaming(self, b): """ Start or stop camera streaming. """ cam = self._check_connection() if b == self._streaming: return if b == self._streaming: return if b: cam.start_streaming(handler=self._frame_handler, buffer_count=5) self._streaming = True else: self._show = False cam.stop_streaming() self._streaming = False @property def image(self): """ Get the latest image. Works both during streaming or single capture. Returns ------- numpy.ndarray The image data. """ """Get the latest raw numpy array.""" if self._streaming: with self._lock: return self._last_frame else: cam = self._check_connection() frame = cam.get_frame() return frame.as_numpy_ndarray() return cam.get_frame().as_numpy_ndarray() def save_image(self, filename="temp.png"): """ Save the current frame as a PNG image using software stretching range. """ def _make_chunk(self, tag, data): """Helper for PNG chunks.""" chunk = tag + data return struct.pack("!I", len(data)) + chunk + struct.pack("!I", zlib.crc32(chunk) & 0xFFFFFFFF) data = self.image if data is not None: # Rescale data to 8-bit using the defined range [min, max] def _to_png(self, data): """Pure Python/Numpy PNG encoder (Supports Mono and RGB).""" # Rescale and clip based on self._range rescaled = np.clip(data, self._range[0], self._range[1]) rescaled = ((rescaled - self._range[0]) / (self._range[1] - self._range[0]) * 255).astype(np.uint8) cv2.imwrite(filename, rescaled) # --- FIX: Handle shape (H, W), (H, W, 1) or (H, W, 3) --- if rescaled.ndim == 3: height, width, channels = rescaled.shape # Color type: 0 = Grayscale, 2 = RGB color_type = 2 if channels == 3 else 0 else: height, width = rescaled.shape channels = 1 color_type = 0 # PNG signature png_bin = b'\x89PNG\r\n\x1a\n' # IHDR chunk: width, height, bitdepth, colortype, compression, filter, interlace ihdr = struct.pack("!2I5B", width, height, 8, color_type, 0, 0, 0) png_bin += self._make_chunk(b'IHDR', ihdr) # IDAT chunk # Reshape to ensure we are dealing with rows of bytes flat_rows = rescaled.reshape(height, width * channels) # PNG requires a 'filter byte' (0x00 for None) at the start of every scanline filtered_data = np.insert(flat_rows, 0, 0, axis=1).tobytes() png_bin += self._make_chunk(b'IDAT', zlib.compress(filtered_data)) # IEND chunk png_bin += self._make_chunk(b'IEND', b'') return png_bin def save_image(self, filename="temp.png"): """Save image using pure Python PNG encoder.""" data = self.image if data is not None: png_data = self._to_png(data) with open(filename, 'wb') as f: f.write(png_data) return filename def save_fits(self, filename="temp.fits"): """ Save the current frame as a FITS file. """ """Save image as FITS using astropy.""" raw = self.image if raw is not None: data = np.reshape(raw, (1024, 1280)) filename = datetime.utcnow().strftime(filename) hdu = fits.PrimaryHDU(data) hdu.writeto(filename, overwrite=True) if raw.ndim == 3 and raw.shape[2] == 1: data = raw.reshape(raw.shape[0], raw.shape[1]) else: data = raw hdu = fits.PrimaryHDU(data) hdu.writeto(datetime.utcnow().strftime(filename), overwrite=True) return filename @property def autoexpose(self): """ Get exposure auto status and available options. """ return self.get("ExposureAuto").as_tuple() @autoexpose.setter def autoexpose(self, value): """ Set exposure auto status ('Continuous', 'Off', 'Once'). """ self.put("ExposureAuto", value) def show_stream(self): """ Open an OpenCV window in a separate thread for visual monitoring. Instructions to view the stream. Since no GUI libs are present, use the Quart web server. """ print("--- MAKO WEB STREAM ---") print("1. In your app.py, register a route that returns 'mako.get_jpeg()'") print("2. Open your browser at: http://localhost:5533/api/webcam/stream") print("-----------------------") if not self._streaming: print("Error: Streaming is False. Start it first.") return def display_loop(): self._show = True win_name = f"Mako Stream: {self.id}" cv2.namedWindow(win_name, cv2.WINDOW_AUTOSIZE) while self._show: if self._last_frame is not None: # Basic normalization for visualization fmax = self._last_frame.max() disp = (self._last_frame / (fmax if fmax > 0 else 1) * 255).astype(np.uint8) cv2.imshow(win_name, disp) if cv2.waitKey(1) & 0xFF == ord('q'): break cv2.destroyWindow(win_name) self._show = False thread = threading.Thread(target=display_loop, daemon=True) thread.start() print("Display thread started. Press 'q' in the window to close it.") def get_png_buffer(self): """Returns the current frame as an in-memory PNG for the Quart Response.""" data = self.image return self._to_png(data) if data is not None else None noctua/devices/mako_ok_with_opencv.py 0 → 100644 +177 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Driver for Allied Vision Mako cameras using VmbPy. Zero-dependency version (no OpenCV, no Pillow). """ # System modules import threading import numpy as np import zlib import struct from datetime import datetime # Third-party modules from astropy.io import fits from vmbpy import VmbSystem, FrameStatus # Custom modules from .basedevice import BaseDevice class Mako(BaseDevice): """ Base wrapper class for Allied Vision Mako cameras with persistent connection. """ def __init__(self, url): super().__init__(url) self.id = url self.vmb = VmbSystem.get_instance() self._cam = None self._streaming = False def _check_connection(self): if self._cam is None: self.vmb.__enter__() try: self._cam = self.vmb.get_camera_by_id(self.id) self._cam.__enter__() # Setup GigE packet size try: stream = self._cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() while not stream.GVSPAdjustPacketSize.is_done(): pass except: pass except Exception as e: self._cam = None raise ConnectionError(f"Connection failed: {e}") return self._cam def get(self, feature_name): cam = self._check_connection() return getattr(cam, feature_name) def put(self, feature_name, value): cam = self._check_connection() getattr(cam, feature_name).set(value) def __del__(self): if self._cam: self._cam.__exit__(None, None, None) if self.vmb: self.vmb.__exit__(None, None, None) class Webcam(Mako): """ High-level interface for Mako cameras using only provided stack. """ def __init__(self, url): super().__init__(url) self._last_frame = None self._lock = threading.Lock() self._range = [0, 255] def _frame_handler(self, cam, stream, frame): if frame.get_status() == FrameStatus.Complete: with self._lock: self._last_frame = frame.as_numpy_ndarray().copy() cam.queue_frame(frame) @property def streaming(self): return self._streaming @streaming.setter def streaming(self, b): cam = self._check_connection() if b == self._streaming: return if b: cam.start_streaming(handler=self._frame_handler, buffer_count=5) self._streaming = True else: cam.stop_streaming() self._streaming = False @property def image(self): """Get the latest raw numpy array.""" if self._streaming: with self._lock: return self._last_frame else: cam = self._check_connection() return cam.get_frame().as_numpy_ndarray() def _to_png(self, data): """Pure Python/Numpy PNG encoder (8-bit grayscale).""" # Rescale and clip rescaled = np.clip(data, self._range[0], self._range[1]) rescaled = ((rescaled - self._range[0]) / (self._range[1] - self._range[0]) * 255).astype(np.uint8) height, width = rescaled.shape # PNG signature png_bin = b'\x89PNG\r\n\x1a\n' # IHDR chunk ihdr = struct.pack("!2I5B", width, height, 8, 0, 0, 0, 0) png_bin += self._make_chunk(b'IHDR', ihdr) # IDAT chunk (zlib compressed pixels with filter type 0) flat_data = np.insert(rescaled, 0, 0, axis=1).tobytes() # Add null filter byte to each row png_bin += self._make_chunk(b'IDAT', zlib.compress(flat_data)) # IEND chunk png_bin += self._make_chunk(b'IEND', b'') return png_bin def _make_chunk(self, tag, data): """Helper for PNG chunks.""" chunk = tag + data return struct.pack("!I", len(data)) + chunk + struct.pack("!I", zlib.crc32(chunk) & 0xFFFFFFFF) def save_image(self, filename="temp.png"): """Save image using pure Python PNG encoder.""" data = self.image if data is not None: png_data = self._to_png(data) with open(filename, 'wb') as f: f.write(png_data) return filename def save_fits(self, filename="temp.fits"): """Save image as FITS using astropy.""" raw = self.image if raw is not None: # Reshape based on Mako G-125 geometry data = np.reshape(raw, (1024, 1280)) hdu = fits.PrimaryHDU(data) hdu.writeto(datetime.utcnow().strftime(filename), overwrite=True) return filename @property def autoexpose(self): return self.get("ExposureAuto").as_tuple() @autoexpose.setter def autoexpose(self, value): self.put("ExposureAuto", value) def show_stream(self): """ Instructions to view the stream. Since no GUI libs are present, use the Quart web server. """ print("--- MAKO WEB STREAM ---") print("1. In your app.py, register a route that returns 'mako.get_jpeg()'") print("2. Open your browser at: http://localhost:5533/api/webcam/stream") print("-----------------------") def get_png_buffer(self): """Returns the current frame as an in-memory PNG for the Quart Response.""" data = self.image return self._to_png(data) if data is not None else None Loading
noctua/devices/mako.py +78 −155 Original line number Diff line number Diff line Loading @@ -2,16 +2,18 @@ # -*- coding: utf-8 -*- """ Driver for Allied Vision Mako cameras using VmbPy with persistent context. Driver for Allied Vision Mako cameras using VmbPy. Zero-dependency version (no OpenCV, no Pillow). """ # System modules import threading import numpy as np import zlib import struct from datetime import datetime # Third-party modules import cv2 from astropy.io import fits from vmbpy import VmbSystem, FrameStatus Loading @@ -25,91 +27,39 @@ class Mako(BaseDevice): """ def __init__(self, url): """ Initialize the Mako camera parameters. Parameters ---------- url : str Camera IP address or Unique ID. """ super().__init__(url) self.id = url self.vmb = VmbSystem.get_instance() self._cam = None self._streaming = False def _check_connection(self): """ Internal method to manage persistent Vimba and Camera context. Returns ------- vmbpy.Camera The opened camera instance. """ if self._cam is None: self.vmb.__enter__() try: self._cam = self.vmb.get_camera_by_id(self.id) self._cam.__enter__() # Setup GigE packet size try: stream = self._cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() while not stream.GVSPAdjustPacketSize.is_done(): pass except Exception: pass while not stream.GVSPAdjustPacketSize.is_done(): pass except: pass except Exception as e: self._cam = None raise ConnectionError(f"Could not connect to Mako camera {self.id}: {e}") raise ConnectionError(f"Connection failed: {e}") return self._cam def get(self, feature_name): """ Get a specific camera feature object. Parameters ---------- feature_name : str Vimba feature name. """ cam = self._check_connection() return getattr(cam, feature_name) return getattr(cam, feature_name).get() def put(self, feature_name, value): """ Set a specific camera feature value. Parameters ---------- feature_name : str Vimba feature name. value : any Value to set. """ cam = self._check_connection() feature = getattr(cam, feature_name) feature.set(value) getattr(cam, feature_name).set(value) def __del__(self): """ Ensure clean shutdown of camera and Vimba system. """ if self._cam: if self._streaming: self._cam.stop_streaming() self._cam.__exit__(None, None, None) if self.vmb: self.vmb.__exit__(None, None, None) Loading @@ -117,157 +67,130 @@ class Mako(BaseDevice): class Webcam(Mako): """ High-level interface for Mako cameras with streaming and capture support. High-level interface for Mako cameras using only provided stack. """ def __init__(self, url): """ Initialize the Webcam instance. """ super().__init__(url) self._last_frame = None self._lock = threading.Lock() self._range = [0, 255] self._show = False def _frame_handler(self, cam, stream, frame): """ Callback to update the latest frame during streaming. """ if frame.get_status() == FrameStatus.Complete: with self._lock: # Copy the frame data for persistent access self._last_frame = frame.as_numpy_ndarray().copy() cam.queue_frame(frame) @property def streaming(self): """ Get the current streaming status. """ return self._streaming @streaming.setter def streaming(self, b): """ Start or stop camera streaming. """ cam = self._check_connection() if b == self._streaming: return if b == self._streaming: return if b: cam.start_streaming(handler=self._frame_handler, buffer_count=5) self._streaming = True else: self._show = False cam.stop_streaming() self._streaming = False @property def image(self): """ Get the latest image. Works both during streaming or single capture. Returns ------- numpy.ndarray The image data. """ """Get the latest raw numpy array.""" if self._streaming: with self._lock: return self._last_frame else: cam = self._check_connection() frame = cam.get_frame() return frame.as_numpy_ndarray() return cam.get_frame().as_numpy_ndarray() def save_image(self, filename="temp.png"): """ Save the current frame as a PNG image using software stretching range. """ def _make_chunk(self, tag, data): """Helper for PNG chunks.""" chunk = tag + data return struct.pack("!I", len(data)) + chunk + struct.pack("!I", zlib.crc32(chunk) & 0xFFFFFFFF) data = self.image if data is not None: # Rescale data to 8-bit using the defined range [min, max] def _to_png(self, data): """Pure Python/Numpy PNG encoder (Supports Mono and RGB).""" # Rescale and clip based on self._range rescaled = np.clip(data, self._range[0], self._range[1]) rescaled = ((rescaled - self._range[0]) / (self._range[1] - self._range[0]) * 255).astype(np.uint8) cv2.imwrite(filename, rescaled) # --- FIX: Handle shape (H, W), (H, W, 1) or (H, W, 3) --- if rescaled.ndim == 3: height, width, channels = rescaled.shape # Color type: 0 = Grayscale, 2 = RGB color_type = 2 if channels == 3 else 0 else: height, width = rescaled.shape channels = 1 color_type = 0 # PNG signature png_bin = b'\x89PNG\r\n\x1a\n' # IHDR chunk: width, height, bitdepth, colortype, compression, filter, interlace ihdr = struct.pack("!2I5B", width, height, 8, color_type, 0, 0, 0) png_bin += self._make_chunk(b'IHDR', ihdr) # IDAT chunk # Reshape to ensure we are dealing with rows of bytes flat_rows = rescaled.reshape(height, width * channels) # PNG requires a 'filter byte' (0x00 for None) at the start of every scanline filtered_data = np.insert(flat_rows, 0, 0, axis=1).tobytes() png_bin += self._make_chunk(b'IDAT', zlib.compress(filtered_data)) # IEND chunk png_bin += self._make_chunk(b'IEND', b'') return png_bin def save_image(self, filename="temp.png"): """Save image using pure Python PNG encoder.""" data = self.image if data is not None: png_data = self._to_png(data) with open(filename, 'wb') as f: f.write(png_data) return filename def save_fits(self, filename="temp.fits"): """ Save the current frame as a FITS file. """ """Save image as FITS using astropy.""" raw = self.image if raw is not None: data = np.reshape(raw, (1024, 1280)) filename = datetime.utcnow().strftime(filename) hdu = fits.PrimaryHDU(data) hdu.writeto(filename, overwrite=True) if raw.ndim == 3 and raw.shape[2] == 1: data = raw.reshape(raw.shape[0], raw.shape[1]) else: data = raw hdu = fits.PrimaryHDU(data) hdu.writeto(datetime.utcnow().strftime(filename), overwrite=True) return filename @property def autoexpose(self): """ Get exposure auto status and available options. """ return self.get("ExposureAuto").as_tuple() @autoexpose.setter def autoexpose(self, value): """ Set exposure auto status ('Continuous', 'Off', 'Once'). """ self.put("ExposureAuto", value) def show_stream(self): """ Open an OpenCV window in a separate thread for visual monitoring. Instructions to view the stream. Since no GUI libs are present, use the Quart web server. """ print("--- MAKO WEB STREAM ---") print("1. In your app.py, register a route that returns 'mako.get_jpeg()'") print("2. Open your browser at: http://localhost:5533/api/webcam/stream") print("-----------------------") if not self._streaming: print("Error: Streaming is False. Start it first.") return def display_loop(): self._show = True win_name = f"Mako Stream: {self.id}" cv2.namedWindow(win_name, cv2.WINDOW_AUTOSIZE) while self._show: if self._last_frame is not None: # Basic normalization for visualization fmax = self._last_frame.max() disp = (self._last_frame / (fmax if fmax > 0 else 1) * 255).astype(np.uint8) cv2.imshow(win_name, disp) if cv2.waitKey(1) & 0xFF == ord('q'): break cv2.destroyWindow(win_name) self._show = False thread = threading.Thread(target=display_loop, daemon=True) thread.start() print("Display thread started. Press 'q' in the window to close it.") def get_png_buffer(self): """Returns the current frame as an in-memory PNG for the Quart Response.""" data = self.image return self._to_png(data) if data is not None else None
noctua/devices/mako_ok_with_opencv.py 0 → 100644 +177 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Driver for Allied Vision Mako cameras using VmbPy. Zero-dependency version (no OpenCV, no Pillow). """ # System modules import threading import numpy as np import zlib import struct from datetime import datetime # Third-party modules from astropy.io import fits from vmbpy import VmbSystem, FrameStatus # Custom modules from .basedevice import BaseDevice class Mako(BaseDevice): """ Base wrapper class for Allied Vision Mako cameras with persistent connection. """ def __init__(self, url): super().__init__(url) self.id = url self.vmb = VmbSystem.get_instance() self._cam = None self._streaming = False def _check_connection(self): if self._cam is None: self.vmb.__enter__() try: self._cam = self.vmb.get_camera_by_id(self.id) self._cam.__enter__() # Setup GigE packet size try: stream = self._cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() while not stream.GVSPAdjustPacketSize.is_done(): pass except: pass except Exception as e: self._cam = None raise ConnectionError(f"Connection failed: {e}") return self._cam def get(self, feature_name): cam = self._check_connection() return getattr(cam, feature_name) def put(self, feature_name, value): cam = self._check_connection() getattr(cam, feature_name).set(value) def __del__(self): if self._cam: self._cam.__exit__(None, None, None) if self.vmb: self.vmb.__exit__(None, None, None) class Webcam(Mako): """ High-level interface for Mako cameras using only provided stack. """ def __init__(self, url): super().__init__(url) self._last_frame = None self._lock = threading.Lock() self._range = [0, 255] def _frame_handler(self, cam, stream, frame): if frame.get_status() == FrameStatus.Complete: with self._lock: self._last_frame = frame.as_numpy_ndarray().copy() cam.queue_frame(frame) @property def streaming(self): return self._streaming @streaming.setter def streaming(self, b): cam = self._check_connection() if b == self._streaming: return if b: cam.start_streaming(handler=self._frame_handler, buffer_count=5) self._streaming = True else: cam.stop_streaming() self._streaming = False @property def image(self): """Get the latest raw numpy array.""" if self._streaming: with self._lock: return self._last_frame else: cam = self._check_connection() return cam.get_frame().as_numpy_ndarray() def _to_png(self, data): """Pure Python/Numpy PNG encoder (8-bit grayscale).""" # Rescale and clip rescaled = np.clip(data, self._range[0], self._range[1]) rescaled = ((rescaled - self._range[0]) / (self._range[1] - self._range[0]) * 255).astype(np.uint8) height, width = rescaled.shape # PNG signature png_bin = b'\x89PNG\r\n\x1a\n' # IHDR chunk ihdr = struct.pack("!2I5B", width, height, 8, 0, 0, 0, 0) png_bin += self._make_chunk(b'IHDR', ihdr) # IDAT chunk (zlib compressed pixels with filter type 0) flat_data = np.insert(rescaled, 0, 0, axis=1).tobytes() # Add null filter byte to each row png_bin += self._make_chunk(b'IDAT', zlib.compress(flat_data)) # IEND chunk png_bin += self._make_chunk(b'IEND', b'') return png_bin def _make_chunk(self, tag, data): """Helper for PNG chunks.""" chunk = tag + data return struct.pack("!I", len(data)) + chunk + struct.pack("!I", zlib.crc32(chunk) & 0xFFFFFFFF) def save_image(self, filename="temp.png"): """Save image using pure Python PNG encoder.""" data = self.image if data is not None: png_data = self._to_png(data) with open(filename, 'wb') as f: f.write(png_data) return filename def save_fits(self, filename="temp.fits"): """Save image as FITS using astropy.""" raw = self.image if raw is not None: # Reshape based on Mako G-125 geometry data = np.reshape(raw, (1024, 1280)) hdu = fits.PrimaryHDU(data) hdu.writeto(datetime.utcnow().strftime(filename), overwrite=True) return filename @property def autoexpose(self): return self.get("ExposureAuto").as_tuple() @autoexpose.setter def autoexpose(self, value): self.put("ExposureAuto", value) def show_stream(self): """ Instructions to view the stream. Since no GUI libs are present, use the Quart web server. """ print("--- MAKO WEB STREAM ---") print("1. In your app.py, register a route that returns 'mako.get_jpeg()'") print("2. Open your browser at: http://localhost:5533/api/webcam/stream") print("-----------------------") def get_png_buffer(self): """Returns the current frame as an in-memory PNG for the Quart Response.""" data = self.image return self._to_png(data) if data is not None else None