Commit 5de54e49 authored by Elisabetta Giani's avatar Elisabetta Giani
Browse files

CT-60: Implemented Scan,EndScan with new base classes.

Introduced Receptors class to handle receptors info and
simplify tests with mock devices.
Added unit tests with mock devices.
parent db58e3b2
Loading
Loading
Loading
Loading
Loading
+154 −125
Original line number Diff line number Diff line
@@ -125,18 +125,13 @@ class CspSubarray(SKASubarray):
                information purpose only.
            :rtype: (ResultCode, str)
            """
            super().do()
            (result_code, message) = super().do()

            device = self.target
            device._build_state = '{}, {}, {}'.format(release.name, release.version, release.description)
            self.logger.info("Initial state is:{}".format(device.get_state()))
            device._version_id = release.version
            # connect to CSP.LMC TANGO DB
            #device.set_state(tango.DevState.INIT)
            #device._health_state = HealthState.UNKNOWN
            #device._admin_mode = AdminMode.ONLINE
            #device._obs_mode = ObsMode.IDLE
            #device._obs_state = ObsState.IDLE
            # _config_delay_expected: inherited from the SKAObsDevice base class
            # Note: the _config_delay_expected could be set equal the max among the sub-elements
            # sub-arrays expected times for the command
@@ -326,7 +321,7 @@ class CspSubarray(SKASubarray):

            message = "CspSubarray Init command completed OK"
            self.logger.info(message)
            return (ResultCode.OK, message)
            return (result_code, message)

    class OnCommand(SKASubarray.OnCommand):
        def do(self):
@@ -371,7 +366,7 @@ class CspSubarray(SKASubarray):
                        # state of the subarray is READY than subarray is not re-configured.
                        if (stored_configuration == received_configuration) and (target_device._obs_state == ObsState.READY):
                            self.logger.info("Subarray is going to use the same configuration")
                            return
                            return (ResultCode.OK, msg)
                    except Exception as e:
                        self.logger.warning(str(e))
                # go ahead and parse the received configuration
@@ -384,6 +379,9 @@ class CspSubarray(SKASubarray):
                                             msg,
                                             "Configure",
                                             tango.ErrSeverity.ERR)
            target_device._reconfiguring = False
            if target_device.state_model._obs_state == ObsState.READY:
               target_device._reconfiguring = True
            # Forward the Configure command to the sub-elements
            # components (subarrays, pst beams)
            for device in target_device._sc_subarray_assigned_fqdn:
@@ -422,7 +420,8 @@ class CspSubarray(SKASubarray):

                try:
                    # read the timeout configured for the operation on the device
                    target_device._sc_subarray_duration_expected[device]['configurescan'] = proxy.configureDelayExpected
                    target_device._sc_subarray_cmd_duration_expected[device]['configurescan'] = target_device._get_expected_delay(proxy, "configureDelayExpected")
                    self.logger.info("config delay: {}".format(target_device._sc_subarray_cmd_duration_expected[device]['configurescan']))
                except AttributeError as attr_err:
                    self.logger.info("No attribute {} on device {}".format(str(attr_err), device))
                try:
@@ -538,6 +537,7 @@ class CspSubarray(SKASubarray):
                    # Note: the second check, can be useful if the timeout event is not received
                    # (for example for a temporary connection timeout)
                    elapsed_time = time.time() - target_device._sc_subarray_cmd_starting_time[device]
                    self.logger.info("elapsed_time:{}".format(elapsed_time))

                    if (elapsed_time > target_device._sc_subarray_cmd_duration_expected[device][cmd_name] or
                        target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT):
@@ -548,7 +548,9 @@ class CspSubarray(SKASubarray):
                        self.logger.info("elapsed_time:{} device {}".format(elapsed_time, device))
                    # check if sub-element command ended throwing an exception: in this case the
                    # 'cmd_ended_cb' callback is invoked.
                    if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED:
                    if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED or \
                       target_device._sc_subarray_obs_state[device] == ObsState.FAULT:
                       target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.FAILED
                       # execution ended for this sub-element, skip to the next one
                       device_done[device] = True
                    # update the progress counter inside the loop taking into account the number of devices
@@ -563,7 +565,6 @@ class CspSubarray(SKASubarray):
                if all(value == True for value in device_done.values()):
                    self.logger.info("All devices have been handled!")
                    break
                self.logger.info("6")
                # check for global timeout expiration
                # may be this check is not necessary 
                if target_device._cmd_duration_expected[cmd_name] < (time.time() - command_start_time):
@@ -575,9 +576,9 @@ class CspSubarray(SKASubarray):
                time.sleep(0.1)
            # end of the while loop
            # check for timeout/failure conditions on each sub-component
            if any(target_device._sc_subarray_cmd_exec_state[device] == CmdExecState.TIMEOUT for device in device_list):
            if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT for device in device_list):
                target_device._timeout_expired = True
            if any(target_device._sc_subarray_cmd_exec_state[device] == CmdExecState.FAILED for device in device_list):
            if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED for device in device_list):
                target_device._failure_raised = True
            # reset sub-component execution flag
            target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE
@@ -600,7 +601,8 @@ class CspSubarray(SKASubarray):
                target_device._cmd_duration_measured[cmd_name] = time.time() - command_start_time
                target_device._cmd_progress[cmd_name] = 100
                target_device._last_executed_command = cmd_name
                target_device.configure_cmd_obj.succeeded()
                self.logger.info("Configure ends with success!!")
                return target_device.configure_cmd_obj.succeeded()
            else:
                return target_device.configure_cmd_obj.failed()
                
@@ -640,41 +642,43 @@ class CspSubarray(SKASubarray):

    class ScanCommand(SKASubarray.ScanCommand):
        def do(self, argin):

            device = self.target
            target_device = self.target
            try:
                device._scan_id = int(argin[0])
                target_device._scan_id = int(argin[0])
            except (ValueError, Exception) as err:
                msg = "Scan command invalid argument:{}".format(str(err))
                self.logging.error(msg)
                return (ResultCode.FAILED, msg) 
            # invoke the constructor for the command thread
            self.logger.info("Received Scan command with id:{}".format(self._scan_id))
            for device in device._sc_subarray_assigned_fqdn:
            self.logger.info("Received Scan command with id:{}".format(target_device._scan_id))
            self.logger.info("sc_subarray_assigned_fqdn: {}".format(target_device._sc_subarray_assigned_fqdn))
            for device in target_device._sc_subarray_assigned_fqdn:
                try:
                    proxy = device._sc_subarray_proxies[device]
                    if not device._sc_subarray_event_id[device]['scancmdprogress']:
                    proxy = target_device._sc_subarray_proxies[device]
                    if not target_device._sc_subarray_event_id[device]['scancmdprogress']:
                        evt_id = proxy.subscribe_event("scanCmdProgress",
                                                       tango.EventType.CHANGE_EVENT,
                                                       device._attributes_change_evt_cb,
                                                       target_device._attributes_change_evt_cb,
                                                       stateless=False)
                        device._sc_subarray_event_id[device]['scancmdprogress'] = evt_id
                        target_device._sc_subarray_event_id[device]['scancmdprogress'] = evt_id
                except KeyError as key_err:
                    self.logger.warning("No key {} found".format(key_err)) 
                except tango.DevFailed as tango_err:
                    self.logger.info(tango_err.args[0].desc)
                try:
                    proxy.command_inout_asynch("Scan", argin[0], device._cmd_ended_cb)
                    self.logger.info("Forwarding scan")
                    proxy.command_inout_asynch("Scan", argin[0], target_device._cmd_ended_cb)
                    self.logger.info("after Forwarding scan")
                except tango.DevFailed as tango_err:
                    self.logger.info(tango_err.args[0].desc)
                    # TODO: add check on the failed device. If CBF
                    # throw an exception.
            # invoke the constructor for the command thread
                device._command_thread['scan'] = threading.Thread(target=self.__monitor_scan_execution,
            target_device._command_thread['scan'] = threading.Thread(target=self.__monitor_scan_execution,
                                                               name="Thread-Scan",
                                                               args=(device._sc_subarray_assigned_fqdn,))
                device._cmd_execution_state['scan'] = CmdExecState.RUNNING
                device._command_thread['scan'].start()
                                                               args=(target_device._sc_subarray_assigned_fqdn,))
            target_device._cmd_execution_state['scan'] = CmdExecState.RUNNING
            target_device._command_thread['scan'].start()
            return (ResultCode.STARTED, "Scan command started") 

        def __monitor_scan_execution(self, device_list):
@@ -694,8 +698,8 @@ class CspSubarray(SKASubarray):
            device_done = defaultdict(lambda:False)
            elapsed_time = 0
            starting_time = time.time()
                self._end_scan_event.clear()
                # inside the end-less lop check the obsState of each sub-component
            target_device._end_scan_event.clear()
            # inside the end-less loop check the obsState of each sub-component
            while True:
                # Note: CbfSubarray changes the obsState value after forwarding the command
                # (synchrnously) to FSP and VCC devices. This means that When the thread function enters,
@@ -707,13 +711,13 @@ class CspSubarray(SKASubarray):
                #if self._abort_command_event.is_set():
                #    dev_successful_state = ObsState.IDLE
                #if self._end_scan_event.is_set() or self._abort_command_event.is_set():
                    if target_device._end_scan_event.is_set():
                for device in device_list:
                    if device_done[device] == True:
                        continue
                    # if the sub-component execution flag is no more RUNNING, the command has
                    # ended with or without success. Go to check next device state.
                    if target_device._sc_subarray_obs_state[device] == dev_successful_state:
                        if target_device._end_scan_event.is_set():
                            self.logger.info("Command {} ended with success on device {}.".format(cmd_name,
                                                                                                device))
                            # update the list and number of device that completed the task
@@ -727,31 +731,39 @@ class CspSubarray(SKASubarray):
                            device_done[device] = True
                        # check if sub-element command ended throwing an exception: in this case the
                        # 'cmd_ended_cb' callback is invoked.
                            if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED:
                    if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED or\
                            target_device._sc_subarray_obs_state[device] == ObsState.FAULT:
                        # execution ended for this sub-element, skip to the next one
                        target_device._failure_raised = True
                        device_done[device] = True
                        target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE
                    # TODO: handle connection problems
                    #if target_device._sc_subarray_state[device] == DevState.UNKNOWN:
                    #    self.logger.warning("Connection with device {} temporaly down".format(device))
                if any(device_done.values()) and all(value == True for value in device_done.values()):
                    self.logger.info("All devices have been handled!")
                    break
                self.logger.info("Going to sleep")
                time.sleep(0.2)
                           
            # end of the while loop
            target_device._cmd_execution_state[cmd_name] = CmdExecState.IDLE
            # check for timeout/failure conditions on each sub-component
            if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED for device in device_list):
                target_device._failure_raised = True
                    # reset sub-component execution flag
                    target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE
                    target_device.scan_cmd_obj.failed()
                #return target_device.scan_cmd_obj.failed()
                return 
                # update the progress counter at the end of the loop 
            if all(target_device._sc_subarray_obs_state[fqdn] == dev_successful_state for fqdn in device_list):
                target_device._cmd_progress[cmd_name] = 100
                elapsed_time = time.time() - starting_time 
                self.logger.info("Scan elapsed time:{}".format(elapsed_time))
                    target_device._cmd_execution_state[cmd_name] = CmdExecState.IDLE
                target_device._last_executed_command = cmd_name
                    target_device.scan_cmd_obj.succeeded()
                #return target_device.scan_cmd_obj.succeeded()
                self.logger.info("Exiting from the scan thread!")
                return

    class EndScanCommand(SKASubarray.ScanCommand):
    class EndScanCommand(SKASubarray.EndScanCommand):

        def do(self):
            target_device = self.target
@@ -762,7 +774,10 @@ class CspSubarray(SKASubarray):
            for device in device_list:
                try:
                    proxy = target_device._sc_subarray_proxies[device]
                    proxy.command_inout_asynch("EndScan", target_device._cmd_ended_cb)
                    #proxy.command_inout_asynch("EndScan", target_device._cmd_ended_cb)
                    # Note: at the moment EndScan does not support STARTED code so can't start
                    # asynchrnously
                    proxy.command_inout("EndScan")
                except KeyError as key_err:
                    self.logger.warning("No key {} found".format(key_err))
                except tango.DevFailed as tango_err:
@@ -771,7 +786,7 @@ class CspSubarray(SKASubarray):
                    # signal the failure raising the failure flag?
            # set the threading endScan event 
            target_device._end_scan_event.set()
            return (ResultCode.OK, "EndScan command executed OK")
            return (ResultCode.STARTED, "EndScan command executed STARTED")

    '''
    class AbortCommand(SKASubarray.AbortCommand):
@@ -1058,6 +1073,7 @@ class CspSubarray(SKASubarray):
                if evt.attr_value.name.lower() in ["state", "healthstate", "adminmode", "obsstate"]:
                    self.logger.info("call to update_subarray_state()")
                    self.update_subarray_state()
                    self.logger.info("after update_subarray_state()")
            except tango.DevFailed as df:
                self.logger.error(str(df.args[0].desc))
            except Exception as except_occurred:
@@ -1152,15 +1168,17 @@ class CspSubarray(SKASubarray):
            if evt:
                if not evt.err:
                    if evt.argout[0] == ResultCode.STARTED:
                        self.logger.info("Device {} is processing the command {}".format(evt.device,
                        self.logger.info("Device {} is processing the command {}".format(evt.device.dev_name(),
                                                                                      evt.cmd_name))
                    if evt.argout[0] == ResultCode.OK:
                        self.logger.info("Device {} successfully processed the command {}".format(evt.device,
                        self.logger.info("Device {} successfully processed the command {}".format(evt.device.dev_name(),
                                                                                               evt.cmd_name))
                    if evt.argout[0] == ResultCode.FAILED:
                        self.logger.info("Failure in Device {} while processing the command {}".format(evt.device,
                        self.logger.info("Failure in Device {} while processing the command {}".format(evt.device.dev_name(),
                                                                                               evt.cmd_name))
                        self._sc_subarray_cmd_exec_state[evt.device.dev_name()][evt.cmd_name.lower()] = CmdExecState.FAILED
                        self.logger.info("sc_subarray_cmd_exec_state[{}][{}]:{}".format(evt.device.dev_name(), evt.cmd_name.lower(), 
                                                           self._sc_subarray_cmd_exec_state[evt.device.dev_name()][evt.cmd_name.lower()]))
                        self._failure_message[evt.cmd_name.lower()] += evt.argout[1]
                else:
                    msg = "Error!!Command {} ended on device {}.\n".format(evt.cmd_name,
@@ -1209,13 +1227,16 @@ class CspSubarray(SKASubarray):
        self.set_state(self._sc_subarray_state[self.CbfSubarray])
        if self.get_state() == DevState.OFF:
            if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.EMPTY:
                self.logger.info("1")
                self.off_cmd_obj.succeeded()
        if self.get_state() == DevState.ON:
            if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.EMPTY:
                self.logger.info("2")
                self.on_cmd_obj.succeeded()
            if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.READY:
                self.configure_cmd_obj.succeeded()
        self.logger.info("Csp subarray state: {} obsState: {}".format(self.get_state(), self.state_model.obs_state))
                self.logger.info("3")
        #self.logger.info("Csp subarray state: {} obsState: {}".format(self.get_state(), self.state_model._obs_state))
        return True

    def _update_subarray_health_state(self):
@@ -1297,6 +1318,14 @@ class CspSubarray(SKASubarray):
                                  "Subarray obsState:{}".format( self._obs_state))
        self.logger.info("Subarray ObsState:{}".format( self._obs_state))
    '''
    def _open_connection(self, fqdn):
        device_proxy = DeviceProxy(fqdn)
        return device_proxy

    def _get_expected_delay(self, attr_name, proxy):
        attr_value = proxy.read_attribute(attr_name)
        return attr_value.value()


    def connect_to_subarray_subcomponent(self, fqdn):
        """
@@ -1314,18 +1343,18 @@ class CspSubarray(SKASubarray):
            return
        # read the sub-componet adminMode (memorized) attribute from
        # the CSP.LMC TANGO DB. 
        #attribute_properties = self._csp_tango_db.get_device_attribute_property(fqdn, 
        #                                                                     {'adminMode': ['__value']})
        #self.logger.debug("fqdn: {} attribute_properties: {}".format(fqdn, attribute_properties))
        #try:
        #    admin_mode_memorized = attribute_properties['adminMode']['__value']
        #    self._sc_subarray_admin_mode[fqdn] = int(admin_mode_memorized[0])
        #except KeyError as key_error:
        #    self.logger.warning("No key {} found".format(str(key_error)))    
        attribute_properties = self._csp_tango_db.get_device_attribute_property(fqdn, 
                                                                             {'adminMode': ['__value']})
        self.logger.debug("fqdn: {} attribute_properties: {}".format(fqdn, attribute_properties))
        try:
            admin_mode_memorized = attribute_properties['adminMode']['__value']
            self._sc_subarray_admin_mode[fqdn] = int(admin_mode_memorized[0])
        except KeyError as key_error:
            self.logger.warning("No key {} found".format(str(key_error)))    
        try:
            log_msg = "Trying connection to " + str(fqdn) + " device"
            self.logger.info(log_msg)
            device_proxy = DeviceProxy(fqdn)
            device_proxy = self._open_connection(fqdn)
            self.logger.info("fqdn: {} device_proxy: {}".format(fqdn, device_proxy))
            # Note: The DeviceProxy is initialized even if the sub-component
            # device is not running (but defined into the TANGO DB! If not defined in the
@@ -1364,11 +1393,11 @@ class CspSubarray(SKASubarray):
                                                 stateless=True)
            self._sc_subarray_event_id[fqdn]['obsState'] = ev_id
                
            ev_id = device_proxy.subscribe_event("obsMode",
                                                 EventType.CHANGE_EVENT,
                                                 self._sc_scm_change_event_cb,
                                                 stateless=True)
            self._sc_subarray_event_id[fqdn]['obsMode'] = ev_id
            #ev_id = device_proxy.subscribe_event("obsMode",
            #                                     EventType.CHANGE_EVENT,
            #                                     self._sc_scm_change_event_cb,
            #                                     stateless=True)
            #self._sc_subarray_event_id[fqdn]['obsMode'] = ev_id
                
        except KeyError as key_err:
            log_msg = ("No key {} found".format(str(key_err)))
+117 −95

File changed.

Preview size limit exceeded, changes collapsed.

+126 −0

File added.

Preview size limit exceeded, changes collapsed.

+699 −0

File added.

Preview size limit exceeded, changes collapsed.