Commit a6fd94b7 authored by Elisabetta Giani's avatar Elisabetta Giani
Browse files

CT-69: Added Timeout and polling to handle asynchrnous testing.

Enabled logging for pytest in setup.cfg.
parent 76fa6fd1
Loading
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -14,6 +14,10 @@ source = csp-lmc-mid

[tool:pytest]
testpaths = tests
log_cli = True
log_cli_level = INFO
log_file = pytest-logs.txt
log_file_level = INFO
addopts = --verbose
	  --cov=csp_lmc_mid
          --json-report
+55 −20
Original line number Diff line number Diff line
@@ -23,9 +23,11 @@ import unittest

#Local imports
from ska.base.control_model  import ObsState
from utils import Poller, Probe

# TODO: remove call to sleep 

LOGGER = logging.getLogger(__name__)
# Device test case
@pytest.mark.usefixtures("midcsp_master", "midcsp_subarray01", "cbf_subarray01")

@@ -47,34 +49,60 @@ class TestCspSubarrayConfiguration(TestBase):
        """
        # read from the CSP Master the list of available receptors
        unassigned_receptors = self.midcsp_master.unassignedReceptorIDs
        receptor_membership = self.midcsp_master.receptorMembership
        # assign the first one to the subarray
        LOGGER.info("Add receptors")
        self.midcsp_subarray01.AddReceptors([unassigned_receptors[0],])
        # wait for the transition of the CSP subarray to ON
        time.sleep(3)
        receptor_membership[unassigned_receptors[0]- 1] = 1
        prober_first = Probe(self.midcsp_master, 'receptorMembership', receptor_membership, f"Wrong state")
        Poller(10, 0.2).check(prober_first)
        prober_first = Probe(self.midcsp_subarray01, 'state', DevState.ON, f"Wrong state")
        Poller(10, 0.2).check(prober_first)

    def _setup_csp_subarray(self):
        # setup the CSP Subarray
        LOGGER.info("Reset subarray to DISABLE")
        self._reset_subarray_to_init_state()
        state = self.midcsp_subarray01.State()
        assert state == DevState.DISABLE,"assuming that mid_csp_subarray_01 is disabled"

        assert state == DevState.DISABLE
        #switch-on the CspMaster
        LOGGER.info("Switch on Master")
        self.midcsp_master.On("")
        # wait for transition to OFF
        time.sleep(2)
        # wait for the transition of the CSP subarray to OFF
        prober_first = Probe(self.midcsp_subarray01, 'state', DevState.OFF, f"Wrong state")
        Poller(10, 0.2).check(prober_first)
        state = self.midcsp_subarray01.State()
        assert state in [DevState.OFF],"assuming that mid_csp_subarray_01 is OFF"
        self.__set_csp_subarray_to_on()

    def _reset_subarray_to_init_state(self):
        try:
            state = self.midcsp_subarray01.State()
            LOGGER.info("Init CSP Subarray State:{}".format(state))
            if state == DevState.DISABLE:
                return
            if state == DevState.ON:
                num_of_receptors = len(self.midcsp_master.receptorMembership)
                receptor_membership_expected = [0] * num_of_receptors
                LOGGER.info("Remove all receptors")
                self.midcsp_subarray01.RemoveAllReceptors()
            time.sleep(3)
                # wait for the transition of the CSP subarray to OFF
                prober_first = Probe(self.midcsp_subarray01, 'state', DevState.OFF, f"Wrong state")
                Poller(7, 0.2).check(prober_first)
                prober_first = Probe(self.midcsp_master, 'state', DevState.ON, f"Wrong state")
                Poller(7, 0.2).check(prober_first)
            # Set the CSP subarray to OFF issuing the Standby command
            # on CSP Master
            state = self.midcsp_subarray01.State()
            if state == DevState.OFF:
                LOGGER.info("State is OFF...going to execute Standby")
                self.midcsp_master.Standby("")
            time.sleep(3)
                prober_first = Probe(self.midcsp_subarray01, 'state', DevState.DISABLE, f"Wrong state")
                Poller(5, 0.2).check(prober_first)

        except tango.DevFailed as tango_err:
            logging.debug(f"Unable to reset subarray to init state")
            LOGGER.warn(f"Unable to reset subarray to init state")

    def _prepare_configuration_string(self,filename="test_ConfigureScan_ADR4.json"):
        """Create the config string for CSP-CBF"""
@@ -83,18 +111,27 @@ class TestCspSubarrayConfiguration(TestBase):
            configuration_string = json_file.read().replace("\n", "")
            return configuration_string
        except Exception as e:
            logging.debug(f"Unable to locate file {filename}")
            LOGGER.debug(f"Unable to locate file {filename}")

    def test_send_configure_to_cbf_and_json_stored(self):
        """
        Configure the CSP Subarray with a JSon string including
        the new ADR4 fields.
        """
        time.sleep(3)
        prober_first = Probe(self.midcsp_subarray01, 'obsState', ObsState.IDLE, f"Wrong state")
        Poller(5, 0.2).check(prober_first)
        configuration_string = self._prepare_configuration_string()
        state = self.midcsp_subarray01.State()
        assert state == DevState.ON
        self._setup_csp_subarray()
        state = self.midcsp_subarray01.State()
        # exercise the device
        LOGGER.debug("sending configure:{}".format(configuration_string))
        LOGGER.info("Send configuration command")
        self.midcsp_subarray01.Configure(configuration_string)
        time.sleep(3)
        prober_first = Probe(self.midcsp_subarray01, 'obsState', ObsState.READY, f"Wrong state")
        Poller(5, 0.2).check(prober_first)
        # check
        # The CSP Subarray ObsState is READY
        obs_state = self.midcsp_subarray01.obsState
@@ -110,10 +147,9 @@ class TestCspSubarrayConfiguration(TestBase):
        """
        # setup the test
        configuration_string = self._prepare_configuration_string("test_ConfigureScan_without_configID.json")
        self._setup_csp_subarray()
        # Set the CSP Subarray to ON-IDLE state
        obs_state = self.midcsp_subarray01.obsState
        assert obs_state == ObsState.IDLE
        state = self.midcsp_subarray01.State()
        self._setup_csp_subarray()

        # exercise the device
        with pytest.raises(tango.DevFailed) as df:
@@ -128,19 +164,18 @@ class TestCspSubarrayConfiguration(TestBase):
        Configure command on it
        """
        configuration_string = self._prepare_configuration_string()
        logging.debug("configuration string:{}".format(configuration_string))
        self._setup_csp_subarray()
        LOGGER.debug("configuration string:{}".format(configuration_string))
        self._reset_subarray_to_init_state()
        state = self.midcsp_subarray01.State()
        assert state == DevState.DISABLE 
        obs_state = self.midcsp_subarray01.obsState
        assert obs_state == ObsState.IDLE
        state = self.midcsp_subarray01.State()
        assert state == DevState.DISABLE

        # exercise the device
        with pytest.raises(tango.DevFailed) as df:
            self.midcsp_subarray01.Configure(configuration_string)
        if df:
            logging.debug("Command configure failed with er {}".format(str(df.value.args[0].desc)))
            LOGGER.info("Command configure failed with er {}".format(str(df.value.args[0].desc)))
        # check
        # Subarray final ObsState IDLE
        obs_state = self.midcsp_subarray01.obsState
+60 −0
Original line number Diff line number Diff line
import time
import logging
import numpy
from tango import DeviceProxy, DevState

LOGGER = logging.getLogger(__name__)
class Timeout:
    def __init__(self, duration):
        self.endtime = time.time() + duration
    def has_expired(self):
        return time.time() > self.endtime

class Probe:
    def __init__(self, proxy, attr_name, expected_state, message):
        """
        """
        self.proxy = proxy
        self.attr_name = attr_name
        self.expected_state = expected_state
        self.message = message
        self.current_state = DevState.DISABLE
 
    def get_attribute(self):
        return self.attr_name
    def sample(self):
        """
        extract the state of client and store it
        """
        device_attr = self.proxy.read_attribute(self.attr_name)
        self.current_state = device_attr.value
 
    def is_satisfied(self):
        """
        Check if the state satisfies this test condition
        """
        #if type(self.current_state) is numpy.ndarray:
        if isinstance(self.current_state, numpy.ndarray):
            return ((self.expected_state == self.current_state).all())
        return self.expected_state == self.current_state

class Poller:
    def __init__(self, timeout, interval):
        self.timeout = timeout
        self.interval = interval
 
    def check(self, probe: Probe):
        """
        Repeatedly check if the probe is satisfied.
        Assert false when the timeout expires.
        """
        timer = Timeout(self.timeout)
        probe.sample()
        while not probe.is_satisfied():
            if timer.has_expired():
                LOGGER.debug("Check Timeout on:{}".format(probe.get_attribute()))
                assert False, probe.message
            time.sleep(self.interval)
            probe.sample()
        LOGGER.debug("Check success on: {}".format(probe.get_attribute()))
        assert True