Loading noctua/devices/mako.py +119 −12 Original line number Diff line number Diff line Loading @@ -135,12 +135,13 @@ class Webcam(Mako): ihdr = struct.pack("!2I5B", width, height, 8, color_type, 0, 0, 0) png_bin += self._make_chunk(b'IHDR', ihdr) # IDAT chunk # Reshape to ensure we are dealing with rows of bytes flat_rows = rescaled.reshape(height, width * channels) # PNG requires a 'filter byte' (0x00 for None) at the start of every scanline filtered_data = np.insert(flat_rows, 0, 0, axis=1).tobytes() png_bin += self._make_chunk(b'IDAT', zlib.compress(filtered_data)) png_bin += self._make_chunk(b'IDAT', zlib.compress(filtered_data, level=1)) # IEND chunk png_bin += self._make_chunk(b'IEND', b'') Loading Loading @@ -180,17 +181,123 @@ class Webcam(Mako): self.put("ExposureAuto", value) def show_stream(self): def stream_opencv(self): """ Instructions to view the stream. Since no GUI libs are present, use the Quart web server. Open an OpenCV window in a separate thread. """ print("--- MAKO WEB STREAM ---") print("1. In your app.py, register a route that returns 'mako.get_jpeg()'") print("2. Open your browser at: http://localhost:5533/api/webcam/stream") print("-----------------------") import cv2 def get_png_buffer(self): """Returns the current frame as an in-memory PNG for the Quart Response.""" data = self.image return self._to_png(data) if data is not None else None if not self._streaming: print("Error: Streaming is False. Start it first.") return def loop(): self._show_cv = True while self._show_cv: if self._last_frame is not None: # Basic normalization for 8-bit display fmax = self._last_frame.max() disp = (self._last_frame / (fmax if fmax > 0 else 1) * 255).astype(np.uint8) cv2.imshow("Mako OpenCV Stream", disp) if cv2.waitKey(1) & 0xFF == ord('q'): break cv2.destroyAllWindows() self._show_cv = False threading.Thread(target=loop, daemon=True).start() print("OpenCV Window started. Press 'q' in the window to stop.") def stream(self, port=5534, active=True, fps=5): """ Manage a persistent web server for static images or live streaming. Parameters ---------- port : int The network port to use. active : bool True to start the server, False to stop it. """ from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler import time import threading parent = self # Reference to the Webcam instance # --- Shutdown server --- if not active: if hasattr(self, "_http_server") and self._http_server: print(f"Stopping HTTP Server on port {port}...") threading.Thread(target=self._http_server.shutdown).start() self._http_server.server_close() self._http_server = None else: print("HTTP Server is not running.") return # --- Startup server --- if hasattr(self, "_http_server") and self._http_server: print(f"HTTP Server is running on http://localhost:{port}") return class StreamHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(200) self.send_header('Age', 0) self.send_header('Cache-Control', 'no-cache, private') self.send_header('Pragma', 'no-cache') self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=frame') self.end_headers() try: while parent._http_server: # 1. Get the current image # (static or from stream buffer) data = parent.image if data is not None: png_buffer = parent._to_png(data) frame_header = ( b"--frame\r\n" b"Content-Type: image/png\r\n" b"Content-Length: " + str(len(png_buffer)).encode() + b"\r\n" b"\r\n" ) self.wfile.write(frame_header) self.wfile.write(png_buffer) self.wfile.write(b"\r\n") self.wfile.flush() # 2. Hybrid logic: if parent._streaming: # Live mode: high frequency update time.sleep(1/fps) else: # Static mode: wait until streaming is enabled # or the server is stopped while not parent._streaming and parent._http_server: time.sleep(0.5) except (ConnectionResetError, BrokenPipeError): # Client closed the browser/VLC pass except Exception as e: print(f"Stream error: {e}") def server_thread(): try: parent._http_server = ThreadingHTTPServer(('0.0.0.0', port), StreamHandler) parent._http_server.allow_reuse_address = True print(f"HTTP Server started at http://localhost:{port}") parent._http_server.serve_forever() except Exception as e: print(f"Could not start server: {e}") parent._http_server = None # Start the server in a background thread threading.Thread(target=server_thread, daemon=True).start() Loading
noctua/devices/mako.py +119 −12 Original line number Diff line number Diff line Loading @@ -135,12 +135,13 @@ class Webcam(Mako): ihdr = struct.pack("!2I5B", width, height, 8, color_type, 0, 0, 0) png_bin += self._make_chunk(b'IHDR', ihdr) # IDAT chunk # Reshape to ensure we are dealing with rows of bytes flat_rows = rescaled.reshape(height, width * channels) # PNG requires a 'filter byte' (0x00 for None) at the start of every scanline filtered_data = np.insert(flat_rows, 0, 0, axis=1).tobytes() png_bin += self._make_chunk(b'IDAT', zlib.compress(filtered_data)) png_bin += self._make_chunk(b'IDAT', zlib.compress(filtered_data, level=1)) # IEND chunk png_bin += self._make_chunk(b'IEND', b'') Loading Loading @@ -180,17 +181,123 @@ class Webcam(Mako): self.put("ExposureAuto", value) def show_stream(self): def stream_opencv(self): """ Instructions to view the stream. Since no GUI libs are present, use the Quart web server. Open an OpenCV window in a separate thread. """ print("--- MAKO WEB STREAM ---") print("1. In your app.py, register a route that returns 'mako.get_jpeg()'") print("2. Open your browser at: http://localhost:5533/api/webcam/stream") print("-----------------------") import cv2 def get_png_buffer(self): """Returns the current frame as an in-memory PNG for the Quart Response.""" data = self.image return self._to_png(data) if data is not None else None if not self._streaming: print("Error: Streaming is False. Start it first.") return def loop(): self._show_cv = True while self._show_cv: if self._last_frame is not None: # Basic normalization for 8-bit display fmax = self._last_frame.max() disp = (self._last_frame / (fmax if fmax > 0 else 1) * 255).astype(np.uint8) cv2.imshow("Mako OpenCV Stream", disp) if cv2.waitKey(1) & 0xFF == ord('q'): break cv2.destroyAllWindows() self._show_cv = False threading.Thread(target=loop, daemon=True).start() print("OpenCV Window started. Press 'q' in the window to stop.") def stream(self, port=5534, active=True, fps=5): """ Manage a persistent web server for static images or live streaming. Parameters ---------- port : int The network port to use. active : bool True to start the server, False to stop it. """ from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler import time import threading parent = self # Reference to the Webcam instance # --- Shutdown server --- if not active: if hasattr(self, "_http_server") and self._http_server: print(f"Stopping HTTP Server on port {port}...") threading.Thread(target=self._http_server.shutdown).start() self._http_server.server_close() self._http_server = None else: print("HTTP Server is not running.") return # --- Startup server --- if hasattr(self, "_http_server") and self._http_server: print(f"HTTP Server is running on http://localhost:{port}") return class StreamHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(200) self.send_header('Age', 0) self.send_header('Cache-Control', 'no-cache, private') self.send_header('Pragma', 'no-cache') self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=frame') self.end_headers() try: while parent._http_server: # 1. Get the current image # (static or from stream buffer) data = parent.image if data is not None: png_buffer = parent._to_png(data) frame_header = ( b"--frame\r\n" b"Content-Type: image/png\r\n" b"Content-Length: " + str(len(png_buffer)).encode() + b"\r\n" b"\r\n" ) self.wfile.write(frame_header) self.wfile.write(png_buffer) self.wfile.write(b"\r\n") self.wfile.flush() # 2. Hybrid logic: if parent._streaming: # Live mode: high frequency update time.sleep(1/fps) else: # Static mode: wait until streaming is enabled # or the server is stopped while not parent._streaming and parent._http_server: time.sleep(0.5) except (ConnectionResetError, BrokenPipeError): # Client closed the browser/VLC pass except Exception as e: print(f"Stream error: {e}") def server_thread(): try: parent._http_server = ThreadingHTTPServer(('0.0.0.0', port), StreamHandler) parent._http_server.allow_reuse_address = True print(f"HTTP Server started at http://localhost:{port}") parent._http_server.serve_forever() except Exception as e: print(f"Could not start server: {e}") parent._http_server = None # Start the server in a background thread threading.Thread(target=server_thread, daemon=True).start()