Loading csp-lmc-common/csp_lmc_common/CspSubarray.py +75 −42 Original line number Original line Diff line number Diff line Loading @@ -95,10 +95,12 @@ class CspSubarrayStateModel(SKASubarrayStateModel): self._action_breakdown["force_to_empty"] = ("force_to_empty", None) self._action_breakdown["force_to_empty"] = ("force_to_empty", None) self._action_breakdown["force_to_ready"] = ("force_to_ready", None) self._action_breakdown["force_to_ready"] = ("force_to_ready", None) self._action_breakdown["force_to_aborted"] = ("force_to_aborted", None) self._action_breakdown["force_to_aborted"] = ("force_to_aborted", None) self._action_breakdown["force_to_scanning"] = ("force_to_scanning", None) # add transtions to the ObservationStateMachine # add transtions to the ObservationStateMachine self._observation_state_machine.add_transition(trigger='force_to_empty', source='*', dest='EMPTY') self._observation_state_machine.add_transition(trigger='force_to_empty', source='*', dest='EMPTY') self._observation_state_machine.add_transition(trigger='force_to_idle', source='*', dest='IDLE') self._observation_state_machine.add_transition(trigger='force_to_idle', source='*', dest='IDLE') self._observation_state_machine.add_transition(trigger='force_to_ready', source='*', dest='READY') self._observation_state_machine.add_transition(trigger='force_to_ready', source='*', dest='READY') self._observation_state_machine.add_transition(trigger='force_to_scanning', source='*', dest='SCANNING') self._observation_state_machine.add_transition(trigger='force_to_aborted', source='*', dest='ABORTED') self._observation_state_machine.add_transition(trigger='force_to_aborted', source='*', dest='ABORTED') class CspSubarray(SKASubarray): class CspSubarray(SKASubarray): Loading Loading @@ -311,6 +313,7 @@ class CspSubarray(SKASubarray): # keys: the command name('on, 'off'...) # keys: the command name('on, 'off'...) # values: thread instance # values: thread instance device._command_thread = {} device._command_thread = {} device._stop_thread = defaultdict(lambda: False) # _end_scan_event: thread event to signal EndScan # _end_scan_event: thread event to signal EndScan device._end_scan_event = threading.Event() device._end_scan_event = threading.Event() Loading Loading @@ -416,15 +419,6 @@ class CspSubarray(SKASubarray): return (ResultCode.STARTED, "CSP Subarray Init STARTED") return (ResultCode.STARTED, "CSP Subarray Init STARTED") def initialize_thread(self): def initialize_thread(self): allowed_coupled = { 'RESOURCING': 'IDLE', 'RESOURCING': 'EMPTY', 'CONFIGURING': 'READY', 'SCANNING': 'READY', 'ABORTING': 'ABORTED', 'RESETTING': 'IDLE', 'RESTARTING': 'EMPTY' } try: try: with EnsureOmniThread(): with EnsureOmniThread(): self.logger.info("Init thread started") self.logger.info("Init thread started") Loading @@ -433,7 +427,6 @@ class CspSubarray(SKASubarray): device.force_cmd_obj = device.ForceObsStateTransitionCommand(*args) device.force_cmd_obj = device.ForceObsStateTransitionCommand(*args) on_handler = device.OnCommand(*args) on_handler = device.OnCommand(*args) timeout = 10 # seconds # Try connection with the CBF sub-array # Try connection with the CBF sub-array device.connect_to_subarray_subcomponent(device.CbfSubarray) device.connect_to_subarray_subcomponent(device.CbfSubarray) # TODO: add connection to CSPMaster to get information # TODO: add connection to CSPMaster to get information Loading @@ -443,27 +436,42 @@ class CspSubarray(SKASubarray): # put the device to OFF/EMPTY: no transition is allowed from INIT state # put the device to OFF/EMPTY: no transition is allowed from INIT state self.succeeded() self.succeeded() if device._sc_subarray_state[device.CbfSubarray] is not DevState.ON: if device._sc_subarray_state[device.CbfSubarray] is not DevState.ON: self.logger.info('devo leggerlo se sono alla prima inizializzazione') return return # put the device to ON/EMPTY # put the device to ON/EMPTY self.logger.info('Non devo leggerlo se sono alla prima inizializzazione') on_handler.succeeded() on_handler.succeeded() # CASE B: CSP is ON # CASE B: CSP is ON self.logger.info('CSP is already ON. Aligning to Sub-elements...') # start a loop in case of transitional states (CASE 2) timeout = 10 starting_time = time.time() target_obs_state = 'FAULT' while time.time() - starting_time < timeout: target_obs_state = device.obs_state_evaluator() target_obs_state = device.obs_state_evaluator() if target_obs_state in allowed_coupled.values() or (target_obs_state == 'FAULT'): if target_obs_state in ['RESOURCING', 'ABORTING', 'CONFIGURING', 'RESETTING','RESTARTING']: break; self.logger.info("Still to implement transitional state different from SCANNINNG") time.sleep(0.1) return #self.monitor_running_command(target_obs_state) self.logger.info('CSP is already ON. Aligning to Sub-elements...') device.set_csp_obs_state(target_obs_state) device.set_csp_obs_state(target_obs_state) if target_obs_state == 'SCANNING': return self.monitor_running_command(target_obs_state) except Exception as msg: except Exception as msg: self.logger.info(f'error in thread: {msg}') self.logger.info(f'error in thread: {msg}') def monitor_running_command(self, csp_obs_state): """ Helper method to monitor the CSP Subarray observing state at re-initialization if the observing state is in a transitional state. NOTE: Currently onlt the SCANNING obsState is handled. :param csp_obs_state: the CSP.LMC Subarray observing state. :type csp_obs_state: string """ device = self.target if csp_obs_state == 'SCANNING': handler = device.ScanCommand(*args) device._command_thread['scan'] = threading.Thread(target=handler.monitor_scan_execution, name="Thread-Scan", args=(device._sc_subarray_fqdn,)) device._cmd_execution_state['scan'] = CmdExecState.RUNNING self.logger.info("Start scan thread") device._command_thread['scan'].start() def obs_state_evaluator(self): def obs_state_evaluator(self): """ """ Helper method to evaluate the CSP Subarray observing state starting from the Helper method to evaluate the CSP Subarray observing state starting from the Loading @@ -475,7 +483,8 @@ class CspSubarray(SKASubarray): A component not ONLINE/MAINTENANCE does not contribute to the observing A component not ONLINE/MAINTENANCE does not contribute to the observing state value. state value. :return: The observing state string :return: The observing state. :rtype: string with the observing state name. """ """ target_obs_state = 'FAULT' target_obs_state = 'FAULT' obs_states_list = [] obs_states_list = [] Loading Loading @@ -653,6 +662,7 @@ class CspSubarray(SKASubarray): # the dictionary with the scan configuration # the dictionary with the scan configuration self.logger.info("ConfigureCommand at {}".format(time.time())) target_device = self.target target_device = self.target try: try: # if the stored configuration attribute is not empty, check # if the stored configuration attribute is not empty, check Loading Loading @@ -805,9 +815,10 @@ class CspSubarray(SKASubarray): device_done = defaultdict(lambda:False) device_done = defaultdict(lambda:False) # inside the end-less lop check the obsState of each sub-component # inside the end-less lop check the obsState of each sub-component device_list = input_arg[0] device_list = input_arg[0] self.logger.info("Trhead started at {}".format(time.time())) while True: while True: if target_device._abort_obs_event.is_set(): if target_device._abort_obs_event.is_set(): self.logger.info("Received and ABORT request during configuration") self.logger.info("Received and ABORT request during configuration {}".format(time.time())) command_progress = 0 command_progress = 0 for device in device_list: for device in device_list: self.logger.info("Current device {} obsState is {}".format(device, self.logger.info("Current device {} obsState is {}".format(device, Loading @@ -830,8 +841,9 @@ class CspSubarray(SKASubarray): target_device._reconfiguring)) target_device._reconfiguring)) if target_device._sc_subarray_obs_state[device] == dev_successful_state: if target_device._sc_subarray_obs_state[device] == dev_successful_state: if not target_device._reconfiguring: if not target_device._reconfiguring: self.logger.info("Command {} ended with success on device {}.".format(cmd_name, self.logger.info("Command {} ended with success on device {} at {}.".format(cmd_name, device)) device, time.time())) # update the list and number of device that completed the task # update the list and number of device that completed the task target_device._num_dev_completed_task[cmd_name] += 1 target_device._num_dev_completed_task[cmd_name] += 1 target_device._list_dev_completed_task[cmd_name].append(device) target_device._list_dev_completed_task[cmd_name].append(device) Loading Loading @@ -876,7 +888,7 @@ class CspSubarray(SKASubarray): self.logger.info("device {} is in {}: reconfiguring is:{}".format(device, ObsState(target_device._sc_subarray_obs_state[device]).name, self.logger.info("device {} is in {}: reconfiguring is:{}".format(device, ObsState(target_device._sc_subarray_obs_state[device]).name, target_device._reconfiguring)) target_device._reconfiguring)) if all(value == True for value in device_done.values()): if all(value == True for value in device_done.values()): self.logger.info("All devices have been handled!") self.logger.info("All devices have been handled at time {}!".format(time.time())) break break # check for global timeout expiration # check for global timeout expiration # may be this check is not necessary # may be this check is not necessary Loading @@ -890,6 +902,7 @@ class CspSubarray(SKASubarray): # end of the while loop # end of the while loop # acquire the mutex during the check of configuration success/failure. We don't want # acquire the mutex during the check of configuration success/failure. We don't want # to receive an boart during this phase otherwise could happen strange situation # to receive an boart during this phase otherwise could happen strange situation self.logger.info("GOING To lock mutex at {}".format(time.time())) with target_device._mutex_obs_state: with target_device._mutex_obs_state: # check for timeout/failure conditions on each sub-component # check for timeout/failure conditions on each sub-component if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT for device in device_list): if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT for device in device_list): Loading @@ -906,7 +919,7 @@ class CspSubarray(SKASubarray): if target_device._abort_obs_event.is_set(): if target_device._abort_obs_event.is_set(): if target_device._timeout_expired or target_device._failure_raised: if target_device._timeout_expired or target_device._failure_raised: return self.failed() return self.failed() self.logger.info("Abort configure ends with success!!") self.logger.info("Abort configure ends with success!! {}".format(time.time())) if all(target_device._sc_subarray_obs_state[fqdn] == ObsState.ABORTED for fqdn in device_list): if all(target_device._sc_subarray_obs_state[fqdn] == ObsState.ABORTED for fqdn in device_list): return target_device.abort_cmd_obj.succeeded() return target_device.abort_cmd_obj.succeeded() return target_device.abort_cmd_obj.abort_monitoring(device_list) return target_device.abort_cmd_obj.abort_monitoring(device_list) Loading @@ -923,7 +936,7 @@ class CspSubarray(SKASubarray): target_device._cmd_duration_measured[cmd_name] = time.time() - command_start_time target_device._cmd_duration_measured[cmd_name] = time.time() - command_start_time target_device._cmd_progress[cmd_name] = 100 target_device._cmd_progress[cmd_name] = 100 target_device._last_executed_command = cmd_name target_device._last_executed_command = cmd_name self.logger.info("Configure ends with success!!") self.logger.info("Configure ends with success!! {}".format(time.time())) return self.succeeded() return self.succeeded() def validate_scan_configuration(self, argin): def validate_scan_configuration(self, argin): Loading Loading @@ -1001,13 +1014,17 @@ class CspSubarray(SKASubarray): target_device._command_thread['scan'] = threading.Thread(target=self.monitor_scan_execution, target_device._command_thread['scan'] = threading.Thread(target=self.monitor_scan_execution, name="Thread-Scan", name="Thread-Scan", args=(target_device._sc_subarray_assigned_fqdn,)) args=(target_device._sc_subarray_assigned_fqdn,)) self.logger.info("Thread scan: {}".format(target_device._command_thread['scan'])) target_device._cmd_execution_state['scan'] = CmdExecState.RUNNING target_device._cmd_execution_state['scan'] = CmdExecState.RUNNING target_device._command_thread['scan'].start() target_device._command_thread['scan'].start() return (ResultCode.STARTED, "Scan command started") return (ResultCode.STARTED, "Scan command started") def monitor_scan_execution(self, device_list): def monitor_scan_execution(self, device_list): self.logger.info("Starting scan thread") cmd_name = 'scan' cmd_name = 'scan' target_device = self.target target_device = self.target target_device._end_scan_event.clear() target_device._abort_obs_event.clear() dev_successful_state = ObsState.READY dev_successful_state = ObsState.READY target_device._num_dev_completed_task[cmd_name] = 0 target_device._num_dev_completed_task[cmd_name] = 0 target_device._list_dev_completed_task[cmd_name] = [] target_device._list_dev_completed_task[cmd_name] = [] Loading @@ -1023,12 +1040,14 @@ class CspSubarray(SKASubarray): elapsed_time = 0 elapsed_time = 0 starting_time = time.time() starting_time = time.time() stop_scan = False stop_scan = False target_device._end_scan_event.clear() target_device._abort_obs_event.clear() # inside the end-less loop check the obsState of each sub-component # inside the end-less loop check the obsState of each sub-component while True: while True: self.logger.info("abort:{}".format(target_device._abort_obs_event.is_set())) #self.logger.info("abort:{}".format(target_device._abort_obs_event.is_set())) self.logger.info("end:{}".format(target_device._end_scan_event.is_set())) #self.logger.info("end:{}".format(target_device._end_scan_event.is_set())) if target_device._stop_thread[cmd_name]: target_device._stop_thread[cmd_name] = False self.logger.info("STOPPING THE THREAD!!!") return if target_device._end_scan_event.is_set() or target_device._abort_obs_event.is_set(): if target_device._end_scan_event.is_set() or target_device._abort_obs_event.is_set(): if not stop_scan: if not stop_scan: stop_scan = True stop_scan = True Loading Loading @@ -1068,7 +1087,6 @@ class CspSubarray(SKASubarray): def do(self): def do(self): target_device = self.target target_device = self.target device_list = target_device._sc_subarray_assigned_fqdn device_list = target_device._sc_subarray_assigned_fqdn self.logger.info("EndScan assigned_fqdn: {}".format(device_list)) if not any(target_device._sc_subarray_assigned_fqdn): if not any(target_device._sc_subarray_assigned_fqdn): # need to add a check also on PSTBeams belonging to subarray # need to add a check also on PSTBeams belonging to subarray device_list = target_device._sc_subarray_fqdn device_list = target_device._sc_subarray_fqdn Loading @@ -1087,7 +1105,7 @@ class CspSubarray(SKASubarray): self.logger.error("device {}: {}-{}".format(reply.dev_name(), err.desc, err.reason)) self.logger.error("device {}: {}-{}".format(reply.dev_name(), err.desc, err.reason)) else: else: (result_code,msg) = reply.get_data() (result_code,msg) = reply.get_data() self.logger.error("device {}: {}".format(reply.dev_name(), msg)) self.logger.info("device {}: {}".format(reply.dev_name(), msg)) if any(target_device._sc_subarray_obs_state[device]== ObsState.FAULT for device in device_list): if any(target_device._sc_subarray_obs_state[device]== ObsState.FAULT for device in device_list): return (ResultCode.FAILED, "EndScan Command FAILED") return (ResultCode.FAILED, "EndScan Command FAILED") return (ResultCode.OK, "EndScan command executed OK") return (ResultCode.OK, "EndScan command executed OK") Loading Loading @@ -1141,6 +1159,10 @@ class CspSubarray(SKASubarray): device_done = defaultdict(lambda:False) device_done = defaultdict(lambda:False) # inside the end-less loop check the obsState of each sub-component # inside the end-less loop check the obsState of each sub-component while True: while True: if target_device._stop_thread[cmd_name]: target_device._stop_thread[cmd_name] = False self.logger.info("STOPPING THE THREAD!!!") return time.sleep(0.1) time.sleep(0.1) for device in device_list: for device in device_list: if device_done[device] == True: if device_done[device] == True: Loading Loading @@ -1300,6 +1322,10 @@ class CspSubarray(SKASubarray): elapsed_time = 0 elapsed_time = 0 starting_time = time.time() starting_time = time.time() while True: while True: if target_device._stop_thread[cmd_name]: target_device._stop_thread[cmd_name] = False self.logger.info("STOPPING THE THREAD!!!") return for device in device_list: for device in device_list: if device_done[device] == True: if device_done[device] == True: continue continue Loading Loading @@ -1449,8 +1475,10 @@ class CspSubarray(SKASubarray): self.logger.info(log_msg) self.logger.info(log_msg) # update CSP sub-array SCM # update CSP sub-array SCM #07-2020: with the new base classes, transitions are handled via actions. #07-2020: with the new base classes, transitions are handled via actions. #if evt.attr_value.name.lower() in ["state", "healthstate", "adminmode", "obsstate"]: #if evt.attr_value.name.lower() in ["obsstate"]: # self.update_subarray_state() # self.update_subarray_state() if evt.attr_value.name.lower() in ["healthstate"]: self._update_subarray_health_state() except tango.DevFailed as df: except tango.DevFailed as df: self.logger.error(str(df.args[0].desc)) self.logger.error(str(df.args[0].desc)) except Exception as except_occurred: except Exception as except_occurred: Loading Loading @@ -1592,17 +1620,20 @@ class CspSubarray(SKASubarray): Class protected method. Class protected method. Retrieve the State attribute values of the CSP sub-elements and aggregate Retrieve the State attribute values of the CSP sub-elements and aggregate them to build up the CSP global state. them to build up the CSP global state. This method should be called only when no command is running. :param: None :param: None :return: None :return: None """ """ self.logger.info("update_subarray_state") self.logger.info("update_subarray_state") self._update_subarray_health_state() self._update_subarray_health_state() # check if a long-running command is in execution for key, thread in self._command_thread.items(): for key, thread in self._command_thread.items(): if thread.is_alive(): if thread.is_alive(): self.logger.info("Tread {} is running".format(key)) self.logger.info("Tread {} is running".format(key)) return return target_obs_state = self.obs_state_evaluator() target_obs_state = self.obs_state_evaluator() if target_obs_state != self._obs_state: self.set_csp_obs_state(target_obs_state) self.set_csp_obs_state(target_obs_state) def _update_subarray_health_state(self): def _update_subarray_health_state(self): Loading Loading @@ -1855,11 +1886,6 @@ class CspSubarray(SKASubarray): args = (self, self.state_model, self.logger) args = (self, self.state_model, self.logger) self.gotoidle_cmd_obj = self.GoToIdleCommand(*args) self.gotoidle_cmd_obj = self.GoToIdleCommand(*args) self.abort_cmd_obj = self.AbortCommand(*args) self.abort_cmd_obj = self.AbortCommand(*args) #self._assignresources_cmd_obj = self.AssignResourcesCommand(*args) #self._releaseresources_cmd_obj = self.ReleaseResourcesCommand(*args) #self.register_command_object("AssignResources", self.AssignResourcesCommand(*args)) #self.register_command_object("ReleaseResources", self.ReleaseResourcesCommand(*args)) #self.register_command_object("ReleaseAllResources", self.ReleaseAllResourcesCommand(*args)) self.register_command_object("GoToIdle", self.GoToIdleCommand(*args)) self.register_command_object("GoToIdle", self.GoToIdleCommand(*args)) self.register_command_object("Configure", self.ConfigureCommand(*args)) self.register_command_object("Configure", self.ConfigureCommand(*args)) self.register_command_object("Scan", self.ScanCommand(*args)) self.register_command_object("Scan", self.ScanCommand(*args)) Loading Loading @@ -2319,6 +2345,12 @@ class CspSubarray(SKASubarray): """ """ # PROTECTED REGION ID(CspSubarray.delete_device) ENABLED START # # PROTECTED REGION ID(CspSubarray.delete_device) ENABLED START # #release the allocated event resources #release the allocated event resources # check for running threads and stop them for key, thread in self._command_thread.items(): is_alive = thread.is_alive() if is_alive: self._stop_thread[key] = True thread.join() event_to_remove = {} event_to_remove = {} for fqdn in self._sc_subarray_fqdn: for fqdn in self._sc_subarray_fqdn: try: try: Loading Loading @@ -2842,6 +2874,7 @@ class CspSubarray(SKASubarray): :return:'DevVarLongStringArray' :return:'DevVarLongStringArray' """ """ self.logger.info("CALL ABORT at time {}".format(time.time())) with self._mutex_obs_state: with self._mutex_obs_state: handler = self.get_command_object("Abort") handler = self.get_command_object("Abort") (result_code, message) = handler() (result_code, message) = handler() Loading csp-lmc-mid/csp_lmc_mid/MidCspSubarrayBase.py +0 −1 Original line number Original line Diff line number Diff line Loading @@ -519,7 +519,6 @@ class MidCspSubarrayBase(CspSubarray): self.logger.info("Going to assign receptors {}".format(receptors_to_be_added)) self.logger.info("Going to assign receptors {}".format(receptors_to_be_added)) while True: while True: self.logger.info("device {} obs_state:{}".format(device, self._sc_subarray_obs_state[device])) if self._sc_subarray_obs_state[device] == ObsState.IDLE: if self._sc_subarray_obs_state[device] == ObsState.IDLE: self.logger.info("Reconfiguring is:{}".format(self._reconfiguring)) self.logger.info("Reconfiguring is:{}".format(self._reconfiguring)) assigned_receptors = self._receptors.assigned_to_subarray(self.SubID) assigned_receptors = self._receptors.assigned_to_subarray(self.SubID) Loading csp-lmc-mid/tests/integration/MidCspSubarray_test.py +58 −35 File changed.Preview size limit exceeded, changes collapsed. Show changes csp-lmc-mid/tests/unit/midcspsubarray_unit_test.py +0 −1 Original line number Original line Diff line number Diff line Loading @@ -543,7 +543,6 @@ def test_midcspsubarray_obsstate_AFTER_timeout_during_configuration(): 'dish': {'receptorIDList': receptor_list}} 'dish': {'receptorIDList': receptor_list}} json_config = json.dumps(param) json_config = json.dumps(param) tango_context.device.AssignResources(json_config) tango_context.device.AssignResources(json_config) #assert tango_context.device.obsState == ObsState.IDLE #assert tango_context.device.obsState == ObsState.IDLE configuration_string = load_json_file("test_ConfigureScan_ADR4.json") configuration_string = load_json_file("test_ConfigureScan_ADR4.json") tango_context.device.Configure(configuration_string) tango_context.device.Configure(configuration_string) Loading Loading
csp-lmc-common/csp_lmc_common/CspSubarray.py +75 −42 Original line number Original line Diff line number Diff line Loading @@ -95,10 +95,12 @@ class CspSubarrayStateModel(SKASubarrayStateModel): self._action_breakdown["force_to_empty"] = ("force_to_empty", None) self._action_breakdown["force_to_empty"] = ("force_to_empty", None) self._action_breakdown["force_to_ready"] = ("force_to_ready", None) self._action_breakdown["force_to_ready"] = ("force_to_ready", None) self._action_breakdown["force_to_aborted"] = ("force_to_aborted", None) self._action_breakdown["force_to_aborted"] = ("force_to_aborted", None) self._action_breakdown["force_to_scanning"] = ("force_to_scanning", None) # add transtions to the ObservationStateMachine # add transtions to the ObservationStateMachine self._observation_state_machine.add_transition(trigger='force_to_empty', source='*', dest='EMPTY') self._observation_state_machine.add_transition(trigger='force_to_empty', source='*', dest='EMPTY') self._observation_state_machine.add_transition(trigger='force_to_idle', source='*', dest='IDLE') self._observation_state_machine.add_transition(trigger='force_to_idle', source='*', dest='IDLE') self._observation_state_machine.add_transition(trigger='force_to_ready', source='*', dest='READY') self._observation_state_machine.add_transition(trigger='force_to_ready', source='*', dest='READY') self._observation_state_machine.add_transition(trigger='force_to_scanning', source='*', dest='SCANNING') self._observation_state_machine.add_transition(trigger='force_to_aborted', source='*', dest='ABORTED') self._observation_state_machine.add_transition(trigger='force_to_aborted', source='*', dest='ABORTED') class CspSubarray(SKASubarray): class CspSubarray(SKASubarray): Loading Loading @@ -311,6 +313,7 @@ class CspSubarray(SKASubarray): # keys: the command name('on, 'off'...) # keys: the command name('on, 'off'...) # values: thread instance # values: thread instance device._command_thread = {} device._command_thread = {} device._stop_thread = defaultdict(lambda: False) # _end_scan_event: thread event to signal EndScan # _end_scan_event: thread event to signal EndScan device._end_scan_event = threading.Event() device._end_scan_event = threading.Event() Loading Loading @@ -416,15 +419,6 @@ class CspSubarray(SKASubarray): return (ResultCode.STARTED, "CSP Subarray Init STARTED") return (ResultCode.STARTED, "CSP Subarray Init STARTED") def initialize_thread(self): def initialize_thread(self): allowed_coupled = { 'RESOURCING': 'IDLE', 'RESOURCING': 'EMPTY', 'CONFIGURING': 'READY', 'SCANNING': 'READY', 'ABORTING': 'ABORTED', 'RESETTING': 'IDLE', 'RESTARTING': 'EMPTY' } try: try: with EnsureOmniThread(): with EnsureOmniThread(): self.logger.info("Init thread started") self.logger.info("Init thread started") Loading @@ -433,7 +427,6 @@ class CspSubarray(SKASubarray): device.force_cmd_obj = device.ForceObsStateTransitionCommand(*args) device.force_cmd_obj = device.ForceObsStateTransitionCommand(*args) on_handler = device.OnCommand(*args) on_handler = device.OnCommand(*args) timeout = 10 # seconds # Try connection with the CBF sub-array # Try connection with the CBF sub-array device.connect_to_subarray_subcomponent(device.CbfSubarray) device.connect_to_subarray_subcomponent(device.CbfSubarray) # TODO: add connection to CSPMaster to get information # TODO: add connection to CSPMaster to get information Loading @@ -443,27 +436,42 @@ class CspSubarray(SKASubarray): # put the device to OFF/EMPTY: no transition is allowed from INIT state # put the device to OFF/EMPTY: no transition is allowed from INIT state self.succeeded() self.succeeded() if device._sc_subarray_state[device.CbfSubarray] is not DevState.ON: if device._sc_subarray_state[device.CbfSubarray] is not DevState.ON: self.logger.info('devo leggerlo se sono alla prima inizializzazione') return return # put the device to ON/EMPTY # put the device to ON/EMPTY self.logger.info('Non devo leggerlo se sono alla prima inizializzazione') on_handler.succeeded() on_handler.succeeded() # CASE B: CSP is ON # CASE B: CSP is ON self.logger.info('CSP is already ON. Aligning to Sub-elements...') # start a loop in case of transitional states (CASE 2) timeout = 10 starting_time = time.time() target_obs_state = 'FAULT' while time.time() - starting_time < timeout: target_obs_state = device.obs_state_evaluator() target_obs_state = device.obs_state_evaluator() if target_obs_state in allowed_coupled.values() or (target_obs_state == 'FAULT'): if target_obs_state in ['RESOURCING', 'ABORTING', 'CONFIGURING', 'RESETTING','RESTARTING']: break; self.logger.info("Still to implement transitional state different from SCANNINNG") time.sleep(0.1) return #self.monitor_running_command(target_obs_state) self.logger.info('CSP is already ON. Aligning to Sub-elements...') device.set_csp_obs_state(target_obs_state) device.set_csp_obs_state(target_obs_state) if target_obs_state == 'SCANNING': return self.monitor_running_command(target_obs_state) except Exception as msg: except Exception as msg: self.logger.info(f'error in thread: {msg}') self.logger.info(f'error in thread: {msg}') def monitor_running_command(self, csp_obs_state): """ Helper method to monitor the CSP Subarray observing state at re-initialization if the observing state is in a transitional state. NOTE: Currently onlt the SCANNING obsState is handled. :param csp_obs_state: the CSP.LMC Subarray observing state. :type csp_obs_state: string """ device = self.target if csp_obs_state == 'SCANNING': handler = device.ScanCommand(*args) device._command_thread['scan'] = threading.Thread(target=handler.monitor_scan_execution, name="Thread-Scan", args=(device._sc_subarray_fqdn,)) device._cmd_execution_state['scan'] = CmdExecState.RUNNING self.logger.info("Start scan thread") device._command_thread['scan'].start() def obs_state_evaluator(self): def obs_state_evaluator(self): """ """ Helper method to evaluate the CSP Subarray observing state starting from the Helper method to evaluate the CSP Subarray observing state starting from the Loading @@ -475,7 +483,8 @@ class CspSubarray(SKASubarray): A component not ONLINE/MAINTENANCE does not contribute to the observing A component not ONLINE/MAINTENANCE does not contribute to the observing state value. state value. :return: The observing state string :return: The observing state. :rtype: string with the observing state name. """ """ target_obs_state = 'FAULT' target_obs_state = 'FAULT' obs_states_list = [] obs_states_list = [] Loading Loading @@ -653,6 +662,7 @@ class CspSubarray(SKASubarray): # the dictionary with the scan configuration # the dictionary with the scan configuration self.logger.info("ConfigureCommand at {}".format(time.time())) target_device = self.target target_device = self.target try: try: # if the stored configuration attribute is not empty, check # if the stored configuration attribute is not empty, check Loading Loading @@ -805,9 +815,10 @@ class CspSubarray(SKASubarray): device_done = defaultdict(lambda:False) device_done = defaultdict(lambda:False) # inside the end-less lop check the obsState of each sub-component # inside the end-less lop check the obsState of each sub-component device_list = input_arg[0] device_list = input_arg[0] self.logger.info("Trhead started at {}".format(time.time())) while True: while True: if target_device._abort_obs_event.is_set(): if target_device._abort_obs_event.is_set(): self.logger.info("Received and ABORT request during configuration") self.logger.info("Received and ABORT request during configuration {}".format(time.time())) command_progress = 0 command_progress = 0 for device in device_list: for device in device_list: self.logger.info("Current device {} obsState is {}".format(device, self.logger.info("Current device {} obsState is {}".format(device, Loading @@ -830,8 +841,9 @@ class CspSubarray(SKASubarray): target_device._reconfiguring)) target_device._reconfiguring)) if target_device._sc_subarray_obs_state[device] == dev_successful_state: if target_device._sc_subarray_obs_state[device] == dev_successful_state: if not target_device._reconfiguring: if not target_device._reconfiguring: self.logger.info("Command {} ended with success on device {}.".format(cmd_name, self.logger.info("Command {} ended with success on device {} at {}.".format(cmd_name, device)) device, time.time())) # update the list and number of device that completed the task # update the list and number of device that completed the task target_device._num_dev_completed_task[cmd_name] += 1 target_device._num_dev_completed_task[cmd_name] += 1 target_device._list_dev_completed_task[cmd_name].append(device) target_device._list_dev_completed_task[cmd_name].append(device) Loading Loading @@ -876,7 +888,7 @@ class CspSubarray(SKASubarray): self.logger.info("device {} is in {}: reconfiguring is:{}".format(device, ObsState(target_device._sc_subarray_obs_state[device]).name, self.logger.info("device {} is in {}: reconfiguring is:{}".format(device, ObsState(target_device._sc_subarray_obs_state[device]).name, target_device._reconfiguring)) target_device._reconfiguring)) if all(value == True for value in device_done.values()): if all(value == True for value in device_done.values()): self.logger.info("All devices have been handled!") self.logger.info("All devices have been handled at time {}!".format(time.time())) break break # check for global timeout expiration # check for global timeout expiration # may be this check is not necessary # may be this check is not necessary Loading @@ -890,6 +902,7 @@ class CspSubarray(SKASubarray): # end of the while loop # end of the while loop # acquire the mutex during the check of configuration success/failure. We don't want # acquire the mutex during the check of configuration success/failure. We don't want # to receive an boart during this phase otherwise could happen strange situation # to receive an boart during this phase otherwise could happen strange situation self.logger.info("GOING To lock mutex at {}".format(time.time())) with target_device._mutex_obs_state: with target_device._mutex_obs_state: # check for timeout/failure conditions on each sub-component # check for timeout/failure conditions on each sub-component if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT for device in device_list): if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT for device in device_list): Loading @@ -906,7 +919,7 @@ class CspSubarray(SKASubarray): if target_device._abort_obs_event.is_set(): if target_device._abort_obs_event.is_set(): if target_device._timeout_expired or target_device._failure_raised: if target_device._timeout_expired or target_device._failure_raised: return self.failed() return self.failed() self.logger.info("Abort configure ends with success!!") self.logger.info("Abort configure ends with success!! {}".format(time.time())) if all(target_device._sc_subarray_obs_state[fqdn] == ObsState.ABORTED for fqdn in device_list): if all(target_device._sc_subarray_obs_state[fqdn] == ObsState.ABORTED for fqdn in device_list): return target_device.abort_cmd_obj.succeeded() return target_device.abort_cmd_obj.succeeded() return target_device.abort_cmd_obj.abort_monitoring(device_list) return target_device.abort_cmd_obj.abort_monitoring(device_list) Loading @@ -923,7 +936,7 @@ class CspSubarray(SKASubarray): target_device._cmd_duration_measured[cmd_name] = time.time() - command_start_time target_device._cmd_duration_measured[cmd_name] = time.time() - command_start_time target_device._cmd_progress[cmd_name] = 100 target_device._cmd_progress[cmd_name] = 100 target_device._last_executed_command = cmd_name target_device._last_executed_command = cmd_name self.logger.info("Configure ends with success!!") self.logger.info("Configure ends with success!! {}".format(time.time())) return self.succeeded() return self.succeeded() def validate_scan_configuration(self, argin): def validate_scan_configuration(self, argin): Loading Loading @@ -1001,13 +1014,17 @@ class CspSubarray(SKASubarray): target_device._command_thread['scan'] = threading.Thread(target=self.monitor_scan_execution, target_device._command_thread['scan'] = threading.Thread(target=self.monitor_scan_execution, name="Thread-Scan", name="Thread-Scan", args=(target_device._sc_subarray_assigned_fqdn,)) args=(target_device._sc_subarray_assigned_fqdn,)) self.logger.info("Thread scan: {}".format(target_device._command_thread['scan'])) target_device._cmd_execution_state['scan'] = CmdExecState.RUNNING target_device._cmd_execution_state['scan'] = CmdExecState.RUNNING target_device._command_thread['scan'].start() target_device._command_thread['scan'].start() return (ResultCode.STARTED, "Scan command started") return (ResultCode.STARTED, "Scan command started") def monitor_scan_execution(self, device_list): def monitor_scan_execution(self, device_list): self.logger.info("Starting scan thread") cmd_name = 'scan' cmd_name = 'scan' target_device = self.target target_device = self.target target_device._end_scan_event.clear() target_device._abort_obs_event.clear() dev_successful_state = ObsState.READY dev_successful_state = ObsState.READY target_device._num_dev_completed_task[cmd_name] = 0 target_device._num_dev_completed_task[cmd_name] = 0 target_device._list_dev_completed_task[cmd_name] = [] target_device._list_dev_completed_task[cmd_name] = [] Loading @@ -1023,12 +1040,14 @@ class CspSubarray(SKASubarray): elapsed_time = 0 elapsed_time = 0 starting_time = time.time() starting_time = time.time() stop_scan = False stop_scan = False target_device._end_scan_event.clear() target_device._abort_obs_event.clear() # inside the end-less loop check the obsState of each sub-component # inside the end-less loop check the obsState of each sub-component while True: while True: self.logger.info("abort:{}".format(target_device._abort_obs_event.is_set())) #self.logger.info("abort:{}".format(target_device._abort_obs_event.is_set())) self.logger.info("end:{}".format(target_device._end_scan_event.is_set())) #self.logger.info("end:{}".format(target_device._end_scan_event.is_set())) if target_device._stop_thread[cmd_name]: target_device._stop_thread[cmd_name] = False self.logger.info("STOPPING THE THREAD!!!") return if target_device._end_scan_event.is_set() or target_device._abort_obs_event.is_set(): if target_device._end_scan_event.is_set() or target_device._abort_obs_event.is_set(): if not stop_scan: if not stop_scan: stop_scan = True stop_scan = True Loading Loading @@ -1068,7 +1087,6 @@ class CspSubarray(SKASubarray): def do(self): def do(self): target_device = self.target target_device = self.target device_list = target_device._sc_subarray_assigned_fqdn device_list = target_device._sc_subarray_assigned_fqdn self.logger.info("EndScan assigned_fqdn: {}".format(device_list)) if not any(target_device._sc_subarray_assigned_fqdn): if not any(target_device._sc_subarray_assigned_fqdn): # need to add a check also on PSTBeams belonging to subarray # need to add a check also on PSTBeams belonging to subarray device_list = target_device._sc_subarray_fqdn device_list = target_device._sc_subarray_fqdn Loading @@ -1087,7 +1105,7 @@ class CspSubarray(SKASubarray): self.logger.error("device {}: {}-{}".format(reply.dev_name(), err.desc, err.reason)) self.logger.error("device {}: {}-{}".format(reply.dev_name(), err.desc, err.reason)) else: else: (result_code,msg) = reply.get_data() (result_code,msg) = reply.get_data() self.logger.error("device {}: {}".format(reply.dev_name(), msg)) self.logger.info("device {}: {}".format(reply.dev_name(), msg)) if any(target_device._sc_subarray_obs_state[device]== ObsState.FAULT for device in device_list): if any(target_device._sc_subarray_obs_state[device]== ObsState.FAULT for device in device_list): return (ResultCode.FAILED, "EndScan Command FAILED") return (ResultCode.FAILED, "EndScan Command FAILED") return (ResultCode.OK, "EndScan command executed OK") return (ResultCode.OK, "EndScan command executed OK") Loading Loading @@ -1141,6 +1159,10 @@ class CspSubarray(SKASubarray): device_done = defaultdict(lambda:False) device_done = defaultdict(lambda:False) # inside the end-less loop check the obsState of each sub-component # inside the end-less loop check the obsState of each sub-component while True: while True: if target_device._stop_thread[cmd_name]: target_device._stop_thread[cmd_name] = False self.logger.info("STOPPING THE THREAD!!!") return time.sleep(0.1) time.sleep(0.1) for device in device_list: for device in device_list: if device_done[device] == True: if device_done[device] == True: Loading Loading @@ -1300,6 +1322,10 @@ class CspSubarray(SKASubarray): elapsed_time = 0 elapsed_time = 0 starting_time = time.time() starting_time = time.time() while True: while True: if target_device._stop_thread[cmd_name]: target_device._stop_thread[cmd_name] = False self.logger.info("STOPPING THE THREAD!!!") return for device in device_list: for device in device_list: if device_done[device] == True: if device_done[device] == True: continue continue Loading Loading @@ -1449,8 +1475,10 @@ class CspSubarray(SKASubarray): self.logger.info(log_msg) self.logger.info(log_msg) # update CSP sub-array SCM # update CSP sub-array SCM #07-2020: with the new base classes, transitions are handled via actions. #07-2020: with the new base classes, transitions are handled via actions. #if evt.attr_value.name.lower() in ["state", "healthstate", "adminmode", "obsstate"]: #if evt.attr_value.name.lower() in ["obsstate"]: # self.update_subarray_state() # self.update_subarray_state() if evt.attr_value.name.lower() in ["healthstate"]: self._update_subarray_health_state() except tango.DevFailed as df: except tango.DevFailed as df: self.logger.error(str(df.args[0].desc)) self.logger.error(str(df.args[0].desc)) except Exception as except_occurred: except Exception as except_occurred: Loading Loading @@ -1592,17 +1620,20 @@ class CspSubarray(SKASubarray): Class protected method. Class protected method. Retrieve the State attribute values of the CSP sub-elements and aggregate Retrieve the State attribute values of the CSP sub-elements and aggregate them to build up the CSP global state. them to build up the CSP global state. This method should be called only when no command is running. :param: None :param: None :return: None :return: None """ """ self.logger.info("update_subarray_state") self.logger.info("update_subarray_state") self._update_subarray_health_state() self._update_subarray_health_state() # check if a long-running command is in execution for key, thread in self._command_thread.items(): for key, thread in self._command_thread.items(): if thread.is_alive(): if thread.is_alive(): self.logger.info("Tread {} is running".format(key)) self.logger.info("Tread {} is running".format(key)) return return target_obs_state = self.obs_state_evaluator() target_obs_state = self.obs_state_evaluator() if target_obs_state != self._obs_state: self.set_csp_obs_state(target_obs_state) self.set_csp_obs_state(target_obs_state) def _update_subarray_health_state(self): def _update_subarray_health_state(self): Loading Loading @@ -1855,11 +1886,6 @@ class CspSubarray(SKASubarray): args = (self, self.state_model, self.logger) args = (self, self.state_model, self.logger) self.gotoidle_cmd_obj = self.GoToIdleCommand(*args) self.gotoidle_cmd_obj = self.GoToIdleCommand(*args) self.abort_cmd_obj = self.AbortCommand(*args) self.abort_cmd_obj = self.AbortCommand(*args) #self._assignresources_cmd_obj = self.AssignResourcesCommand(*args) #self._releaseresources_cmd_obj = self.ReleaseResourcesCommand(*args) #self.register_command_object("AssignResources", self.AssignResourcesCommand(*args)) #self.register_command_object("ReleaseResources", self.ReleaseResourcesCommand(*args)) #self.register_command_object("ReleaseAllResources", self.ReleaseAllResourcesCommand(*args)) self.register_command_object("GoToIdle", self.GoToIdleCommand(*args)) self.register_command_object("GoToIdle", self.GoToIdleCommand(*args)) self.register_command_object("Configure", self.ConfigureCommand(*args)) self.register_command_object("Configure", self.ConfigureCommand(*args)) self.register_command_object("Scan", self.ScanCommand(*args)) self.register_command_object("Scan", self.ScanCommand(*args)) Loading Loading @@ -2319,6 +2345,12 @@ class CspSubarray(SKASubarray): """ """ # PROTECTED REGION ID(CspSubarray.delete_device) ENABLED START # # PROTECTED REGION ID(CspSubarray.delete_device) ENABLED START # #release the allocated event resources #release the allocated event resources # check for running threads and stop them for key, thread in self._command_thread.items(): is_alive = thread.is_alive() if is_alive: self._stop_thread[key] = True thread.join() event_to_remove = {} event_to_remove = {} for fqdn in self._sc_subarray_fqdn: for fqdn in self._sc_subarray_fqdn: try: try: Loading Loading @@ -2842,6 +2874,7 @@ class CspSubarray(SKASubarray): :return:'DevVarLongStringArray' :return:'DevVarLongStringArray' """ """ self.logger.info("CALL ABORT at time {}".format(time.time())) with self._mutex_obs_state: with self._mutex_obs_state: handler = self.get_command_object("Abort") handler = self.get_command_object("Abort") (result_code, message) = handler() (result_code, message) = handler() Loading
csp-lmc-mid/csp_lmc_mid/MidCspSubarrayBase.py +0 −1 Original line number Original line Diff line number Diff line Loading @@ -519,7 +519,6 @@ class MidCspSubarrayBase(CspSubarray): self.logger.info("Going to assign receptors {}".format(receptors_to_be_added)) self.logger.info("Going to assign receptors {}".format(receptors_to_be_added)) while True: while True: self.logger.info("device {} obs_state:{}".format(device, self._sc_subarray_obs_state[device])) if self._sc_subarray_obs_state[device] == ObsState.IDLE: if self._sc_subarray_obs_state[device] == ObsState.IDLE: self.logger.info("Reconfiguring is:{}".format(self._reconfiguring)) self.logger.info("Reconfiguring is:{}".format(self._reconfiguring)) assigned_receptors = self._receptors.assigned_to_subarray(self.SubID) assigned_receptors = self._receptors.assigned_to_subarray(self.SubID) Loading
csp-lmc-mid/tests/integration/MidCspSubarray_test.py +58 −35 File changed.Preview size limit exceeded, changes collapsed. Show changes
csp-lmc-mid/tests/unit/midcspsubarray_unit_test.py +0 −1 Original line number Original line Diff line number Diff line Loading @@ -543,7 +543,6 @@ def test_midcspsubarray_obsstate_AFTER_timeout_during_configuration(): 'dish': {'receptorIDList': receptor_list}} 'dish': {'receptorIDList': receptor_list}} json_config = json.dumps(param) json_config = json.dumps(param) tango_context.device.AssignResources(json_config) tango_context.device.AssignResources(json_config) #assert tango_context.device.obsState == ObsState.IDLE #assert tango_context.device.obsState == ObsState.IDLE configuration_string = load_json_file("test_ConfigureScan_ADR4.json") configuration_string = load_json_file("test_ConfigureScan_ADR4.json") tango_context.device.Configure(configuration_string) tango_context.device.Configure(configuration_string) Loading