Loading noctua/devices/mercury.py +50 −74 Original line number Diff line number Diff line Loading @@ -96,21 +96,6 @@ class Mercury(BaseDevice): return self.pidevice def _close(self): """Reset connection state so _check_connection() will reconnect on next call.""" if self.pidevice is not None: try: self.pidevice.__exit__(None, None, None) except Exception: pass self.pidevice = None if self._gateway is not None: try: self._gateway.__exit__(None, None, None) except Exception: pass self._gateway = None def __del__(self): """ Loading Loading @@ -159,26 +144,20 @@ class Mercury(BaseDevice): The GCS query command name without 'q'. """ for attempt in range(2): pidevice = self._check_connection() if pidevice is None: return None method = getattr(pidevice, f"q{command.upper()}", None) if method is None: log.error(f"Command not found: q{command.upper()}") return None method = getattr(pidevice, f"q{command.upper()}") try: if command == "ERR": if (command == "ERR"): return method() else: return method(self.axis)[self.axis] except GCSError as e: log.error(f"Error: {e}") return None except (BrokenPipeError, OSError) as e: log.warning(f"Mercury: connection lost ({e}), reconnecting…") self._close() self.error.append("Connection lost after reconnect attempt") except AttributeError as e: log.error(f"Command not found: {e}") return None Loading @@ -192,23 +171,17 @@ class Mercury(BaseDevice): The GCS command name. """ for attempt in range(2): pidevice = self._check_connection() if pidevice is None: return None method = getattr(pidevice, command.upper(), None) if method is None: log.error(f"Command not found: {command.upper()}") return None method = getattr(pidevice, command.upper()) try: return method(self.axis, *args) except GCSError as e: log.error(f"Error: {e}") return None except (BrokenPipeError, OSError) as e: log.warning(f"Mercury: connection lost ({e}), reconnecting…") self._close() self.error.append("Connection lost after reconnect attempt") except AttributeError as e: log.error(f"Command not found: {e}") return None def wait(self): Loading @@ -229,13 +202,9 @@ class Mercury(BaseDevice): moving = status_dict[self.axis] except GCSError as e: log.warning(e) except (BrokenPipeError, OSError) as e: log.warning(f"Mercury: connection lost during wait ({e})") self._close() break err = self.get("ERR") if err and err != 0: err = pidevice.qERR() if err != 0: log.warning(err) time.sleep(0.2) Loading @@ -245,21 +214,34 @@ class Mercury(BaseDevice): Perform stage initialization sequence with explicit error clearing. """ # Clear any previous errors (reconnects automatically if socket is stale) self.get("ERR") pidevice = self._check_connection() if pidevice is None: return # Clear any previous errors pidevice.qERR() # Servo on self.put("SVO", True) # Find positive limit self.put("FPL") self.wait() # Find negative limit self.put("FNL") self.wait() def abort(self): """Abort stage motion.""" """ Perform stage initialization sequence with explicit error clearing. """ pidevice = self._check_connection() if pidevice is None: return # Clear any previous errors self.get("ERR") pidevice.qERR() # STOP! self.put("STP", True) Loading Loading @@ -296,16 +278,10 @@ class Stage(Mercury): True if moving, False otherwise. """ for attempt in range(2): pidevice = self._check_connection() if pidevice is None: return None try: return pidevice.IsMoving(self.axis)[self.axis] except (BrokenPipeError, OSError) as e: log.warning(f"Mercury: connection lost ({e}), reconnecting…") self._close() return None @property Loading Loading
noctua/devices/mercury.py +50 −74 Original line number Diff line number Diff line Loading @@ -96,21 +96,6 @@ class Mercury(BaseDevice): return self.pidevice def _close(self): """Reset connection state so _check_connection() will reconnect on next call.""" if self.pidevice is not None: try: self.pidevice.__exit__(None, None, None) except Exception: pass self.pidevice = None if self._gateway is not None: try: self._gateway.__exit__(None, None, None) except Exception: pass self._gateway = None def __del__(self): """ Loading Loading @@ -159,26 +144,20 @@ class Mercury(BaseDevice): The GCS query command name without 'q'. """ for attempt in range(2): pidevice = self._check_connection() if pidevice is None: return None method = getattr(pidevice, f"q{command.upper()}", None) if method is None: log.error(f"Command not found: q{command.upper()}") return None method = getattr(pidevice, f"q{command.upper()}") try: if command == "ERR": if (command == "ERR"): return method() else: return method(self.axis)[self.axis] except GCSError as e: log.error(f"Error: {e}") return None except (BrokenPipeError, OSError) as e: log.warning(f"Mercury: connection lost ({e}), reconnecting…") self._close() self.error.append("Connection lost after reconnect attempt") except AttributeError as e: log.error(f"Command not found: {e}") return None Loading @@ -192,23 +171,17 @@ class Mercury(BaseDevice): The GCS command name. """ for attempt in range(2): pidevice = self._check_connection() if pidevice is None: return None method = getattr(pidevice, command.upper(), None) if method is None: log.error(f"Command not found: {command.upper()}") return None method = getattr(pidevice, command.upper()) try: return method(self.axis, *args) except GCSError as e: log.error(f"Error: {e}") return None except (BrokenPipeError, OSError) as e: log.warning(f"Mercury: connection lost ({e}), reconnecting…") self._close() self.error.append("Connection lost after reconnect attempt") except AttributeError as e: log.error(f"Command not found: {e}") return None def wait(self): Loading @@ -229,13 +202,9 @@ class Mercury(BaseDevice): moving = status_dict[self.axis] except GCSError as e: log.warning(e) except (BrokenPipeError, OSError) as e: log.warning(f"Mercury: connection lost during wait ({e})") self._close() break err = self.get("ERR") if err and err != 0: err = pidevice.qERR() if err != 0: log.warning(err) time.sleep(0.2) Loading @@ -245,21 +214,34 @@ class Mercury(BaseDevice): Perform stage initialization sequence with explicit error clearing. """ # Clear any previous errors (reconnects automatically if socket is stale) self.get("ERR") pidevice = self._check_connection() if pidevice is None: return # Clear any previous errors pidevice.qERR() # Servo on self.put("SVO", True) # Find positive limit self.put("FPL") self.wait() # Find negative limit self.put("FNL") self.wait() def abort(self): """Abort stage motion.""" """ Perform stage initialization sequence with explicit error clearing. """ pidevice = self._check_connection() if pidevice is None: return # Clear any previous errors self.get("ERR") pidevice.qERR() # STOP! self.put("STP", True) Loading Loading @@ -296,16 +278,10 @@ class Stage(Mercury): True if moving, False otherwise. """ for attempt in range(2): pidevice = self._check_connection() if pidevice is None: return None try: return pidevice.IsMoving(self.axis)[self.axis] except (BrokenPipeError, OSError) as e: log.warning(f"Mercury: connection lost ({e}), reconnecting…") self._close() return None @property Loading