Loading noctua/app.py +18 −0 Original line number Diff line number Diff line Loading @@ -16,6 +16,24 @@ register_error_handlers(app) # API blueprint in api/__init__.py app.register_blueprint(api_blueprint, url_prefix='/api') # from quart import Response # from noctua import devices # import asyncio # @app.get("/api/webcam/stream") # async def stream(): # print("##################################################") # async def generate(): # while True: # frame = devices.teccam.get_png_buffer() # if frame: # yield (b'--frame\r\n' # b'Content-Type: image/png\r\n\r\n' + frame + b'\r\n') # await asyncio.sleep(0.1) # 10 FPS # return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame') def run(): """Server run using uvicorn""" Loading noctua/devices/mako.py +98 −183 Original line number Diff line number Diff line Loading @@ -3,14 +3,12 @@ """ Driver for Allied Vision Mako cameras using VmbPy. Zero-dependency version (no OpenCV, no Pillow). Utilizes external image and streaming utilities. """ # System modules import threading import numpy as np import zlib import struct from datetime import datetime # Third-party modules Loading @@ -19,14 +17,20 @@ from vmbpy import VmbSystem, FrameStatus # Custom modules from .basedevice import BaseDevice from ..utils.image import array_to_png, Streamer from ..utils.logger import log class Mako(BaseDevice): """ Base wrapper class for Allied Vision Mako cameras with persistent connection. Base wrapper class for Allied Vision Mako cameras with persistent connection. """ def __init__(self, url): """ Constructor """ super().__init__(url) self.id = url self.vmb = VmbSystem.get_instance() Loading @@ -34,6 +38,10 @@ class Mako(BaseDevice): self._streaming = False def _check_connection(self): """ Internal method to manage persistent Vimba and Camera context. """ if self._cam is None: self.vmb.__enter__() try: Loading @@ -43,23 +51,33 @@ class Mako(BaseDevice): try: stream = self._cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() while not stream.GVSPAdjustPacketSize.is_done(): pass except: pass while not stream.GVSPAdjustPacketSize.is_done(): pass except Exception: pass except Exception as e: self._cam = None raise ConnectionError(f"Connection failed: {e}") return self._cam def get(self, feature_name): """Return the value of a Vimba feature.""" cam = self._check_connection() return getattr(cam, feature_name).get() def put(self, feature_name, value): """Set the value of a Vimba feature.""" cam = self._check_connection() getattr(cam, feature_name).set(value) def __del__(self): """Clean shutdown of camera and Vimba system.""" if self._cam: if self._streaming: self._cam.stop_streaming() self._cam.__exit__(None, None, None) if self.vmb: self.vmb.__exit__(None, None, None) Loading @@ -67,7 +85,8 @@ class Mako(BaseDevice): class Webcam(Mako): """ High-level interface for Mako cameras using only provided stack. High-level interface for Mako cameras with streaming and capture support. """ def __init__(self, url): Loading @@ -75,21 +94,29 @@ class Webcam(Mako): self._last_frame = None self._lock = threading.Lock() self._range = [0, 255] self._streamer = None def _frame_handler(self, cam, stream, frame): """Callback to handle frames during streaming.""" if frame.get_status() == FrameStatus.Complete: with self._lock: self._last_frame = frame.as_numpy_ndarray().copy() cam.queue_frame(frame) @property def streaming(self): """Get the current streaming status.""" return self._streaming @streaming.setter def streaming(self, b): """Start or stop camera streaming.""" if b == self._streaming: return cam = self._check_connection() if b == self._streaming: return if b: cam.start_streaming(handler=self._frame_handler, buffer_count=5) self._streaming = True Loading @@ -97,85 +124,43 @@ class Webcam(Mako): cam.stop_streaming() self._streaming = False def _make_chunk(self, tag, data): """Helper for PNG chunks.""" chunk = tag + data return struct.pack("!I", len(data)) + chunk + struct.pack("!I", zlib.crc32(chunk) & 0xFFFFFFFF) def _to_png(self, data): """Pure Python/Numpy PNG encoder (Supports Mono and RGB).""" # Rescale and clip based on self._range rescaled = np.clip(data, self._range[0], self._range[1]) rescaled = ((rescaled - self._range[0]) / (self._range[1] - self._range[0]) * 255).astype(np.uint8) # --- FIX: Handle shape (H, W), (H, W, 1) or (H, W, 3) --- if rescaled.ndim == 3: height, width, channels = rescaled.shape # Color type: 0 = Grayscale, 2 = RGB color_type = 2 if channels == 3 else 0 else: height, width = rescaled.shape channels = 1 color_type = 0 # PNG signature png_bin = b'\x89PNG\r\n\x1a\n' # IHDR chunk: width, height, bitdepth, colortype, compression, filter, interlace ihdr = struct.pack("!2I5B", width, height, 8, color_type, 0, 0, 0) png_bin += self._make_chunk(b'IHDR', ihdr) # IDAT chunk # Reshape to ensure we are dealing with rows of bytes flat_rows = rescaled.reshape(height, width * channels) # PNG requires a 'filter byte' (0x00 for None) at the start of every scanline filtered_data = np.insert(flat_rows, 0, 0, axis=1).tobytes() png_bin += self._make_chunk(b'IDAT', zlib.compress(filtered_data, level=1)) # IEND chunk png_bin += self._make_chunk(b'IEND', b'') return png_bin @property def ndarray(self): def matrix(self): """ Get the latest image as a numpy array. Replaces the old 'image' property for internal use. Works during streaming or via single capture. """ if self._streaming: with self._lock: return self._last_frame else: cam = self._check_connection() # Returns the raw array from a single grab return cam.get_frame().as_numpy_ndarray() @property def image(self): """ Get the current frame as a raw binary PNG. Identical behavior to devices/ipcam.py (binary content). """ data = self.ndarray """Get the current frame as a raw binary PNG.""" data = self.matrix if data is not None: return self._to_png(data) return array_to_png(data, vmin=self._range[0], vmax=self._range[1]) return None def save_image(self, filename="temp.png"): """Save image as PNG using the binary content.""" """Save the current frame as a PNG file.""" png_data = self.image if png_data: with open(filename, 'wb') as f: f.write(png_data) return filename def save_fits(self, filename="temp.fits"): """Save image as FITS using the ndarray.""" raw = self.ndarray """Save the current frame as a FITS file.""" raw = self.matrix if raw is not None: # Automatic reshape to 2D if needed # Squeeze or reshape to 2D if it's a mono image with a channel dim if raw.ndim == 3 and raw.shape[2] == 1: data = raw.reshape(raw.shape[0], raw.shape[1]) else: Loading @@ -188,130 +173,60 @@ class Webcam(Mako): @property def autoexpose(self): return self.get("ExposureAuto").as_tuple() """Get exposure auto status and options as a tuple.""" # Accessing the feature object directly to use as_tuple() cam = self._check_connection() return cam.ExposureAuto.as_tuple() @autoexpose.setter def autoexpose(self, value): """Set exposure auto status ('Continuous', 'Off', 'Once').""" self.put("ExposureAuto", value) def stream_opencv(self): """ Open an OpenCV window in a separate thread. """ import cv2 if not self._streaming: print("Error: Streaming is False. Start it first.") return def loop(): self._show_cv = True while self._show_cv: if self._last_frame is not None: # Basic normalization for 8-bit display fmax = self._last_frame.max() disp = (self._last_frame / (fmax if fmax > 0 else 1) * 255).astype(np.uint8) cv2.imshow("Mako OpenCV Stream", disp) if cv2.waitKey(1) & 0xFF == ord('q'): break cv2.destroyAllWindows() self._show_cv = False threading.Thread(target=loop, daemon=True).start() print("OpenCV Window started. Press 'q' in the window to stop.") def stream(self, port=5534, active=True, fps=5): """ Manage a persistent web server for static images or live streaming. Parameters ---------- port : int The network port to use. active : bool True to start the server, False to stop it. """ from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler import time import threading parent = self # Reference to the Webcam instance # --- Shutdown server --- if not active: if hasattr(self, "_http_server") and self._http_server: print(f"Stopping HTTP Server on port {port}...") threading.Thread(target=self._http_server.shutdown).start() self._http_server.server_close() self._http_server = None else: print("HTTP Server is not running.") return # --- Startup server --- if hasattr(self, "_http_server") and self._http_server: print(f"HTTP Server is running on http://localhost:{port}") return class StreamHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(200) self.send_header('Age', 0) self.send_header('Cache-Control', 'no-cache, private') self.send_header('Pragma', 'no-cache') self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=frame') self.end_headers() try: while parent._http_server: # 1. Get the current image # (static or from stream buffer) data = parent.ndarray if data is not None: png_buffer = parent._to_png(data) frame_header = ( b"--frame\r\n" b"Content-Type: image/png\r\n" b"Content-Length: " + str(len(png_buffer)).encode() + b"\r\n" b"\r\n" def stream(self, active=True, host='0.0.0.0', port=5534, fps=2): """Manage the HTTP stream using the Streamer utility.""" if active: if not self._streamer: self._streamer = Streamer( host=host, port=port, image_provider=lambda: self.matrix, status_provider=lambda: self.streaming, fps=fps ) self.wfile.write(frame_header) self.wfile.write(png_buffer) self.wfile.write(b"\r\n") self.wfile.flush() # 2. Hybrid logic: if parent._streaming: # Live mode: high frequency update time.sleep(1/fps) else: # Static mode: wait until streaming is enabled # or the server is stopped while not parent._streaming and parent._http_server: time.sleep(0.5) self._streamer.start() log.info(f"Started server http://{host}:{port} fps={fps}") except (ConnectionResetError, BrokenPipeError): # Client closed the browser/VLC pass except Exception as e: print(f"Stream error: {e}") def server_thread(): try: parent._http_server = ThreadingHTTPServer(('0.0.0.0', port), StreamHandler) parent._http_server.allow_reuse_address = True print(f"HTTP Server started at http://localhost:{port}") parent._http_server.serve_forever() except Exception as e: print(f"Could not start server: {e}") parent._http_server = None # Start the server in a background thread threading.Thread(target=server_thread, daemon=True).start() else: if self._streamer: self._streamer.stop() log.info(f"Stopped server http://{host}:{port}") self._streamer = None # def stream_opencv(self): # """ # Open an OpenCV window in a separate thread for local monitoring. # """ # import cv2 # if not self._streaming: # print("Error: Streaming is False. Start it first.") # return # def loop(): # win_name = f"Mako Stream: {self.id}" # while self._streaming: # if self._last_frame is not None: # fmax = self._last_frame.max() # disp = (self._last_frame / (fmax if fmax > 0 else 1) * 255).astype(np.uint8) # cv2.imshow(win_name, disp) # if cv2.waitKey(1) & 0xFF == ord('q'): # break # cv2.destroyAllWindows() # threading.Thread(target=loop, daemon=True).start() # print("OpenCV Window started. Press 'q' in the window to close.") noctua/utils/image.py 0 → 100644 +154 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Utilities for image encoding and HTTP streaming. """ # System modules import zlib import struct import time import threading from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler # Third-party modules import numpy as np def array_to_png(data, vmin=0, vmax=255): """ Convert a numpy array to a pure Python PNG binary buffer. """ rescaled = np.clip(data, vmin, vmax) rescaled = ((rescaled - vmin) / (vmax - vmin) * 255).astype(np.uint8) if rescaled.ndim == 3: height, width, channels = rescaled.shape color_type = 2 if channels == 3 else 0 else: height, width = rescaled.shape channels, color_type = 1, 0 def make_chunk(tag, data): chunk = tag + data return struct.pack("!I", len(data)) + chunk + struct.pack("!I", zlib.crc32(chunk) & 0xFFFFFFFF) png_bin = b'\x89PNG\r\n\x1a\n' ihdr = struct.pack("!2I5B", width, height, 8, color_type, 0, 0, 0) png_bin += make_chunk(b'IHDR', ihdr) flat_rows = rescaled.reshape(height, width * channels) filtered_data = np.insert(flat_rows, 0, 0, axis=1).tobytes() png_bin += make_chunk(b'IDAT', zlib.compress(filtered_data, level=1)) png_bin += make_chunk(b'IEND', b'') return png_bin class Streamer: """ HTTP server that broadcasts images provided by a callback function. """ def __init__(self, port, image_provider, host='0.0.0.0', status_provider=None, fps=2): """ Initialize the streamer. Parameters ---------- port : int The network port for the HTTP server. image_provider : callable A function that returns either a numpy ndarray or binary PNG bytes. status_provider : callable, optional Returns True for live stream, False for static. Defaults to always True. fps : int, optional Frames per second. Default is 2. """ self.port = port self.host = host self.image_provider = image_provider self.status_provider = status_provider if status_provider else lambda: True self.fps = fps self.server = None self._thread = None def start(self): """Start the HTTP server in a background thread.""" if self.server: return parent = self class StreamHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(200) self.send_header('Age', 0) self.send_header('Cache-Control', 'no-cache, private') self.send_header('Pragma', 'no-cache') self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=frame') self.end_headers() try: while parent.server: start_time = time.time() # 1. Get image from provider img = parent.image_provider() if img is None: time.sleep(0.5) continue # 2. Auto-encode if provider returns a numpy array if isinstance(img, np.ndarray): img_buffer = array_to_png(img) else: img_buffer = img # 3. Write frame frame_header = ( b"--frame\r\n" b"Content-Type: image/png\r\n" b"Content-Length: " + str(len(img_buffer)).encode() + b"\r\n" b"\r\n" ) self.wfile.write(frame_header) self.wfile.write(img_buffer) self.wfile.write(b"\r\n") self.wfile.flush() # 4. FPS Timing logic if parent.status_provider(): elapsed = time.time() - start_time sleep_time = max(0, (1 / parent.fps) - elapsed) time.sleep(sleep_time) else: # Wait until status changes or server stops while not parent.status_provider() and parent.server: time.sleep(0.5) except (ConnectionResetError, BrokenPipeError): pass def log_message(self, format, *args): return def run_server(): try: parent.server = ThreadingHTTPServer((parent.host, parent.port), StreamHandler) parent.server.allow_reuse_address = True parent.server.serve_forever() except: parent.server = None self._thread = threading.Thread(target=run_server, daemon=True) self._thread.start() def stop(self): """Stop the HTTP server and free the port.""" if self.server: threading.Thread(target=self.server.shutdown).start() self.server.server_close() self.server = None Loading
noctua/app.py +18 −0 Original line number Diff line number Diff line Loading @@ -16,6 +16,24 @@ register_error_handlers(app) # API blueprint in api/__init__.py app.register_blueprint(api_blueprint, url_prefix='/api') # from quart import Response # from noctua import devices # import asyncio # @app.get("/api/webcam/stream") # async def stream(): # print("##################################################") # async def generate(): # while True: # frame = devices.teccam.get_png_buffer() # if frame: # yield (b'--frame\r\n' # b'Content-Type: image/png\r\n\r\n' + frame + b'\r\n') # await asyncio.sleep(0.1) # 10 FPS # return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame') def run(): """Server run using uvicorn""" Loading
noctua/devices/mako.py +98 −183 Original line number Diff line number Diff line Loading @@ -3,14 +3,12 @@ """ Driver for Allied Vision Mako cameras using VmbPy. Zero-dependency version (no OpenCV, no Pillow). Utilizes external image and streaming utilities. """ # System modules import threading import numpy as np import zlib import struct from datetime import datetime # Third-party modules Loading @@ -19,14 +17,20 @@ from vmbpy import VmbSystem, FrameStatus # Custom modules from .basedevice import BaseDevice from ..utils.image import array_to_png, Streamer from ..utils.logger import log class Mako(BaseDevice): """ Base wrapper class for Allied Vision Mako cameras with persistent connection. Base wrapper class for Allied Vision Mako cameras with persistent connection. """ def __init__(self, url): """ Constructor """ super().__init__(url) self.id = url self.vmb = VmbSystem.get_instance() Loading @@ -34,6 +38,10 @@ class Mako(BaseDevice): self._streaming = False def _check_connection(self): """ Internal method to manage persistent Vimba and Camera context. """ if self._cam is None: self.vmb.__enter__() try: Loading @@ -43,23 +51,33 @@ class Mako(BaseDevice): try: stream = self._cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() while not stream.GVSPAdjustPacketSize.is_done(): pass except: pass while not stream.GVSPAdjustPacketSize.is_done(): pass except Exception: pass except Exception as e: self._cam = None raise ConnectionError(f"Connection failed: {e}") return self._cam def get(self, feature_name): """Return the value of a Vimba feature.""" cam = self._check_connection() return getattr(cam, feature_name).get() def put(self, feature_name, value): """Set the value of a Vimba feature.""" cam = self._check_connection() getattr(cam, feature_name).set(value) def __del__(self): """Clean shutdown of camera and Vimba system.""" if self._cam: if self._streaming: self._cam.stop_streaming() self._cam.__exit__(None, None, None) if self.vmb: self.vmb.__exit__(None, None, None) Loading @@ -67,7 +85,8 @@ class Mako(BaseDevice): class Webcam(Mako): """ High-level interface for Mako cameras using only provided stack. High-level interface for Mako cameras with streaming and capture support. """ def __init__(self, url): Loading @@ -75,21 +94,29 @@ class Webcam(Mako): self._last_frame = None self._lock = threading.Lock() self._range = [0, 255] self._streamer = None def _frame_handler(self, cam, stream, frame): """Callback to handle frames during streaming.""" if frame.get_status() == FrameStatus.Complete: with self._lock: self._last_frame = frame.as_numpy_ndarray().copy() cam.queue_frame(frame) @property def streaming(self): """Get the current streaming status.""" return self._streaming @streaming.setter def streaming(self, b): """Start or stop camera streaming.""" if b == self._streaming: return cam = self._check_connection() if b == self._streaming: return if b: cam.start_streaming(handler=self._frame_handler, buffer_count=5) self._streaming = True Loading @@ -97,85 +124,43 @@ class Webcam(Mako): cam.stop_streaming() self._streaming = False def _make_chunk(self, tag, data): """Helper for PNG chunks.""" chunk = tag + data return struct.pack("!I", len(data)) + chunk + struct.pack("!I", zlib.crc32(chunk) & 0xFFFFFFFF) def _to_png(self, data): """Pure Python/Numpy PNG encoder (Supports Mono and RGB).""" # Rescale and clip based on self._range rescaled = np.clip(data, self._range[0], self._range[1]) rescaled = ((rescaled - self._range[0]) / (self._range[1] - self._range[0]) * 255).astype(np.uint8) # --- FIX: Handle shape (H, W), (H, W, 1) or (H, W, 3) --- if rescaled.ndim == 3: height, width, channels = rescaled.shape # Color type: 0 = Grayscale, 2 = RGB color_type = 2 if channels == 3 else 0 else: height, width = rescaled.shape channels = 1 color_type = 0 # PNG signature png_bin = b'\x89PNG\r\n\x1a\n' # IHDR chunk: width, height, bitdepth, colortype, compression, filter, interlace ihdr = struct.pack("!2I5B", width, height, 8, color_type, 0, 0, 0) png_bin += self._make_chunk(b'IHDR', ihdr) # IDAT chunk # Reshape to ensure we are dealing with rows of bytes flat_rows = rescaled.reshape(height, width * channels) # PNG requires a 'filter byte' (0x00 for None) at the start of every scanline filtered_data = np.insert(flat_rows, 0, 0, axis=1).tobytes() png_bin += self._make_chunk(b'IDAT', zlib.compress(filtered_data, level=1)) # IEND chunk png_bin += self._make_chunk(b'IEND', b'') return png_bin @property def ndarray(self): def matrix(self): """ Get the latest image as a numpy array. Replaces the old 'image' property for internal use. Works during streaming or via single capture. """ if self._streaming: with self._lock: return self._last_frame else: cam = self._check_connection() # Returns the raw array from a single grab return cam.get_frame().as_numpy_ndarray() @property def image(self): """ Get the current frame as a raw binary PNG. Identical behavior to devices/ipcam.py (binary content). """ data = self.ndarray """Get the current frame as a raw binary PNG.""" data = self.matrix if data is not None: return self._to_png(data) return array_to_png(data, vmin=self._range[0], vmax=self._range[1]) return None def save_image(self, filename="temp.png"): """Save image as PNG using the binary content.""" """Save the current frame as a PNG file.""" png_data = self.image if png_data: with open(filename, 'wb') as f: f.write(png_data) return filename def save_fits(self, filename="temp.fits"): """Save image as FITS using the ndarray.""" raw = self.ndarray """Save the current frame as a FITS file.""" raw = self.matrix if raw is not None: # Automatic reshape to 2D if needed # Squeeze or reshape to 2D if it's a mono image with a channel dim if raw.ndim == 3 and raw.shape[2] == 1: data = raw.reshape(raw.shape[0], raw.shape[1]) else: Loading @@ -188,130 +173,60 @@ class Webcam(Mako): @property def autoexpose(self): return self.get("ExposureAuto").as_tuple() """Get exposure auto status and options as a tuple.""" # Accessing the feature object directly to use as_tuple() cam = self._check_connection() return cam.ExposureAuto.as_tuple() @autoexpose.setter def autoexpose(self, value): """Set exposure auto status ('Continuous', 'Off', 'Once').""" self.put("ExposureAuto", value) def stream_opencv(self): """ Open an OpenCV window in a separate thread. """ import cv2 if not self._streaming: print("Error: Streaming is False. Start it first.") return def loop(): self._show_cv = True while self._show_cv: if self._last_frame is not None: # Basic normalization for 8-bit display fmax = self._last_frame.max() disp = (self._last_frame / (fmax if fmax > 0 else 1) * 255).astype(np.uint8) cv2.imshow("Mako OpenCV Stream", disp) if cv2.waitKey(1) & 0xFF == ord('q'): break cv2.destroyAllWindows() self._show_cv = False threading.Thread(target=loop, daemon=True).start() print("OpenCV Window started. Press 'q' in the window to stop.") def stream(self, port=5534, active=True, fps=5): """ Manage a persistent web server for static images or live streaming. Parameters ---------- port : int The network port to use. active : bool True to start the server, False to stop it. """ from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler import time import threading parent = self # Reference to the Webcam instance # --- Shutdown server --- if not active: if hasattr(self, "_http_server") and self._http_server: print(f"Stopping HTTP Server on port {port}...") threading.Thread(target=self._http_server.shutdown).start() self._http_server.server_close() self._http_server = None else: print("HTTP Server is not running.") return # --- Startup server --- if hasattr(self, "_http_server") and self._http_server: print(f"HTTP Server is running on http://localhost:{port}") return class StreamHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(200) self.send_header('Age', 0) self.send_header('Cache-Control', 'no-cache, private') self.send_header('Pragma', 'no-cache') self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=frame') self.end_headers() try: while parent._http_server: # 1. Get the current image # (static or from stream buffer) data = parent.ndarray if data is not None: png_buffer = parent._to_png(data) frame_header = ( b"--frame\r\n" b"Content-Type: image/png\r\n" b"Content-Length: " + str(len(png_buffer)).encode() + b"\r\n" b"\r\n" def stream(self, active=True, host='0.0.0.0', port=5534, fps=2): """Manage the HTTP stream using the Streamer utility.""" if active: if not self._streamer: self._streamer = Streamer( host=host, port=port, image_provider=lambda: self.matrix, status_provider=lambda: self.streaming, fps=fps ) self.wfile.write(frame_header) self.wfile.write(png_buffer) self.wfile.write(b"\r\n") self.wfile.flush() # 2. Hybrid logic: if parent._streaming: # Live mode: high frequency update time.sleep(1/fps) else: # Static mode: wait until streaming is enabled # or the server is stopped while not parent._streaming and parent._http_server: time.sleep(0.5) self._streamer.start() log.info(f"Started server http://{host}:{port} fps={fps}") except (ConnectionResetError, BrokenPipeError): # Client closed the browser/VLC pass except Exception as e: print(f"Stream error: {e}") def server_thread(): try: parent._http_server = ThreadingHTTPServer(('0.0.0.0', port), StreamHandler) parent._http_server.allow_reuse_address = True print(f"HTTP Server started at http://localhost:{port}") parent._http_server.serve_forever() except Exception as e: print(f"Could not start server: {e}") parent._http_server = None # Start the server in a background thread threading.Thread(target=server_thread, daemon=True).start() else: if self._streamer: self._streamer.stop() log.info(f"Stopped server http://{host}:{port}") self._streamer = None # def stream_opencv(self): # """ # Open an OpenCV window in a separate thread for local monitoring. # """ # import cv2 # if not self._streaming: # print("Error: Streaming is False. Start it first.") # return # def loop(): # win_name = f"Mako Stream: {self.id}" # while self._streaming: # if self._last_frame is not None: # fmax = self._last_frame.max() # disp = (self._last_frame / (fmax if fmax > 0 else 1) * 255).astype(np.uint8) # cv2.imshow(win_name, disp) # if cv2.waitKey(1) & 0xFF == ord('q'): # break # cv2.destroyAllWindows() # threading.Thread(target=loop, daemon=True).start() # print("OpenCV Window started. Press 'q' in the window to close.")
noctua/utils/image.py 0 → 100644 +154 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Utilities for image encoding and HTTP streaming. """ # System modules import zlib import struct import time import threading from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler # Third-party modules import numpy as np def array_to_png(data, vmin=0, vmax=255): """ Convert a numpy array to a pure Python PNG binary buffer. """ rescaled = np.clip(data, vmin, vmax) rescaled = ((rescaled - vmin) / (vmax - vmin) * 255).astype(np.uint8) if rescaled.ndim == 3: height, width, channels = rescaled.shape color_type = 2 if channels == 3 else 0 else: height, width = rescaled.shape channels, color_type = 1, 0 def make_chunk(tag, data): chunk = tag + data return struct.pack("!I", len(data)) + chunk + struct.pack("!I", zlib.crc32(chunk) & 0xFFFFFFFF) png_bin = b'\x89PNG\r\n\x1a\n' ihdr = struct.pack("!2I5B", width, height, 8, color_type, 0, 0, 0) png_bin += make_chunk(b'IHDR', ihdr) flat_rows = rescaled.reshape(height, width * channels) filtered_data = np.insert(flat_rows, 0, 0, axis=1).tobytes() png_bin += make_chunk(b'IDAT', zlib.compress(filtered_data, level=1)) png_bin += make_chunk(b'IEND', b'') return png_bin class Streamer: """ HTTP server that broadcasts images provided by a callback function. """ def __init__(self, port, image_provider, host='0.0.0.0', status_provider=None, fps=2): """ Initialize the streamer. Parameters ---------- port : int The network port for the HTTP server. image_provider : callable A function that returns either a numpy ndarray or binary PNG bytes. status_provider : callable, optional Returns True for live stream, False for static. Defaults to always True. fps : int, optional Frames per second. Default is 2. """ self.port = port self.host = host self.image_provider = image_provider self.status_provider = status_provider if status_provider else lambda: True self.fps = fps self.server = None self._thread = None def start(self): """Start the HTTP server in a background thread.""" if self.server: return parent = self class StreamHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(200) self.send_header('Age', 0) self.send_header('Cache-Control', 'no-cache, private') self.send_header('Pragma', 'no-cache') self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=frame') self.end_headers() try: while parent.server: start_time = time.time() # 1. Get image from provider img = parent.image_provider() if img is None: time.sleep(0.5) continue # 2. Auto-encode if provider returns a numpy array if isinstance(img, np.ndarray): img_buffer = array_to_png(img) else: img_buffer = img # 3. Write frame frame_header = ( b"--frame\r\n" b"Content-Type: image/png\r\n" b"Content-Length: " + str(len(img_buffer)).encode() + b"\r\n" b"\r\n" ) self.wfile.write(frame_header) self.wfile.write(img_buffer) self.wfile.write(b"\r\n") self.wfile.flush() # 4. FPS Timing logic if parent.status_provider(): elapsed = time.time() - start_time sleep_time = max(0, (1 / parent.fps) - elapsed) time.sleep(sleep_time) else: # Wait until status changes or server stops while not parent.status_provider() and parent.server: time.sleep(0.5) except (ConnectionResetError, BrokenPipeError): pass def log_message(self, format, *args): return def run_server(): try: parent.server = ThreadingHTTPServer((parent.host, parent.port), StreamHandler) parent.server.allow_reuse_address = True parent.server.serve_forever() except: parent.server = None self._thread = threading.Thread(target=run_server, daemon=True) self._thread.start() def stop(self): """Stop the HTTP server and free the port.""" if self.server: threading.Thread(target=self.server.shutdown).start() self.server.server_close() self.server = None