Loading csp-lmc-common/simulators/DeviceTestMaster/DeviceTestMaster.py +24 −8 Original line number Diff line number Diff line Loading @@ -121,10 +121,14 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): if elapsed_time > self._duration_expected['on']: break if elapsed_time > 1: self._cmd_progress['standby'] = elapsed_time* 100/self._duration_expected['on'] self.push_change_event("onCmdProgress", self._cmd_progress['on']) self._cmd_progress['on'] = elapsed_time* 100/self._duration_expected['on'] print(self._cmd_progress['on']) self.push_change_event("onCommandProgress", int(self._cmd_progress['on'])) time.sleep(1) self.set_state(tango.DevState.ON) self._health_state = HealthState.DEGRADED self.push_change_event("onCommandProgress", 100) print("Final state:", self.get_state()) print("End On command...wait") def standby_subelement(self): Loading @@ -138,8 +142,11 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): break if elapsed_time > 1: self._cmd_progress['standby'] = elapsed_time* 100/self._duration_expected['standby'] self.push_change_event("standbyCmdProgress", self._cmd_progress['standby']) print(self._cmd_progress['standby']) self.push_change_event("standbyCommandProgress", int(self._cmd_progress['standby'])) time.sleep(1) self.set_state(tango.DevState.STANDBY) self.push_change_event("standbyCommandProgress", 100) print("End Standby command...wait") def off_subelement(self): Loading @@ -152,9 +159,11 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): break if elapsed_time > 1: self._cmd_progress['off'] = elapsed_time* 100/self._duration_expected['ff'] self.push_change_event("offCmdProgress", self._cmd_progress['off']) self.set_state(tango.DevState.OFF) self.push_change_event("offCommandProgress", int(self._cmd_progress['off'])) time.sleep(1) self._health_state = HealthState.UNKNOWN self.set_state(tango.DevState.OFF) self.push_change_event("offCommandProgress", 100) def init_device(self): SKAMaster.init_device(self) Loading @@ -176,6 +185,7 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): 'elementLoggingLevel': ['__value'], 'centralLoggingLevel': ['__value'],}) self._admin_mode = int(attribute_properties['adminMode']['__value'][0]) # start a timer to simulate device intialization thread = threading.Timer(1, self.init_subelement) thread.start() Loading @@ -199,19 +209,19 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): def read_onCommandProgress(self): # PROTECTED REGION ID(DeviceTestMaster.onCommandProgress_read) ENABLED START # """Return the onCommandProgress attribute.""" return self._cmd_progress['on'] return int(self._cmd_progress['on']) # PROTECTED REGION END # // DeviceTestMaster.onCommandProgress_read def read_offCommandProgress(self): # PROTECTED REGION ID(DeviceTestMaster.offCommandProgress_read) ENABLED START # """Return the offCommandProgress attribute.""" return self._cmd_progress['off'] return int(self._cmd_progress['off']) # PROTECTED REGION END # // DeviceTestMaster.offCommandProgress_read def read_standbyCommandProgress(self): # PROTECTED REGION ID(DeviceTestMaster.standbyCommandProgress_read) ENABLED START # """Return the standbyCommandProgress attribute.""" return self._cmd_progress['standby'] return int(self._cmd_progress['standby']) # PROTECTED REGION END # // DeviceTestMaster.standbyCommandProgress_read def read_onDurationExpected(self): Loading Loading @@ -261,6 +271,8 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): def On(self): # PROTECTED REGION ID(DeviceTestMaster.On) ENABLED START # print("Processing On command") self._cmd_progress['on'] = 0 self._start_time = time.time() thread = threading.Timer(2, self.on_subelement) thread.start() # PROTECTED REGION END # // DeviceTestMaster.On Loading @@ -272,6 +284,8 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): @DebugIt() def Off(self): # PROTECTED REGION ID(DeviceTestMaster.Off) ENABLED START # self._cmd_progress['off'] = 0 self._start_time = time.time() thread = threading.Timer(1, self.off_subelement) thread.start() # PROTECTED REGION END # // DeviceTestMaster.Off Loading @@ -285,6 +299,8 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): @DebugIt() def Standby(self): # PROTECTED REGION ID(DeviceTestMaster.Standby) ENABLED START # self._cmd_progress['standby'] = 0 self._start_time = time.time() thread = threading.Timer(2, self.standby_subelement) thread.start() # PROTECTED REGION END # // DeviceTestMaster.Standby Loading Loading
csp-lmc-common/simulators/DeviceTestMaster/DeviceTestMaster.py +24 −8 Original line number Diff line number Diff line Loading @@ -121,10 +121,14 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): if elapsed_time > self._duration_expected['on']: break if elapsed_time > 1: self._cmd_progress['standby'] = elapsed_time* 100/self._duration_expected['on'] self.push_change_event("onCmdProgress", self._cmd_progress['on']) self._cmd_progress['on'] = elapsed_time* 100/self._duration_expected['on'] print(self._cmd_progress['on']) self.push_change_event("onCommandProgress", int(self._cmd_progress['on'])) time.sleep(1) self.set_state(tango.DevState.ON) self._health_state = HealthState.DEGRADED self.push_change_event("onCommandProgress", 100) print("Final state:", self.get_state()) print("End On command...wait") def standby_subelement(self): Loading @@ -138,8 +142,11 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): break if elapsed_time > 1: self._cmd_progress['standby'] = elapsed_time* 100/self._duration_expected['standby'] self.push_change_event("standbyCmdProgress", self._cmd_progress['standby']) print(self._cmd_progress['standby']) self.push_change_event("standbyCommandProgress", int(self._cmd_progress['standby'])) time.sleep(1) self.set_state(tango.DevState.STANDBY) self.push_change_event("standbyCommandProgress", 100) print("End Standby command...wait") def off_subelement(self): Loading @@ -152,9 +159,11 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): break if elapsed_time > 1: self._cmd_progress['off'] = elapsed_time* 100/self._duration_expected['ff'] self.push_change_event("offCmdProgress", self._cmd_progress['off']) self.set_state(tango.DevState.OFF) self.push_change_event("offCommandProgress", int(self._cmd_progress['off'])) time.sleep(1) self._health_state = HealthState.UNKNOWN self.set_state(tango.DevState.OFF) self.push_change_event("offCommandProgress", 100) def init_device(self): SKAMaster.init_device(self) Loading @@ -176,6 +185,7 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): 'elementLoggingLevel': ['__value'], 'centralLoggingLevel': ['__value'],}) self._admin_mode = int(attribute_properties['adminMode']['__value'][0]) # start a timer to simulate device intialization thread = threading.Timer(1, self.init_subelement) thread.start() Loading @@ -199,19 +209,19 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): def read_onCommandProgress(self): # PROTECTED REGION ID(DeviceTestMaster.onCommandProgress_read) ENABLED START # """Return the onCommandProgress attribute.""" return self._cmd_progress['on'] return int(self._cmd_progress['on']) # PROTECTED REGION END # // DeviceTestMaster.onCommandProgress_read def read_offCommandProgress(self): # PROTECTED REGION ID(DeviceTestMaster.offCommandProgress_read) ENABLED START # """Return the offCommandProgress attribute.""" return self._cmd_progress['off'] return int(self._cmd_progress['off']) # PROTECTED REGION END # // DeviceTestMaster.offCommandProgress_read def read_standbyCommandProgress(self): # PROTECTED REGION ID(DeviceTestMaster.standbyCommandProgress_read) ENABLED START # """Return the standbyCommandProgress attribute.""" return self._cmd_progress['standby'] return int(self._cmd_progress['standby']) # PROTECTED REGION END # // DeviceTestMaster.standbyCommandProgress_read def read_onDurationExpected(self): Loading Loading @@ -261,6 +271,8 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): def On(self): # PROTECTED REGION ID(DeviceTestMaster.On) ENABLED START # print("Processing On command") self._cmd_progress['on'] = 0 self._start_time = time.time() thread = threading.Timer(2, self.on_subelement) thread.start() # PROTECTED REGION END # // DeviceTestMaster.On Loading @@ -272,6 +284,8 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): @DebugIt() def Off(self): # PROTECTED REGION ID(DeviceTestMaster.Off) ENABLED START # self._cmd_progress['off'] = 0 self._start_time = time.time() thread = threading.Timer(1, self.off_subelement) thread.start() # PROTECTED REGION END # // DeviceTestMaster.Off Loading @@ -285,6 +299,8 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)): @DebugIt() def Standby(self): # PROTECTED REGION ID(DeviceTestMaster.Standby) ENABLED START # self._cmd_progress['standby'] = 0 self._start_time = time.time() thread = threading.Timer(2, self.standby_subelement) thread.start() # PROTECTED REGION END # // DeviceTestMaster.Standby Loading