Commit 938fff41 authored by vertighel's avatar vertighel
Browse files

Refactor: image pipeline to utils, normalize device.matrix



- utils/image.py: add auto_range(), apply_viridis(), fits_to_png()
  (moved from fits_image.py; fits_to_png() combines the 3-step pipeline)
- devices/mako.py Guider: add dtype=np.uint8, acquisition='stream';
  matrix now returns (H,W) always (squeeze inside the device)
- devices/stx.py Camera: add dtype=np.float32, acquisition='trigger'
- api/fits_image.py: remove _apply_viridis/_auto_range definitions;
  _run_loop uses device.dtype/acquisition instead of isinstance(stx.Camera);
  /png and /panoramic routes simplified via fits_to_png()
- web/stream.py: import auto_range/apply_viridis from utils.image instead
  of fits_image; _broadcast_preview simplified to fits_to_png() one-liner;
  _fits_path promoted to top-level import
- Remove mako_ok_with_opencv.py (dead code)

Co-Authored-By: default avatarClaude Sonnet 4.6 <noreply@anthropic.com>
parent f5cdb58a
Loading
Loading
Loading
Loading
Loading
+21 −67
Original line number Diff line number Diff line
@@ -38,6 +38,7 @@ The key must match the URL segment ``<station>/<camera>``.

# System modules
import asyncio
import base64
import configparser
from pathlib import Path

@@ -48,9 +49,7 @@ from quart import Blueprint, request, Response, jsonify, send_file

# Custom modules
from noctua.config.constants import DATA_FOLDER
from noctua.utils.image import array_to_png
from noctua.utils.color_tables import viridis
from noctua.devices import stx as _stx
from noctua.utils.image import array_to_png, auto_range, fits_to_png

viewer_blueprint = Blueprint('viewer', __name__)

@@ -69,30 +68,6 @@ _cfg.read(PACKAGE_ROOT / 'config' / 'viewer.ini')
# Private helpers
# ---------------------------------------------------------------------------

def _apply_viridis(data, vmin, vmax):
    """
    Map scalar float data to a Viridis RGB uint8 array.

    Parameters
    ----------
    data : np.ndarray
        2-D scalar array.
    vmin : float
        Lower stretching limit.
    vmax : float
        Upper stretching limit.

    Returns
    -------
    np.ndarray
        RGB uint8 array of shape ``(H, W, 3)``.
    """

    norm = np.clip((data - vmin) / (vmax - vmin), 0, 1)
    indices = (norm * 255).astype(np.uint8)
    return viridis[indices]


def _fits_path(station, camera):
    """
    Return the filesystem path for a camera's latest FITS file.
@@ -143,28 +118,6 @@ def _load_data(station, camera):
        return data.astype(np.float32)


def _auto_range(data, lo=0.5, hi=99.5):
    """
    Calculate a robust display range from percentiles.

    Parameters
    ----------
    data : np.ndarray
        Image data array.
    lo : float, optional
        Lower percentile.  Default is ``0.5``.
    hi : float, optional
        Upper percentile.  Default is ``99.5``.

    Returns
    -------
    tuple of float
        ``(vmin, vmax)`` pair.
    """

    return float(np.percentile(data, lo)), float(np.percentile(data, hi))


# ---------------------------------------------------------------------------
# Teccam acquisition loop
# ---------------------------------------------------------------------------
@@ -185,11 +138,12 @@ async def _run_loop(device, exposure: float, station: str, camera: str):
    """Continuous acquisition loop: captures frames and broadcasts PNG via WebSocket."""
    from noctua.web import streamer  # lazy — avoids circular import at module load
    ev = asyncio.get_running_loop()
    is_stx = isinstance(device, _stx.Camera)
    trigger_mode = getattr(device, 'acquisition', 'stream') == 'trigger'
    uint8_mode   = getattr(device, 'dtype', np.float32) == np.uint8
    try:
        while True:
            if is_stx:
                # STX Guider: trigger exposure, poll ready, then grab matrix
            if trigger_mode:
                # STX: explicit exposure trigger + ready poll
                await ev.run_in_executor(None, device.start, exposure, 1)
                deadline = ev.time() + exposure + 30.0
                while ev.time() < deadline:
@@ -200,10 +154,18 @@ async def _run_loop(device, exposure: float, station: str, camera: str):

            data = await ev.run_in_executor(None, lambda: device.matrix)
            if data is not None:
                await streamer._broadcast_preview(station, camera, np.squeeze(data))
                # device.matrix always returns (H, W); dtype selects rendering path
                if uint8_mode:
                    png = array_to_png(data)          # fixed [0, 255], grayscale
                else:
                    png = fits_to_png(data, thumbnail_size=512)  # viridis + auto-range
                encoded = base64.b64encode(png).decode('utf-8')
                await streamer.broadcaster.broadcast(
                    "fits-preview",
                    {"station": station, "camera": camera, "png": encoded},
                )

            if not is_stx:
                # Mako: broadcast first frame immediately, then wait before next
            if not trigger_mode:
                await asyncio.sleep(exposure)
    except asyncio.CancelledError:
        pass
@@ -236,7 +198,7 @@ async def image_info(station, camera):
    if data is None:
        return jsonify({'error': 'FITS not found'}), 404

    vmin, vmax = _auto_range(data)
    vmin, vmax = auto_range(data)
    return jsonify({
        'shape': list(data.shape),
        'dtype': str(data.dtype),
@@ -278,11 +240,7 @@ async def image_png(station, camera):

    vmin = request.args.get('vmin', default=None, type=float)
    vmax = request.args.get('vmax', default=None, type=float)
    if vmin is None or vmax is None:
        vmin, vmax = _auto_range(data)

    rgb = _apply_viridis(data, vmin, vmax)
    png = array_to_png(rgb)
    png = fits_to_png(data, vmin=vmin, vmax=vmax)
    return Response(png, mimetype='image/png', headers={'Cache-Control': 'no-store'})


@@ -322,12 +280,8 @@ async def image_panoramic(station, camera):

    vmin = request.args.get('vmin', default=None, type=float)
    vmax = request.args.get('vmax', default=None, type=float)
    if vmin is None or vmax is None:
        vmin, vmax = _auto_range(data)

    max_px = int(request.args.get('max_px', 256))
    rgb = _apply_viridis(data, vmin, vmax)
    png = array_to_png(rgb, thumbnail_size=max_px)
    png = fits_to_png(data, vmin=vmin, vmax=vmax, thumbnail_size=max_px)
    return Response(png, mimetype='image/png', headers={'Cache-Control': 'no-store'})


+12 −12
Original line number Diff line number Diff line
@@ -88,6 +88,9 @@ class Guider(Mako):
    capture support.
    """

    dtype = np.uint8        # raw sensor data is 8-bit grayscale
    acquisition = 'stream'  # grab-and-sleep, no exposure trigger needed

    def __init__(self, url):
        """
        Constructor
@@ -134,15 +137,18 @@ class Guider(Mako):
    @property
    def matrix(self):
        """
        Get the latest image as a numpy array.
        Get the latest image as a 2-D uint8 numpy array ``(H, W)``.
        Works during streaming or via single capture.
        """
        if self._streaming:
            with self._lock:
                return self._last_frame
                raw = self._last_frame
        else:
            cam = self._check_connection()
            return cam.get_frame().as_numpy_ndarray()
            raw = cam.get_frame().as_numpy_ndarray()
        if raw is None:
            return None
        return np.squeeze(raw)

    @property
    def image(self):
@@ -169,14 +175,8 @@ class Guider(Mako):
        if filename is None:
            from ..config.constants import viewer_fits_path
            filename = viewer_fits_path(getattr(self, '_viewer_key', None))
        raw = self.matrix
        if raw is not None:
            # Squeeze or reshape to 2D if it's a mono image with a channel dim
            if raw.ndim == 3 and raw.shape[2] == 1:
                data = raw.reshape(raw.shape[0], raw.shape[1])
            else:
                data = raw

        data = self.matrix  # already (H, W)
        if data is not None:
            hdu = fits.PrimaryHDU(data)
            hdu.writeto(filename, overwrite=True)
        return filename
+0 −177
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Driver for Allied Vision Mako cameras using VmbPy.
Zero-dependency version (no OpenCV, no Pillow).
"""

# System modules
import threading
import numpy as np
import zlib
import struct
from datetime import datetime

# Third-party modules
from astropy.io import fits
from vmbpy import VmbSystem, FrameStatus

# Custom modules
from .basedevice import BaseDevice


class Mako(BaseDevice):
    """
    Base wrapper class for Allied Vision Mako cameras with persistent connection.
    """

    def __init__(self, url):
        super().__init__(url)
        self.id = url
        self.vmb = VmbSystem.get_instance()
        self._cam = None
        self._streaming = False

    def _check_connection(self):
        if self._cam is None:
            self.vmb.__enter__()
            try:
                self._cam = self.vmb.get_camera_by_id(self.id)
                self._cam.__enter__()
                # Setup GigE packet size
                try:
                    stream = self._cam.get_streams()[0]
                    stream.GVSPAdjustPacketSize.run()
                    while not stream.GVSPAdjustPacketSize.is_done(): pass
                except: pass
            except Exception as e:
                self._cam = None
                raise ConnectionError(f"Connection failed: {e}")
        return self._cam

    def get(self, feature_name):
        cam = self._check_connection()
        return getattr(cam, feature_name)

    def put(self, feature_name, value):
        cam = self._check_connection()
        getattr(cam, feature_name).set(value)

    def __del__(self):
        if self._cam:
            self._cam.__exit__(None, None, None)
        if self.vmb:
            self.vmb.__exit__(None, None, None)


class Webcam(Mako):
    """
    High-level interface for Mako cameras using only provided stack.
    """

    def __init__(self, url):
        super().__init__(url)
        self._last_frame = None
        self._lock = threading.Lock()
        self._range = [0, 255]

    def _frame_handler(self, cam, stream, frame):
        if frame.get_status() == FrameStatus.Complete:
            with self._lock:
                self._last_frame = frame.as_numpy_ndarray().copy()
        cam.queue_frame(frame)

    @property
    def streaming(self):
        return self._streaming

    @streaming.setter
    def streaming(self, b):
        cam = self._check_connection()
        if b == self._streaming: return
        if b:
            cam.start_streaming(handler=self._frame_handler, buffer_count=5)
            self._streaming = True
        else:
            cam.stop_streaming()
            self._streaming = False

    @property
    def image(self):
        """Get the latest raw numpy array."""
        if self._streaming:
            with self._lock:
                return self._last_frame
        else:
            cam = self._check_connection()
            return cam.get_frame().as_numpy_ndarray()

    def _to_png(self, data):
        """Pure Python/Numpy PNG encoder (8-bit grayscale)."""
        # Rescale and clip
        rescaled = np.clip(data, self._range[0], self._range[1])
        rescaled = ((rescaled - self._range[0]) / (self._range[1] - self._range[0]) * 255).astype(np.uint8)
        
        height, width = rescaled.shape
        # PNG signature
        png_bin = b'\x89PNG\r\n\x1a\n'
        
        # IHDR chunk
        ihdr = struct.pack("!2I5B", width, height, 8, 0, 0, 0, 0)
        png_bin += self._make_chunk(b'IHDR', ihdr)
        
        # IDAT chunk (zlib compressed pixels with filter type 0)
        flat_data = np.insert(rescaled, 0, 0, axis=1).tobytes() # Add null filter byte to each row
        png_bin += self._make_chunk(b'IDAT', zlib.compress(flat_data))
        
        # IEND chunk
        png_bin += self._make_chunk(b'IEND', b'')
        return png_bin

    def _make_chunk(self, tag, data):
        """Helper for PNG chunks."""
        chunk = tag + data
        return struct.pack("!I", len(data)) + chunk + struct.pack("!I", zlib.crc32(chunk) & 0xFFFFFFFF)

    def save_image(self, filename="temp.png"):
        """Save image using pure Python PNG encoder."""
        data = self.image
        if data is not None:
            png_data = self._to_png(data)
            with open(filename, 'wb') as f:
                f.write(png_data)
        return filename

    def save_fits(self, filename="temp.fits"):
        """Save image as FITS using astropy."""
        raw = self.image
        if raw is not None:
            # Reshape based on Mako G-125 geometry
            data = np.reshape(raw, (1024, 1280))
            hdu = fits.PrimaryHDU(data)
            hdu.writeto(datetime.utcnow().strftime(filename), overwrite=True)
        return filename

    @property
    def autoexpose(self):
        return self.get("ExposureAuto").as_tuple()

    @autoexpose.setter
    def autoexpose(self, value):
        self.put("ExposureAuto", value)

    def show_stream(self):
        """
        Instructions to view the stream. 
        Since no GUI libs are present, use the Quart web server.
        """
        print("--- MAKO WEB STREAM ---")
        print("1. In your app.py, register a route that returns 'mako.get_jpeg()'")
        print("2. Open your browser at: http://localhost:5533/api/webcam/stream")
        print("-----------------------")

    def get_png_buffer(self):
        """Returns the current frame as an in-memory PNG for the Quart Response."""
        data = self.image
        return self._to_png(data) if data is not None else None
+3 −0
Original line number Diff line number Diff line
@@ -161,6 +161,9 @@ class Camera(STX):
    High-level interface for the SBIG STX camera imaging CCD.
    """

    dtype = np.float32       # FITS data is floating-point
    acquisition = 'trigger'  # requires start() + ready poll before matrix

    def __init__(self, url):
        """
        Constructor
+23 −0
Original line number Diff line number Diff line
@@ -15,6 +15,9 @@ from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
# Third-party modules
import numpy as np

# Internal
from .color_tables import viridis as _viridis


def array_to_png(data, vmin=0, vmax=255, thumbnail_size=None):
    """
@@ -98,6 +101,26 @@ def array_to_png(data, vmin=0, vmax=255, thumbnail_size=None):
    return png_bin


def auto_range(data, lo=0.5, hi=99.5):
    """Return (vmin, vmax) from percentiles of *data*."""
    return float(np.percentile(data, lo)), float(np.percentile(data, hi))


def apply_viridis(data, vmin, vmax):
    """Map a 2-D scalar array to a Viridis RGB uint8 array ``(H, W, 3)``."""
    norm = np.clip((data - vmin) / (vmax - vmin), 0, 1)
    indices = (norm * 255).astype(np.uint8)
    return _viridis[indices]


def fits_to_png(data, vmin=None, vmax=None, thumbnail_size=None):
    """Convert a 2-D float array to a viridis-coloured PNG (auto-range when vmin/vmax absent)."""
    if vmin is None or vmax is None:
        vmin, vmax = auto_range(data)
    rgb = apply_viridis(data, vmin, vmax)
    return array_to_png(rgb, thumbnail_size=thumbnail_size)


class Streamer:
    """
    HTTP server that broadcasts images provided by a callback function.
Loading