Loading noctua/api/fits_image.py +146 −132 Original line number Diff line number Diff line Loading @@ -4,29 +4,36 @@ """ REST API for FITS image serving. Endpoints (all under /api/viewer/<station>/<camera>): GET /png?vmin=&vmax= → full PNG (rendered with viridis) GET /fits → raw FITS file download GET /info → JSON: shape, dtype, percentile stats GET /tile?cx=&cy=&r= → arraybuffer tile (raw float32 pixels) centred on (cx,cy) with half-radius r GET /panoramic?vmin=&vmax= → small PNG thumbnail (≤256px longest side) The station/camera names map to filesystem paths via VIEWER_PATHS in noctua/config/viewer.ini, e.g.: All endpoints live under ``/api/viewer/<station>/<camera>`` and are camera-agnostic: the same routes serve every scientific or technical camera that is listed in ``noctua/config/viewer.ini``. Endpoints --------- GET /info JSON metadata: shape, dtype, auto-range percentiles. GET /png?vmin=&vmax= Full-resolution PNG rendered with Viridis. GET /panoramic?vmin=&vmax=&max_px= Small PNG thumbnail (longest side ≤ *max_px*, default 256). Thumbnail downscaling is done inside :func:`~noctua.utils.image.array_to_png` using pure numpy — no scikit-image dependency. GET /tile?cx=&cy=&rx=&ry= Raw ``float32`` arraybuffer tile centred on *(cx, cy)*. GET /fits Raw FITS file download. Configuration ------------- Station/camera paths are declared in ``noctua/config/viewer.ini``:: [station1/scicam] path = /data/station1/latest.fits [station1/teccam] path = /data/station1/latest_tec.fits """ #!/usr/bin/env python3 # -*- coding: utf-8 -*- [station2/allsky] path = /data/station2/allsky.fits """ REST API for FITS image serving. The key must match the URL segment ``<station>/<camera>``. """ # System modules Loading @@ -45,85 +52,88 @@ from noctua.utils.color_tables import viridis viewer_blueprint = Blueprint('viewer', __name__) # Path Resolution # --------------------------------------------------------------------------- # Path resolution # --------------------------------------------------------------------------- PACKAGE_ROOT = Path(__file__).parent.parent.absolute() PROJECT_ROOT = PACKAGE_ROOT.parent _cfg = configparser.ConfigParser() _cfg_path = PACKAGE_ROOT / 'config' / 'viewer.ini' _cfg.read(_cfg_path) _cfg.read(PACKAGE_ROOT / 'config' / 'viewer.ini') # --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _apply_viridis(data, vmin, vmax): """ Map scalar float data to Viridis RGB uint8 array. Map scalar float data to a Viridis RGB uint8 array. Parameters ---------- data : np.ndarray Scalar 2D array. vmin, vmax : float Stretching limits. 2-D scalar array. vmin : float Lower stretching limit. vmax : float Upper stretching limit. Returns ------- np.ndarray RGB uint8 array with shape (H, W, 3). RGB uint8 array of shape ``(H, W, 3)``. """ # Normalize 0.0 to 1.0 norm = np.clip((data - vmin) / (vmax - vmin), 0, 1) # Map to 0-255 indices indices = (norm * 255).astype(np.uint8) return viridis[indices] def _fits_path(station: str, camera: str) -> Path | None: def _fits_path(station, camera): """ Return the filesystem path for the latest FITS of a camera. Return the filesystem path for a camera's latest FITS file. Parameters ---------- station : str The name of the station. Station identifier (e.g. ``'station1'``). camera : str The name of the camera. Camera identifier (e.g. ``'scicam'``, ``'allsky'``). Returns ------- Path or None The absolute Path object to the FITS file, or None if not configured. pathlib.Path or None Absolute path to the FITS file, or ``None`` if not configured. """ key = f'{station}/{camera}' raw_path = _cfg.get(key, 'path', fallback=None) raw_path = _cfg.get(f'{station}/{camera}', 'path', fallback=None) if raw_path is None: return None return PROJECT_ROOT / DATA_FOLDER / raw_path def _load_data(station: str, camera: str) -> np.ndarray | None: def _load_data(station, camera): """ Load the primary HDU data as float32. Load the primary HDU data as a ``float32`` array. Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- np.ndarray or None The image data array, or None if the file is missing or invalid. Image data, or ``None`` if the file is missing or empty. """ path = _fits_path(station, camera) if path is None or not path.exists(): return None with fits.open(path) as hdul: data = hdul[0].data if data is None: Loading @@ -131,62 +141,31 @@ def _load_data(station: str, camera: str) -> np.ndarray | None: return data.astype(np.float32) def _auto_range(data: np.ndarray, lo: float = 0.5, hi: float = 99.5) -> tuple: def _auto_range(data, lo=0.5, hi=99.5): """ Calculate dynamic range based on percentiles. Calculate a robust display range from percentiles. Parameters ---------- data : np.ndarray The image data array. Image data array. lo : float, optional The lower percentile. Defaults to 0.5. Lower percentile. Default is ``0.5``. hi : float, optional The upper percentile. Defaults to 99.5. Upper percentile. Default is ``99.5``. Returns ------- tuple A tuple containing (vmin, vmax). tuple of float ``(vmin, vmax)`` pair. """ return float(np.percentile(data, lo)), float(np.percentile(data, hi)) def _thumb(data: np.ndarray, max_px: int = 256) -> np.ndarray: """ Downscale data to fit within max_px on the longest side. Parameters ---------- data : np.ndarray The original image data. max_px : int, optional The maximum size in pixels for the longest edge. Defaults to 256. Returns ------- np.ndarray The downscaled image array. """ h, w = data.shape[:2] scale = max_px / max(h, w) if scale >= 1.0: return data new_h = max(1, int(h * scale)) new_w = max(1, int(w * scale)) try: from skimage.transform import resize as sk_resize return sk_resize(data, (new_h, new_w), anti_aliasing=True, preserve_range=True).astype(np.float32) except ImportError: sy = max(1, h // new_h) sx = max(1, w // new_w) return data[::sy, ::sx] # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @viewer_blueprint.route('/viewer/<station>/<camera>/info') async def image_info(station, camera): Loading @@ -196,23 +175,22 @@ async def image_info(station, camera): Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- Response A JSON response containing image metadata. quart.Response JSON with keys ``shape``, ``dtype``, ``vmin_auto``, ``vmax_auto``, ``global_min``, ``global_max``. """ data = _load_data(station, camera) if data is None: return jsonify({'error': 'FITS not found'}), 404 vmin, vmax = _auto_range(data) return jsonify({ 'shape': list(data.shape), 'dtype': str(data.dtype), Loading @@ -226,19 +204,26 @@ async def image_info(station, camera): @viewer_blueprint.route('/viewer/<station>/<camera>/png') async def image_png(station, camera): """ Render the full image as a PNG. Render the full image as a PNG with Viridis colormap. Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- Response The PNG image data. quart.Response PNG image bytes. Query Parameters ---------------- vmin : float, optional Lower display limit. Auto-computed when omitted. vmax : float, optional Upper display limit. Auto-computed when omitted. """ data = _load_data(station, camera) Loading @@ -258,20 +243,33 @@ async def image_png(station, camera): @viewer_blueprint.route('/viewer/<station>/<camera>/panoramic') async def image_panoramic(station, camera): """ Render a small thumbnail PNG. Render a thumbnail PNG (longest side ≤ *max_px* pixels). Downscaling is performed by :func:`~noctua.utils.image.array_to_png` using pure numpy stride sampling — no extra dependencies required. Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- Response The panoramic PNG image data. quart.Response Small PNG image bytes. Query Parameters ---------------- vmin : float, optional Lower display limit. Auto-computed when omitted. vmax : float, optional Upper display limit. Auto-computed when omitted. max_px : int, optional Maximum pixels on the longest side. Default is ``256``. """ data = _load_data(station, camera) if data is None: return Response('FITS not found', status=404) Loading @@ -281,46 +279,57 @@ async def image_panoramic(station, camera): if vmin is None or vmax is None: vmin, vmax = _auto_range(data) thumb = _thumb(data, max_px=int(request.args.get('max_px', 256))) rgb = _apply_viridis(thumb, vmin, vmax) png = array_to_png(rgb) max_px = int(request.args.get('max_px', 256)) rgb = _apply_viridis(data, vmin, vmax) png = array_to_png(rgb, thumbnail_size=max_px) return Response(png, mimetype='image/png', headers={'Cache-Control': 'no-store'}) @viewer_blueprint.route('/viewer/<station>/<camera>/tile') async def image_tile(station, camera): """ Return a raw float32 arraybuffer tile centred on (cx, cy). Return a raw ``float32`` arraybuffer tile centred on *(cx, cy)*. The response binary layout is:: [tw: int32][th: int32][data: float32 × tw × th] Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- Response The binary tile data with header. quart.Response Binary ``application/octet-stream`` payload. Query Parameters ---------------- cx : int, optional Tile centre X (image pixels). Defaults to image centre. cy : int, optional Tile centre Y (image pixels). Defaults to image centre. rx : int, optional Half-width of the tile. Default is ``50``. ry : int, optional Half-height of the tile. Default is ``50``. """ data = _load_data(station, camera) if data is None: return Response('FITS not found', status=404) h, w = data.shape[:2] cx = request.args.get('cx', default=w // 2, type=int) cy = request.args.get('cy', default=h // 2, type=int) rx = request.args.get('rx', default=50, type=int) # Default 15 half-width (30 total) ry = request.args.get('ry', default=50, type=int) # Default 10 half-height (20 total) x0 = max(0, cx - rx) x1 = min(w, cx + rx) y0 = max(0, cy - ry) y1 = min(h, cy + ry) rx = request.args.get('rx', default=50, type=int) ry = request.args.get('ry', default=50, type=int) x0, x1 = max(0, cx - rx), min(w, cx + rx) y0, y1 = max(0, cy - ry), min(h, cy + ry) tile = data[y0:y1, x0:x1] tw, th = tile.shape[1], tile.shape[0] Loading @@ -333,27 +342,32 @@ async def image_tile(station, camera): headers={'Cache-Control': 'no-store'}, ) @viewer_blueprint.route('/viewer/<station>/<camera>/fits') async def download_fits(station, camera): """ Serve the raw FITS file for download. Serve the raw FITS file as a download. Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- Response The raw FITS file attachment. quart.Response File attachment. """ path = _fits_path(station, camera) if path is None or not path.exists(): return Response('FITS not found', status=404) return await send_file(path, mimetype='image/fits', as_attachment=True, attachment_filename=path.name) return await send_file( path, mimetype='image/fits', as_attachment=True, attachment_filename=path.name, ) noctua/config/devices.ini +17 −12 Original line number Diff line number Diff line Loading @@ -8,18 +8,6 @@ ###################################### [teccam] module = mako class = Webcam node = MAKO [teccam2] module = mako class = Webcam node = MAKO2 ###################################### [stage] module = mercury class = Stage Loading @@ -30,6 +18,23 @@ pos_echelle = 100 ###################################### [teccam] module = stx class = Guider node = STX [teccam2] module = mako class = Guider node = MAKO2 [teccam3] module = mako class = Guider node = MAKO ###################################### [cam] module = stx class = Camera Loading noctua/devices/mako.py +1 −1 Original line number Diff line number Diff line Loading @@ -84,7 +84,7 @@ class Mako(BaseDevice): self.vmb.__exit__(None, None, None) class Webcam(Mako): class Guider(Mako): """ High-level interface for Mako cameras with streaming and capture support. Loading noctua/devices/stx.py +153 −24 Original line number Diff line number Diff line Loading @@ -3,6 +3,30 @@ """ Interface with a SBIG STX camera device Imager Internal External BinX BinX BinX BinY BinY BinY CoolerState CoolerState --- CCDTemperature CCDTemperature --- CCDTemperatureSetpoint --- --- CoolerPower CoolerPower --- CameraXSize CameraXSize CameraXSize CameraYSize CameraYSize CameraYSize ElectronsPerADU ElectronsPerADU ElectronsPerADU FullWellCapacity FullWellCapacity FullWellCapacity AmbientTemperature AmbientTemperature AmbientTemperature MaxADU MaxADU MaxADU MaxBinX MaxBinX MaxBinX MaxBinY MaxBinY MaxBinY StartX StartX StartX StartY StartY StartY NumX NumX NumX NumY NumY NumY PixelSizeX PixelSizeX PixelSizeX PixelSizeY PixelSizeY PixelSizeY """ # System modules Loading Loading @@ -158,9 +182,17 @@ class Camera(STX): High-level interface for the SBIG STX camera imaging CCD. """ def __init__(self, url): """ Constructor """ super().__init__(url) self.cam = "Imager" def abort(self): """Aborts the current exposure.""" self.put("ImagerAbortExposure") self.put(f"{self.cam}AbortExposure") def start(self, duration, frametype, datetime=None): """ Loading Loading @@ -190,7 +222,7 @@ class Camera(STX): params = {"Duration": duration, "FrameType": frametype, "DateTime": datetime} self.put("ImagerStartExposure", params=params) self.put(f"{self.cam}StartExposure", params=params) def download(self, filepath=temp_fits): Loading Loading @@ -246,14 +278,14 @@ class Camera(STX): "NumY": height } self.put("ImagerSetSettings", params=params) self.put(f"{self.cam}SetSettings", params=params) def full_frame(self): """Sets the camera to use the full sensor area.""" params = ["CameraXSize", "CameraYSize"] try: cam_x, cam_y = self.get("ImagerGetSettings", params=params) cam_x, cam_y = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -269,7 +301,7 @@ class Camera(STX): params = ["CameraXSize", "CameraYSize"] try: cam_x, cam_y = self.get("ImagerGetSettings", params=params) cam_x, cam_y = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -290,7 +322,7 @@ class Camera(STX): params = ["CameraXSize", "CameraYSize"] try: cam_x, cam_y = self.get("ImagerGetSettings", params=params) cam_x, cam_y = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -312,7 +344,7 @@ class Camera(STX): params = ["BinX", "BinY"] try: binx, biny = self.get("ImagerGetSettings", params=params) binx, biny = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -334,7 +366,7 @@ class Camera(STX): return params = {"BinX": b[0], "BinY": b[1]} self.put("ImagerSetSettings", params=params) self.put(f"{self.cam}SetSettings", params=params) @property def filter(self): Loading Loading @@ -365,7 +397,7 @@ class Camera(STX): """bool: The current state of the CCD cooler (True=On, False=Off).""" params = ["CoolerState"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return None Loading @@ -382,14 +414,14 @@ class Camera(STX): return params = {"CoolerState": "1" if b else "0"} self.put("ImagerSetSettings", params=params) self.put(f"{self.cam}SetSettings", params=params) @property def temperature(self): """float: The current CCD temperature in degrees Celsius.""" params = ["CCDTemperature"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return None Loading @@ -406,7 +438,7 @@ class Camera(STX): return params = {"CCDTemperatureSetpoint": t} self.put("ImagerSetSettings", params=params) self.put(f"{self.cam}SetSettings", params=params) @property def all(self): Loading @@ -426,7 +458,7 @@ class Camera(STX): "StartY", "NumX", "NumY"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error or not res or len(res) != len(params): return {} Loading Loading @@ -477,7 +509,7 @@ class Camera(STX): """float: The ambient temperature in degrees Celsius.""" params = ["AmbientTemperature"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return None Loading @@ -489,7 +521,7 @@ class Camera(STX): params = ["CameraXSize", "CameraYSize", "BinX", "BinY"] try: camx, camy, binx, biny = self.get("ImagerGetSettings", params=params) camx, camy, binx, biny = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -515,7 +547,7 @@ class Camera(STX): """int: The current cooler power level in percent.""" params = ["CoolerPower"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return None Loading @@ -538,7 +570,7 @@ class Camera(STX): """list of int: The maximum sensor dimensions [X, Y] for current binning.""" params = ["CameraXSize", "CameraYSize"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return [None, None] Loading @@ -550,7 +582,7 @@ class Camera(STX): """float: The current CCD temperature setpoint in degrees Celsius.""" params = ["CCDTemperatureSetpoint"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return None Loading @@ -562,7 +594,7 @@ class Camera(STX): 0=Idle, 2=Exposing, 3=Reading, 5=Error. """ res = self.get("ImagerState") res = self.get(f"{self.cam}State") if self.error: return None Loading @@ -574,7 +606,7 @@ class Camera(STX): 1 if an image is ready for download, 0 otherwise. """ res = self.get("ImagerImageReady") res = self.get(f"{self.cam}ImageReady") if self.error: return None Loading @@ -596,7 +628,7 @@ class Camera(STX): params = ["StartX", "NumX", "BinX"] try: startx, numx, binx = self.get("ImagerGetSettings", params=params) startx, numx, binx = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -615,7 +647,7 @@ class Camera(STX): params = ["StartY", "NumY", "BinY"] try: starty, numy, biny = self.get("ImagerGetSettings", params=params) starty, numy, biny = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -635,7 +667,7 @@ class Camera(STX): params = ["StartX", "StartY", "BinX", "BinY"] try: startx, starty, binx, biny = self.get( "ImagerGetSettings", params=params) f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -653,7 +685,7 @@ class Camera(STX): params = ["StartX", "StartY", "NumX", "NumY", "BinX", "BinY"] try: startx, starty, numx, numy, binx, biny = self.get( "ImagerGetSettings", params=params) f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -665,3 +697,100 @@ class Camera(STX): x_end = (int(startx) + int(numx)) // int(binx) y_end = (int(starty) + int(numy)) // int(biny) return [x_end, y_end] class Guider(Camera): """ Guiding Teccam. """ def __init__(self, url): """ Constructor """ super().__init__(url) self.cam = "ExtGuider" @property def cooler(self): """bool: The current state of the CCD cooler (True=On, False=Off).""" log.warning(f"Not available in External Guider") return None @cooler.setter def cooler(self, b): """bool: The current state of the CCD cooler (True=On, False=Off).""" log.warning(f"Not available in External Guider") return None @property def temperature(self): """float: The current CCD temperature in degrees Celsius.""" log.warning(f"Not available in External Guider") return None @temperature.setter def temperature(self, t): log.warning(f"Not available in External Guider") return None @property def all(self): """dict: A comprehensive dictionary of the current camera state.""" params = [ "AmbientTemperature", "BinX", "BinY", "CameraXSize", "CameraYSize", "StartX", "StartY", "NumX", "NumY"] res = self.get(f"{self.cam}GetSettings", params=params) if self.error or not res or len(res) != len(params): return {} try: ambient, binx, biny, camx, camy, startx, starty, numx, numy = res b_x, b_y = int(binx), int(biny) x_start, y_start = int(startx) // b_x, int(starty) // b_y x_end = (int(startx) + int(numx)) // b_x y_end = (int(starty) + int(numy)) // b_y except ValueError as e: msg = f"Device exception: {e}" log.error(msg) self.error.append(msg) if self.error: return {"ambient": None, "binning": [None, None], "max_range": [None, None], "xystart": [None, None], "xyend": [None, None], "xrange": [None, None], "yrange": [None, None], "center": [None, None], } return {"ambient": round(ambient, 1), "binning": [b_x, b_y], "max_range": [int(camx) // b_x, int(camy) // b_y], "xystart": [x_start, y_start], "xyend": [x_end, y_end], "xrange": [x_start, x_end], "yrange": [y_start, y_end], "center": [int(camx) // b_x // 2, int(camy) // b_y // 2], } noctua/utils/image.py +97 −49 File changed.Preview size limit exceeded, changes collapsed. Show changes Loading
noctua/api/fits_image.py +146 −132 Original line number Diff line number Diff line Loading @@ -4,29 +4,36 @@ """ REST API for FITS image serving. Endpoints (all under /api/viewer/<station>/<camera>): GET /png?vmin=&vmax= → full PNG (rendered with viridis) GET /fits → raw FITS file download GET /info → JSON: shape, dtype, percentile stats GET /tile?cx=&cy=&r= → arraybuffer tile (raw float32 pixels) centred on (cx,cy) with half-radius r GET /panoramic?vmin=&vmax= → small PNG thumbnail (≤256px longest side) The station/camera names map to filesystem paths via VIEWER_PATHS in noctua/config/viewer.ini, e.g.: All endpoints live under ``/api/viewer/<station>/<camera>`` and are camera-agnostic: the same routes serve every scientific or technical camera that is listed in ``noctua/config/viewer.ini``. Endpoints --------- GET /info JSON metadata: shape, dtype, auto-range percentiles. GET /png?vmin=&vmax= Full-resolution PNG rendered with Viridis. GET /panoramic?vmin=&vmax=&max_px= Small PNG thumbnail (longest side ≤ *max_px*, default 256). Thumbnail downscaling is done inside :func:`~noctua.utils.image.array_to_png` using pure numpy — no scikit-image dependency. GET /tile?cx=&cy=&rx=&ry= Raw ``float32`` arraybuffer tile centred on *(cx, cy)*. GET /fits Raw FITS file download. Configuration ------------- Station/camera paths are declared in ``noctua/config/viewer.ini``:: [station1/scicam] path = /data/station1/latest.fits [station1/teccam] path = /data/station1/latest_tec.fits """ #!/usr/bin/env python3 # -*- coding: utf-8 -*- [station2/allsky] path = /data/station2/allsky.fits """ REST API for FITS image serving. The key must match the URL segment ``<station>/<camera>``. """ # System modules Loading @@ -45,85 +52,88 @@ from noctua.utils.color_tables import viridis viewer_blueprint = Blueprint('viewer', __name__) # Path Resolution # --------------------------------------------------------------------------- # Path resolution # --------------------------------------------------------------------------- PACKAGE_ROOT = Path(__file__).parent.parent.absolute() PROJECT_ROOT = PACKAGE_ROOT.parent _cfg = configparser.ConfigParser() _cfg_path = PACKAGE_ROOT / 'config' / 'viewer.ini' _cfg.read(_cfg_path) _cfg.read(PACKAGE_ROOT / 'config' / 'viewer.ini') # --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _apply_viridis(data, vmin, vmax): """ Map scalar float data to Viridis RGB uint8 array. Map scalar float data to a Viridis RGB uint8 array. Parameters ---------- data : np.ndarray Scalar 2D array. vmin, vmax : float Stretching limits. 2-D scalar array. vmin : float Lower stretching limit. vmax : float Upper stretching limit. Returns ------- np.ndarray RGB uint8 array with shape (H, W, 3). RGB uint8 array of shape ``(H, W, 3)``. """ # Normalize 0.0 to 1.0 norm = np.clip((data - vmin) / (vmax - vmin), 0, 1) # Map to 0-255 indices indices = (norm * 255).astype(np.uint8) return viridis[indices] def _fits_path(station: str, camera: str) -> Path | None: def _fits_path(station, camera): """ Return the filesystem path for the latest FITS of a camera. Return the filesystem path for a camera's latest FITS file. Parameters ---------- station : str The name of the station. Station identifier (e.g. ``'station1'``). camera : str The name of the camera. Camera identifier (e.g. ``'scicam'``, ``'allsky'``). Returns ------- Path or None The absolute Path object to the FITS file, or None if not configured. pathlib.Path or None Absolute path to the FITS file, or ``None`` if not configured. """ key = f'{station}/{camera}' raw_path = _cfg.get(key, 'path', fallback=None) raw_path = _cfg.get(f'{station}/{camera}', 'path', fallback=None) if raw_path is None: return None return PROJECT_ROOT / DATA_FOLDER / raw_path def _load_data(station: str, camera: str) -> np.ndarray | None: def _load_data(station, camera): """ Load the primary HDU data as float32. Load the primary HDU data as a ``float32`` array. Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- np.ndarray or None The image data array, or None if the file is missing or invalid. Image data, or ``None`` if the file is missing or empty. """ path = _fits_path(station, camera) if path is None or not path.exists(): return None with fits.open(path) as hdul: data = hdul[0].data if data is None: Loading @@ -131,62 +141,31 @@ def _load_data(station: str, camera: str) -> np.ndarray | None: return data.astype(np.float32) def _auto_range(data: np.ndarray, lo: float = 0.5, hi: float = 99.5) -> tuple: def _auto_range(data, lo=0.5, hi=99.5): """ Calculate dynamic range based on percentiles. Calculate a robust display range from percentiles. Parameters ---------- data : np.ndarray The image data array. Image data array. lo : float, optional The lower percentile. Defaults to 0.5. Lower percentile. Default is ``0.5``. hi : float, optional The upper percentile. Defaults to 99.5. Upper percentile. Default is ``99.5``. Returns ------- tuple A tuple containing (vmin, vmax). tuple of float ``(vmin, vmax)`` pair. """ return float(np.percentile(data, lo)), float(np.percentile(data, hi)) def _thumb(data: np.ndarray, max_px: int = 256) -> np.ndarray: """ Downscale data to fit within max_px on the longest side. Parameters ---------- data : np.ndarray The original image data. max_px : int, optional The maximum size in pixels for the longest edge. Defaults to 256. Returns ------- np.ndarray The downscaled image array. """ h, w = data.shape[:2] scale = max_px / max(h, w) if scale >= 1.0: return data new_h = max(1, int(h * scale)) new_w = max(1, int(w * scale)) try: from skimage.transform import resize as sk_resize return sk_resize(data, (new_h, new_w), anti_aliasing=True, preserve_range=True).astype(np.float32) except ImportError: sy = max(1, h // new_h) sx = max(1, w // new_w) return data[::sy, ::sx] # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @viewer_blueprint.route('/viewer/<station>/<camera>/info') async def image_info(station, camera): Loading @@ -196,23 +175,22 @@ async def image_info(station, camera): Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- Response A JSON response containing image metadata. quart.Response JSON with keys ``shape``, ``dtype``, ``vmin_auto``, ``vmax_auto``, ``global_min``, ``global_max``. """ data = _load_data(station, camera) if data is None: return jsonify({'error': 'FITS not found'}), 404 vmin, vmax = _auto_range(data) return jsonify({ 'shape': list(data.shape), 'dtype': str(data.dtype), Loading @@ -226,19 +204,26 @@ async def image_info(station, camera): @viewer_blueprint.route('/viewer/<station>/<camera>/png') async def image_png(station, camera): """ Render the full image as a PNG. Render the full image as a PNG with Viridis colormap. Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- Response The PNG image data. quart.Response PNG image bytes. Query Parameters ---------------- vmin : float, optional Lower display limit. Auto-computed when omitted. vmax : float, optional Upper display limit. Auto-computed when omitted. """ data = _load_data(station, camera) Loading @@ -258,20 +243,33 @@ async def image_png(station, camera): @viewer_blueprint.route('/viewer/<station>/<camera>/panoramic') async def image_panoramic(station, camera): """ Render a small thumbnail PNG. Render a thumbnail PNG (longest side ≤ *max_px* pixels). Downscaling is performed by :func:`~noctua.utils.image.array_to_png` using pure numpy stride sampling — no extra dependencies required. Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- Response The panoramic PNG image data. quart.Response Small PNG image bytes. Query Parameters ---------------- vmin : float, optional Lower display limit. Auto-computed when omitted. vmax : float, optional Upper display limit. Auto-computed when omitted. max_px : int, optional Maximum pixels on the longest side. Default is ``256``. """ data = _load_data(station, camera) if data is None: return Response('FITS not found', status=404) Loading @@ -281,46 +279,57 @@ async def image_panoramic(station, camera): if vmin is None or vmax is None: vmin, vmax = _auto_range(data) thumb = _thumb(data, max_px=int(request.args.get('max_px', 256))) rgb = _apply_viridis(thumb, vmin, vmax) png = array_to_png(rgb) max_px = int(request.args.get('max_px', 256)) rgb = _apply_viridis(data, vmin, vmax) png = array_to_png(rgb, thumbnail_size=max_px) return Response(png, mimetype='image/png', headers={'Cache-Control': 'no-store'}) @viewer_blueprint.route('/viewer/<station>/<camera>/tile') async def image_tile(station, camera): """ Return a raw float32 arraybuffer tile centred on (cx, cy). Return a raw ``float32`` arraybuffer tile centred on *(cx, cy)*. The response binary layout is:: [tw: int32][th: int32][data: float32 × tw × th] Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- Response The binary tile data with header. quart.Response Binary ``application/octet-stream`` payload. Query Parameters ---------------- cx : int, optional Tile centre X (image pixels). Defaults to image centre. cy : int, optional Tile centre Y (image pixels). Defaults to image centre. rx : int, optional Half-width of the tile. Default is ``50``. ry : int, optional Half-height of the tile. Default is ``50``. """ data = _load_data(station, camera) if data is None: return Response('FITS not found', status=404) h, w = data.shape[:2] cx = request.args.get('cx', default=w // 2, type=int) cy = request.args.get('cy', default=h // 2, type=int) rx = request.args.get('rx', default=50, type=int) # Default 15 half-width (30 total) ry = request.args.get('ry', default=50, type=int) # Default 10 half-height (20 total) x0 = max(0, cx - rx) x1 = min(w, cx + rx) y0 = max(0, cy - ry) y1 = min(h, cy + ry) rx = request.args.get('rx', default=50, type=int) ry = request.args.get('ry', default=50, type=int) x0, x1 = max(0, cx - rx), min(w, cx + rx) y0, y1 = max(0, cy - ry), min(h, cy + ry) tile = data[y0:y1, x0:x1] tw, th = tile.shape[1], tile.shape[0] Loading @@ -333,27 +342,32 @@ async def image_tile(station, camera): headers={'Cache-Control': 'no-store'}, ) @viewer_blueprint.route('/viewer/<station>/<camera>/fits') async def download_fits(station, camera): """ Serve the raw FITS file for download. Serve the raw FITS file as a download. Parameters ---------- station : str The name of the station. Station identifier. camera : str The name of the camera. Camera identifier. Returns ------- Response The raw FITS file attachment. quart.Response File attachment. """ path = _fits_path(station, camera) if path is None or not path.exists(): return Response('FITS not found', status=404) return await send_file(path, mimetype='image/fits', as_attachment=True, attachment_filename=path.name) return await send_file( path, mimetype='image/fits', as_attachment=True, attachment_filename=path.name, )
noctua/config/devices.ini +17 −12 Original line number Diff line number Diff line Loading @@ -8,18 +8,6 @@ ###################################### [teccam] module = mako class = Webcam node = MAKO [teccam2] module = mako class = Webcam node = MAKO2 ###################################### [stage] module = mercury class = Stage Loading @@ -30,6 +18,23 @@ pos_echelle = 100 ###################################### [teccam] module = stx class = Guider node = STX [teccam2] module = mako class = Guider node = MAKO2 [teccam3] module = mako class = Guider node = MAKO ###################################### [cam] module = stx class = Camera Loading
noctua/devices/mako.py +1 −1 Original line number Diff line number Diff line Loading @@ -84,7 +84,7 @@ class Mako(BaseDevice): self.vmb.__exit__(None, None, None) class Webcam(Mako): class Guider(Mako): """ High-level interface for Mako cameras with streaming and capture support. Loading
noctua/devices/stx.py +153 −24 Original line number Diff line number Diff line Loading @@ -3,6 +3,30 @@ """ Interface with a SBIG STX camera device Imager Internal External BinX BinX BinX BinY BinY BinY CoolerState CoolerState --- CCDTemperature CCDTemperature --- CCDTemperatureSetpoint --- --- CoolerPower CoolerPower --- CameraXSize CameraXSize CameraXSize CameraYSize CameraYSize CameraYSize ElectronsPerADU ElectronsPerADU ElectronsPerADU FullWellCapacity FullWellCapacity FullWellCapacity AmbientTemperature AmbientTemperature AmbientTemperature MaxADU MaxADU MaxADU MaxBinX MaxBinX MaxBinX MaxBinY MaxBinY MaxBinY StartX StartX StartX StartY StartY StartY NumX NumX NumX NumY NumY NumY PixelSizeX PixelSizeX PixelSizeX PixelSizeY PixelSizeY PixelSizeY """ # System modules Loading Loading @@ -158,9 +182,17 @@ class Camera(STX): High-level interface for the SBIG STX camera imaging CCD. """ def __init__(self, url): """ Constructor """ super().__init__(url) self.cam = "Imager" def abort(self): """Aborts the current exposure.""" self.put("ImagerAbortExposure") self.put(f"{self.cam}AbortExposure") def start(self, duration, frametype, datetime=None): """ Loading Loading @@ -190,7 +222,7 @@ class Camera(STX): params = {"Duration": duration, "FrameType": frametype, "DateTime": datetime} self.put("ImagerStartExposure", params=params) self.put(f"{self.cam}StartExposure", params=params) def download(self, filepath=temp_fits): Loading Loading @@ -246,14 +278,14 @@ class Camera(STX): "NumY": height } self.put("ImagerSetSettings", params=params) self.put(f"{self.cam}SetSettings", params=params) def full_frame(self): """Sets the camera to use the full sensor area.""" params = ["CameraXSize", "CameraYSize"] try: cam_x, cam_y = self.get("ImagerGetSettings", params=params) cam_x, cam_y = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -269,7 +301,7 @@ class Camera(STX): params = ["CameraXSize", "CameraYSize"] try: cam_x, cam_y = self.get("ImagerGetSettings", params=params) cam_x, cam_y = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -290,7 +322,7 @@ class Camera(STX): params = ["CameraXSize", "CameraYSize"] try: cam_x, cam_y = self.get("ImagerGetSettings", params=params) cam_x, cam_y = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -312,7 +344,7 @@ class Camera(STX): params = ["BinX", "BinY"] try: binx, biny = self.get("ImagerGetSettings", params=params) binx, biny = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -334,7 +366,7 @@ class Camera(STX): return params = {"BinX": b[0], "BinY": b[1]} self.put("ImagerSetSettings", params=params) self.put(f"{self.cam}SetSettings", params=params) @property def filter(self): Loading Loading @@ -365,7 +397,7 @@ class Camera(STX): """bool: The current state of the CCD cooler (True=On, False=Off).""" params = ["CoolerState"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return None Loading @@ -382,14 +414,14 @@ class Camera(STX): return params = {"CoolerState": "1" if b else "0"} self.put("ImagerSetSettings", params=params) self.put(f"{self.cam}SetSettings", params=params) @property def temperature(self): """float: The current CCD temperature in degrees Celsius.""" params = ["CCDTemperature"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return None Loading @@ -406,7 +438,7 @@ class Camera(STX): return params = {"CCDTemperatureSetpoint": t} self.put("ImagerSetSettings", params=params) self.put(f"{self.cam}SetSettings", params=params) @property def all(self): Loading @@ -426,7 +458,7 @@ class Camera(STX): "StartY", "NumX", "NumY"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error or not res or len(res) != len(params): return {} Loading Loading @@ -477,7 +509,7 @@ class Camera(STX): """float: The ambient temperature in degrees Celsius.""" params = ["AmbientTemperature"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return None Loading @@ -489,7 +521,7 @@ class Camera(STX): params = ["CameraXSize", "CameraYSize", "BinX", "BinY"] try: camx, camy, binx, biny = self.get("ImagerGetSettings", params=params) camx, camy, binx, biny = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -515,7 +547,7 @@ class Camera(STX): """int: The current cooler power level in percent.""" params = ["CoolerPower"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return None Loading @@ -538,7 +570,7 @@ class Camera(STX): """list of int: The maximum sensor dimensions [X, Y] for current binning.""" params = ["CameraXSize", "CameraYSize"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return [None, None] Loading @@ -550,7 +582,7 @@ class Camera(STX): """float: The current CCD temperature setpoint in degrees Celsius.""" params = ["CCDTemperatureSetpoint"] res = self.get("ImagerGetSettings", params=params) res = self.get(f"{self.cam}GetSettings", params=params) if self.error: return None Loading @@ -562,7 +594,7 @@ class Camera(STX): 0=Idle, 2=Exposing, 3=Reading, 5=Error. """ res = self.get("ImagerState") res = self.get(f"{self.cam}State") if self.error: return None Loading @@ -574,7 +606,7 @@ class Camera(STX): 1 if an image is ready for download, 0 otherwise. """ res = self.get("ImagerImageReady") res = self.get(f"{self.cam}ImageReady") if self.error: return None Loading @@ -596,7 +628,7 @@ class Camera(STX): params = ["StartX", "NumX", "BinX"] try: startx, numx, binx = self.get("ImagerGetSettings", params=params) startx, numx, binx = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -615,7 +647,7 @@ class Camera(STX): params = ["StartY", "NumY", "BinY"] try: starty, numy, biny = self.get("ImagerGetSettings", params=params) starty, numy, biny = self.get(f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -635,7 +667,7 @@ class Camera(STX): params = ["StartX", "StartY", "BinX", "BinY"] try: startx, starty, binx, biny = self.get( "ImagerGetSettings", params=params) f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -653,7 +685,7 @@ class Camera(STX): params = ["StartX", "StartY", "NumX", "NumY", "BinX", "BinY"] try: startx, starty, numx, numy, binx, biny = self.get( "ImagerGetSettings", params=params) f"{self.cam}GetSettings", params=params) except ValueError as e: msg = f"Device exception: {e}" log.error(msg) Loading @@ -665,3 +697,100 @@ class Camera(STX): x_end = (int(startx) + int(numx)) // int(binx) y_end = (int(starty) + int(numy)) // int(biny) return [x_end, y_end] class Guider(Camera): """ Guiding Teccam. """ def __init__(self, url): """ Constructor """ super().__init__(url) self.cam = "ExtGuider" @property def cooler(self): """bool: The current state of the CCD cooler (True=On, False=Off).""" log.warning(f"Not available in External Guider") return None @cooler.setter def cooler(self, b): """bool: The current state of the CCD cooler (True=On, False=Off).""" log.warning(f"Not available in External Guider") return None @property def temperature(self): """float: The current CCD temperature in degrees Celsius.""" log.warning(f"Not available in External Guider") return None @temperature.setter def temperature(self, t): log.warning(f"Not available in External Guider") return None @property def all(self): """dict: A comprehensive dictionary of the current camera state.""" params = [ "AmbientTemperature", "BinX", "BinY", "CameraXSize", "CameraYSize", "StartX", "StartY", "NumX", "NumY"] res = self.get(f"{self.cam}GetSettings", params=params) if self.error or not res or len(res) != len(params): return {} try: ambient, binx, biny, camx, camy, startx, starty, numx, numy = res b_x, b_y = int(binx), int(biny) x_start, y_start = int(startx) // b_x, int(starty) // b_y x_end = (int(startx) + int(numx)) // b_x y_end = (int(starty) + int(numy)) // b_y except ValueError as e: msg = f"Device exception: {e}" log.error(msg) self.error.append(msg) if self.error: return {"ambient": None, "binning": [None, None], "max_range": [None, None], "xystart": [None, None], "xyend": [None, None], "xrange": [None, None], "yrange": [None, None], "center": [None, None], } return {"ambient": round(ambient, 1), "binning": [b_x, b_y], "max_range": [int(camx) // b_x, int(camy) // b_y], "xystart": [x_start, y_start], "xyend": [x_end, y_end], "xrange": [x_start, x_end], "yrange": [y_start, y_end], "center": [int(camx) // b_x // 2, int(camy) // b_y // 2], }
noctua/utils/image.py +97 −49 File changed.Preview size limit exceeded, changes collapsed. Show changes