Loading csp-lmc-common/tests/unit/cspsubarray_unit_test.py 0 → 100644 +205 −0 Original line number Diff line number Diff line import contextlib import importlib import sys import mock import pytest import tango from mock import Mock, MagicMock from ska.base.control_model import HealthState, ObsState from ska.base.commands import ResultCode from tango.test_context import DeviceTestContext from tango import DevState, DevFailed from csp_lmc_common.CspSubarray import CspSubarray def test_cspsubarray_state_and_obstate_value_after_initialization(): """ Test the State and obsState values for the CpSubarray at the end of the initialization process. """ device_under_test = CspSubarray cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' cbf_subarray_state_attr = 'State' dut_properties = { 'CspMaster':'mid_csp/elt/master', 'CbfSubarray': cbf_subarray_fqdn, 'PssSubarray': 'mid_csp_pss/sub_elt/subarray_01', 'PstSubarray': 'mid_csp_pst/sub_elt/subarray_01', } event_subscription_map = {} cbf_subarray_device_proxy_mock = Mock() cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) proxies_to_mock = { cbf_subarray_fqdn: cbf_subarray_device_proxy_mock } with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: dummy_event = create_dummy_event(cbf_subarray_fqdn, DevState.OFF) event_subscription_map[cbf_subarray_state_attr](dummy_event) assert tango_context.device.State() == DevState.OFF assert tango_context.device.obsState == ObsState.EMPTY def test_cspsbarray_state_after_On_WITH_exception_raised_by_subelement_subarray(): """ Test the behavior of the CspSubarray when one of the sub-element subarray raises a DevFailed exception. """ device_under_test = CspSubarray cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' dut_properties = { 'CbfSubarray': cbf_subarray_fqdn, 'PssSubarray': pss_subarray_fqdn } cbf_subarray_device_proxy_mock = Mock() pss_subarray_device_proxy_mock = Mock() proxies_to_mock = { cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, pss_subarray_fqdn: pss_subarray_device_proxy_mock } with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: cbf_subarray_device_proxy_mock.On.side_effect = raise_devfailed_exception pss_subarray_device_proxy_mock.On.side_effect = return_ok tango_context.device.On() assert tango_context.device.State() == DevState.FAULT assert tango_context.device.obsState == ObsState.EMPTY def test_cspsbarray_state_after_On_WITH_command_failed_code_returned_by_subelement_subarray(): """ Test the behavior of the CspSubarray when on the sub-element subarray return a FAILED code. """ device_under_test = CspSubarray cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' dut_properties = { 'CbfSubarray': cbf_subarray_fqdn, 'PssSubarray': pss_subarray_fqdn, } cbf_subarray_device_proxy_mock = Mock() pss_subarray_device_proxy_mock = Mock() proxies_to_mock = { cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, pss_subarray_fqdn: pss_subarray_device_proxy_mock, } with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: cbf_subarray_device_proxy_mock.On.side_effect = return_failed pss_subarray_device_proxy_mock.On.side_effect = return_ok tango_context.device.On() assert tango_context.device.State() == DevState.FAULT assert tango_context.device.obsState == ObsState.EMPTY def test_cspsbarray_state_after_On_WITH_command_failed_code_returned_by_pss_subarray(): """ Test the behavior of the CspSubarray when on the sub-element subarray return a FAILED code. """ device_under_test = CspSubarray cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' dut_properties = { 'CbfSubarray': cbf_subarray_fqdn, 'PssSubarray': pss_subarray_fqdn, } cbf_subarray_device_proxy_mock = Mock() pss_subarray_device_proxy_mock = Mock() proxies_to_mock = { cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, pss_subarray_fqdn: pss_subarray_device_proxy_mock } with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: cbf_subarray_device_proxy_mock.On.side_effect = return_ok pss_subarray_device_proxy_mock.On.side_effect = return_failed tango_context.device.On() assert tango_context.device.State() == DevState.ON assert tango_context.device.obsState == ObsState.EMPTY assert tango_context.device.healthState == HealthState.DEGRADED def test_cspsbarray_state_after_On_forwarded_to_subelement_subarray(): """ Test the behavior of the CspSubarray when on the sub-element subarray return a FAILED code. """ device_under_test = CspSubarray cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' dut_properties = { 'CbfSubarray': cbf_subarray_fqdn, 'PssSubarray': pss_subarray_fqdn, } cbf_subarray_device_proxy_mock = Mock() pss_subarray_device_proxy_mock = Mock() proxies_to_mock = { cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, pss_subarray_fqdn: pss_subarray_device_proxy_mock, } with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: cbf_subarray_device_proxy_mock.On.side_effect = return_ok pss_subarray_device_proxy_mock.On.side_effect = return_ok tango_context.device.On() assert tango_context.device.State() == DevState.ON assert tango_context.device.obsState == ObsState.EMPTY def return_ok(): """ Return a FAILED code in the execution of a device method. """ message = "CBF Subarray Oncommand OK" return (ResultCode.OK, message) def return_failed(): """ Return a FAILED code in the execution of a device method. """ print("return failed") return (ResultCode.FAILED, "On Command failed") def raise_devfailed_exception(): """ Raise an exception to test the failure of a device command """ tango.Except.throw_exception("Commandfailed", "This is error message for devfailed", " ", tango.ErrSeverity.ERR) def mock_event_dev_name(device_name): return device_name def create_dummy_event(cbf_subarray_fqdn, event_value): """ Create a mocked event object to test the event callback method associate to the attribute at subscription. param: cbf_subarray_fqdn the CBF Subarray FQDN event_value the expected value return: the fake event """ fake_event = Mock() fake_event.err = False fake_event.attr_name = f"{cbf_subarray_fqdn}/state" fake_event.attr_value.value = event_value fake_event.attr_value.name = 'State' fake_event.device.name = cbf_subarray_fqdn fake_event.device.dev_name.side_effect=(lambda *args, **kwargs: mock_event_dev_name(cbf_subarray_fqdn)) return fake_event @contextlib.contextmanager def fake_tango_system(device_under_test, initial_dut_properties={}, proxies_to_mock={}, device_proxy_import_path='tango.DeviceProxy'): with mock.patch(device_proxy_import_path) as patched_constructor: patched_constructor.side_effect = lambda device_fqdn: proxies_to_mock.get(device_fqdn, Mock()) patched_module = importlib.reload(sys.modules[device_under_test.__module__]) print("patched_module:", patched_module) print("device_under_test.__name__:", device_under_test.__name__) device_under_test = getattr(patched_module, device_under_test.__name__) device_test_context = DeviceTestContext(device_under_test, properties=initial_dut_properties) device_test_context.start() yield device_test_context device_test_context.stop() Loading
csp-lmc-common/tests/unit/cspsubarray_unit_test.py 0 → 100644 +205 −0 Original line number Diff line number Diff line import contextlib import importlib import sys import mock import pytest import tango from mock import Mock, MagicMock from ska.base.control_model import HealthState, ObsState from ska.base.commands import ResultCode from tango.test_context import DeviceTestContext from tango import DevState, DevFailed from csp_lmc_common.CspSubarray import CspSubarray def test_cspsubarray_state_and_obstate_value_after_initialization(): """ Test the State and obsState values for the CpSubarray at the end of the initialization process. """ device_under_test = CspSubarray cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' cbf_subarray_state_attr = 'State' dut_properties = { 'CspMaster':'mid_csp/elt/master', 'CbfSubarray': cbf_subarray_fqdn, 'PssSubarray': 'mid_csp_pss/sub_elt/subarray_01', 'PstSubarray': 'mid_csp_pst/sub_elt/subarray_01', } event_subscription_map = {} cbf_subarray_device_proxy_mock = Mock() cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) proxies_to_mock = { cbf_subarray_fqdn: cbf_subarray_device_proxy_mock } with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: dummy_event = create_dummy_event(cbf_subarray_fqdn, DevState.OFF) event_subscription_map[cbf_subarray_state_attr](dummy_event) assert tango_context.device.State() == DevState.OFF assert tango_context.device.obsState == ObsState.EMPTY def test_cspsbarray_state_after_On_WITH_exception_raised_by_subelement_subarray(): """ Test the behavior of the CspSubarray when one of the sub-element subarray raises a DevFailed exception. """ device_under_test = CspSubarray cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' dut_properties = { 'CbfSubarray': cbf_subarray_fqdn, 'PssSubarray': pss_subarray_fqdn } cbf_subarray_device_proxy_mock = Mock() pss_subarray_device_proxy_mock = Mock() proxies_to_mock = { cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, pss_subarray_fqdn: pss_subarray_device_proxy_mock } with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: cbf_subarray_device_proxy_mock.On.side_effect = raise_devfailed_exception pss_subarray_device_proxy_mock.On.side_effect = return_ok tango_context.device.On() assert tango_context.device.State() == DevState.FAULT assert tango_context.device.obsState == ObsState.EMPTY def test_cspsbarray_state_after_On_WITH_command_failed_code_returned_by_subelement_subarray(): """ Test the behavior of the CspSubarray when on the sub-element subarray return a FAILED code. """ device_under_test = CspSubarray cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' dut_properties = { 'CbfSubarray': cbf_subarray_fqdn, 'PssSubarray': pss_subarray_fqdn, } cbf_subarray_device_proxy_mock = Mock() pss_subarray_device_proxy_mock = Mock() proxies_to_mock = { cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, pss_subarray_fqdn: pss_subarray_device_proxy_mock, } with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: cbf_subarray_device_proxy_mock.On.side_effect = return_failed pss_subarray_device_proxy_mock.On.side_effect = return_ok tango_context.device.On() assert tango_context.device.State() == DevState.FAULT assert tango_context.device.obsState == ObsState.EMPTY def test_cspsbarray_state_after_On_WITH_command_failed_code_returned_by_pss_subarray(): """ Test the behavior of the CspSubarray when on the sub-element subarray return a FAILED code. """ device_under_test = CspSubarray cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' dut_properties = { 'CbfSubarray': cbf_subarray_fqdn, 'PssSubarray': pss_subarray_fqdn, } cbf_subarray_device_proxy_mock = Mock() pss_subarray_device_proxy_mock = Mock() proxies_to_mock = { cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, pss_subarray_fqdn: pss_subarray_device_proxy_mock } with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: cbf_subarray_device_proxy_mock.On.side_effect = return_ok pss_subarray_device_proxy_mock.On.side_effect = return_failed tango_context.device.On() assert tango_context.device.State() == DevState.ON assert tango_context.device.obsState == ObsState.EMPTY assert tango_context.device.healthState == HealthState.DEGRADED def test_cspsbarray_state_after_On_forwarded_to_subelement_subarray(): """ Test the behavior of the CspSubarray when on the sub-element subarray return a FAILED code. """ device_under_test = CspSubarray cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' dut_properties = { 'CbfSubarray': cbf_subarray_fqdn, 'PssSubarray': pss_subarray_fqdn, } cbf_subarray_device_proxy_mock = Mock() pss_subarray_device_proxy_mock = Mock() proxies_to_mock = { cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, pss_subarray_fqdn: pss_subarray_device_proxy_mock, } with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: cbf_subarray_device_proxy_mock.On.side_effect = return_ok pss_subarray_device_proxy_mock.On.side_effect = return_ok tango_context.device.On() assert tango_context.device.State() == DevState.ON assert tango_context.device.obsState == ObsState.EMPTY def return_ok(): """ Return a FAILED code in the execution of a device method. """ message = "CBF Subarray Oncommand OK" return (ResultCode.OK, message) def return_failed(): """ Return a FAILED code in the execution of a device method. """ print("return failed") return (ResultCode.FAILED, "On Command failed") def raise_devfailed_exception(): """ Raise an exception to test the failure of a device command """ tango.Except.throw_exception("Commandfailed", "This is error message for devfailed", " ", tango.ErrSeverity.ERR) def mock_event_dev_name(device_name): return device_name def create_dummy_event(cbf_subarray_fqdn, event_value): """ Create a mocked event object to test the event callback method associate to the attribute at subscription. param: cbf_subarray_fqdn the CBF Subarray FQDN event_value the expected value return: the fake event """ fake_event = Mock() fake_event.err = False fake_event.attr_name = f"{cbf_subarray_fqdn}/state" fake_event.attr_value.value = event_value fake_event.attr_value.name = 'State' fake_event.device.name = cbf_subarray_fqdn fake_event.device.dev_name.side_effect=(lambda *args, **kwargs: mock_event_dev_name(cbf_subarray_fqdn)) return fake_event @contextlib.contextmanager def fake_tango_system(device_under_test, initial_dut_properties={}, proxies_to_mock={}, device_proxy_import_path='tango.DeviceProxy'): with mock.patch(device_proxy_import_path) as patched_constructor: patched_constructor.side_effect = lambda device_fqdn: proxies_to_mock.get(device_fqdn, Mock()) patched_module = importlib.reload(sys.modules[device_under_test.__module__]) print("patched_module:", patched_module) print("device_under_test.__name__:", device_under_test.__name__) device_under_test = getattr(patched_module, device_under_test.__name__) device_test_context = DeviceTestContext(device_under_test, properties=initial_dut_properties) device_test_context.start() yield device_test_context device_test_context.stop()