Loading noctua/devices/mako.py +39 −62 Original line number Diff line number Diff line Loading @@ -52,13 +52,11 @@ class Mako(BaseDevice): """ if self._cam is None: # Manually enter VmbSystem and Camera contexts self.vmb.__enter__() try: self._cam = self.vmb.get_camera_by_id(self.id) self._cam.__enter__() # Setup GigE packet size once at connection try: stream = self._cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() Loading @@ -75,7 +73,7 @@ class Mako(BaseDevice): def get(self, feature_name): """ Get a specific camera feature value. Get a specific camera feature object. Parameters ---------- Loading @@ -84,8 +82,7 @@ class Mako(BaseDevice): """ cam = self._check_connection() feature = getattr(cam, feature_name) return feature.get() return getattr(cam, feature_name) def put(self, feature_name, value): Loading Loading @@ -132,6 +129,7 @@ class Webcam(Mako): self._last_frame = None self._lock = threading.Lock() self._range = [0, 255] self._show = False def _frame_handler(self, cam, stream, frame): Loading @@ -141,7 +139,7 @@ class Webcam(Mako): if frame.get_status() == FrameStatus.Complete: with self._lock: # Copy the frame data to allow the SDK to reuse the buffer # Copy the frame data for persistent access self._last_frame = frame.as_numpy_ndarray().copy() cam.queue_frame(frame) Loading Loading @@ -170,6 +168,7 @@ class Webcam(Mako): cam.start_streaming(handler=self._frame_handler, buffer_count=5) self._streaming = True else: self._show = False cam.stop_streaming() self._streaming = False Loading Loading @@ -209,61 +208,25 @@ class Webcam(Mako): return filename def save_fits(self, filename=None): def save_fits(self, filename="temp.fits"): """ Save the current frame as a FITS file. """ data = self.image if data is not None: if filename is None: filename = datetime.utcnow().strftime("%Y-%m-%dT%H_%M_%S.fits") raw = self.image if raw is not None: data = np.reshape(raw, (1024, 1280)) filename = datetime.utcnow().strftime(filename) hdu = fits.PrimaryHDU(data) hdu.writeto(filename, overwrite=True) return filename @property def range(self): """ Get/Set software dynamic range [min, max] for PNG conversion (0-255). """ return self._range @range.setter def range(self, r): """ Set the dynamic range limits. """ self._range = r @property def autowhite(self): """ Get/Set white balance auto status ('Continuous', 'Off', 'Once'). """ return self.get("BalanceWhiteAuto") @autowhite.setter def autowhite(self, value): """ Set white balance auto status. """ self.put("BalanceWhiteAuto", value) @property def autoexpose(self): """ Get/Set exposure auto status ('Continuous', 'Off', 'Once'). Get exposure auto status and available options. """ return self.get("ExposureAuto").as_tuple() Loading @@ -272,25 +235,39 @@ class Webcam(Mako): @autoexpose.setter def autoexpose(self, value): """ Set exposure auto status. ('Continuous', 'Off', 'Once') Set exposure auto status ('Continuous', 'Off', 'Once'). """ self.put("ExposureAuto", value) @property def exptime(self): def show_stream(self): """ Get/Set exposure time in microseconds. Open an OpenCV window in a separate thread for visual monitoring. """ return self.get("ExposureTime") if not self._streaming: print("Error: Streaming is False. Start it first.") return def display_loop(): self._show = True win_name = f"Mako Stream: {self.id}" cv2.namedWindow(win_name, cv2.WINDOW_AUTOSIZE) @exptime.setter def exptime(self, value): """ Set exposure time in microseconds. Only applicable if autoexpose is 'Off'. """ while self._show: if self._last_frame is not None: # Basic normalization for visualization fmax = self._last_frame.max() disp = (self._last_frame / (fmax if fmax > 0 else 1) * 255).astype(np.uint8) cv2.imshow(win_name, disp) if cv2.waitKey(1) & 0xFF == ord('q'): break cv2.destroyWindow(win_name) self._show = False self.put("ExposureTime", float(value)) thread = threading.Thread(target=display_loop, daemon=True) thread.start() print("Display thread started. Press 'q' in the window to close it.") Loading
noctua/devices/mako.py +39 −62 Original line number Diff line number Diff line Loading @@ -52,13 +52,11 @@ class Mako(BaseDevice): """ if self._cam is None: # Manually enter VmbSystem and Camera contexts self.vmb.__enter__() try: self._cam = self.vmb.get_camera_by_id(self.id) self._cam.__enter__() # Setup GigE packet size once at connection try: stream = self._cam.get_streams()[0] stream.GVSPAdjustPacketSize.run() Loading @@ -75,7 +73,7 @@ class Mako(BaseDevice): def get(self, feature_name): """ Get a specific camera feature value. Get a specific camera feature object. Parameters ---------- Loading @@ -84,8 +82,7 @@ class Mako(BaseDevice): """ cam = self._check_connection() feature = getattr(cam, feature_name) return feature.get() return getattr(cam, feature_name) def put(self, feature_name, value): Loading Loading @@ -132,6 +129,7 @@ class Webcam(Mako): self._last_frame = None self._lock = threading.Lock() self._range = [0, 255] self._show = False def _frame_handler(self, cam, stream, frame): Loading @@ -141,7 +139,7 @@ class Webcam(Mako): if frame.get_status() == FrameStatus.Complete: with self._lock: # Copy the frame data to allow the SDK to reuse the buffer # Copy the frame data for persistent access self._last_frame = frame.as_numpy_ndarray().copy() cam.queue_frame(frame) Loading Loading @@ -170,6 +168,7 @@ class Webcam(Mako): cam.start_streaming(handler=self._frame_handler, buffer_count=5) self._streaming = True else: self._show = False cam.stop_streaming() self._streaming = False Loading Loading @@ -209,61 +208,25 @@ class Webcam(Mako): return filename def save_fits(self, filename=None): def save_fits(self, filename="temp.fits"): """ Save the current frame as a FITS file. """ data = self.image if data is not None: if filename is None: filename = datetime.utcnow().strftime("%Y-%m-%dT%H_%M_%S.fits") raw = self.image if raw is not None: data = np.reshape(raw, (1024, 1280)) filename = datetime.utcnow().strftime(filename) hdu = fits.PrimaryHDU(data) hdu.writeto(filename, overwrite=True) return filename @property def range(self): """ Get/Set software dynamic range [min, max] for PNG conversion (0-255). """ return self._range @range.setter def range(self, r): """ Set the dynamic range limits. """ self._range = r @property def autowhite(self): """ Get/Set white balance auto status ('Continuous', 'Off', 'Once'). """ return self.get("BalanceWhiteAuto") @autowhite.setter def autowhite(self, value): """ Set white balance auto status. """ self.put("BalanceWhiteAuto", value) @property def autoexpose(self): """ Get/Set exposure auto status ('Continuous', 'Off', 'Once'). Get exposure auto status and available options. """ return self.get("ExposureAuto").as_tuple() Loading @@ -272,25 +235,39 @@ class Webcam(Mako): @autoexpose.setter def autoexpose(self, value): """ Set exposure auto status. ('Continuous', 'Off', 'Once') Set exposure auto status ('Continuous', 'Off', 'Once'). """ self.put("ExposureAuto", value) @property def exptime(self): def show_stream(self): """ Get/Set exposure time in microseconds. Open an OpenCV window in a separate thread for visual monitoring. """ return self.get("ExposureTime") if not self._streaming: print("Error: Streaming is False. Start it first.") return def display_loop(): self._show = True win_name = f"Mako Stream: {self.id}" cv2.namedWindow(win_name, cv2.WINDOW_AUTOSIZE) @exptime.setter def exptime(self, value): """ Set exposure time in microseconds. Only applicable if autoexpose is 'Off'. """ while self._show: if self._last_frame is not None: # Basic normalization for visualization fmax = self._last_frame.max() disp = (self._last_frame / (fmax if fmax > 0 else 1) * 255).astype(np.uint8) cv2.imshow(win_name, disp) if cv2.waitKey(1) & 0xFF == ord('q'): break cv2.destroyWindow(win_name) self._show = False self.put("ExposureTime", float(value)) thread = threading.Thread(target=display_loop, daemon=True) thread.start() print("Display thread started. Press 'q' in the window to close it.")