Commit 657a6c6b authored by Elisabetta Giani's avatar Elisabetta Giani
Browse files

AT5-262: PSS Master simulator

parent 6c31e105
Loading
Loading
Loading
Loading
+124 −12
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ It simulates the CbfMaster sub-element.
"""
from __future__ import absolute_import
import sys
from collections import defaultdict
import os
import time

@@ -36,7 +37,7 @@ from tango import AttrWriteType, PipeWriteType
# PROTECTED REGION ID(DeviceTestMaster.additionnal_import) ENABLED START #
from future.utils import with_metaclass
import threading
from cspcommons import HealthState
from cspcommons import HealthState, AdminMode
from skabase.SKAMaster.SKAMaster import SKAMaster
# PROTECTED REGION END #    //  DeviceTestMaster.additionnal_import

@@ -59,16 +60,46 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)):
    # ----------

    onCommandProgress = attribute(
        dtype='uint16',
        dtype='DevUShort',
        label="Command progress percentage",
        polling_period=1000,
        abs_change=5,
        max_value=100,
        min_value=0,
        doc="Percentage progress implemented for commands that  result in state/mode transitions for a large \nnumber of components and/or are executed in stages (e.g power up, power down)",
    )

    offCommandProgress = attribute(
        dtype='DevDouble',
        polling_period=1000,
        abs_change=5,
        rel_change=2,
        doc="Percentage progress implemented for commands that  result in state/mode transitions for a large \nnumber of components and/or are executed in stages (e.g power up, power down)",
    )

    standbyCommandProgress = attribute(
        dtype='DevDouble',
        polling_period=1000,
        abs_change=5,
    )

    onDurationExpected = attribute(
        dtype='DevUShort',
        access=AttrWriteType.READ_WRITE,
        memorized=True,
    )

    offDurationExpected = attribute(
        dtype='DevUShort',
        access=AttrWriteType.READ_WRITE,
        memorized=True,
    )

    standbyDurationExpected = attribute(
        dtype='DevUShort',
        access=AttrWriteType.READ_WRITE,
        memorized=True,
    )


    # ---------------
    # General methods
    # ---------------
@@ -77,6 +108,7 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)):
        """
        Simulate the sub-element device initialization
        """
        time.sleep(3)
        self.set_state(tango.DevState.STANDBY)

    def on_subelement(self):
@@ -84,9 +116,15 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)):
        Simulate the sub-element transition from STANDBY to ON 
        """
        print("Executing the On command...wait")
        time.sleep(10)
        while True:
            elapsed_time = time.time() - self._start_time
            if  elapsed_time >  self._duration_expected['on']:
                break
            if elapsed_time  > 1:
                self._cmd_progress['standby'] = elapsed_time* 100/self._duration_expected['on']
                self.push_change_event("onCmdProgress", self._cmd_progress['on'])
        self.set_state(tango.DevState.ON)
        self._health_state = HealthState.DEGRADED.value
        self._health_state = HealthState.DEGRADED
        print("End On command...wait")

    def standby_subelement(self):
@@ -94,24 +132,49 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)):
        Simulate the sub-element transition from ON to STANDBY
        """
        print("Executing the Standby command...wait")
        time.sleep(10)
        while True:
            elapsed_time = time.time() - self._start_time
            if  elapsed_time > self._duration_expected['standby']:
                break
            if elapsed_time > 1:
                self._cmd_progress['standby'] = elapsed_time* 100/self._duration_expected['standby']
                self.push_change_event("standbyCmdProgress", self._cmd_progress['standby'])
        self.set_state(tango.DevState.STANDBY)
        self._health_state = HealthState.DEGRADED.value
        print("End Standby command...wait")

    def off_subelement(self):
        """
        Simulate the sub-element transition from STANDBY to OFF
        """
        while True:
            elapsed_time = time.time() - self._start_time
            if  elapsed_time > self._duration_expected['off']:
                break
            if  elapsed_time >  1:
                self._cmd_progress['off'] = elapsed_time* 100/self._duration_expected['ff']
                self.push_change_event("offCmdProgress", self._cmd_progress['off'])
        self.set_state(tango.DevState.OFF)
        self._health_state = HealthState.UNKNOWN.value
        self._health_state = HealthState.UNKNOWN

    def init_device(self):
        SKAMaster.init_device(self)
        # PROTECTED REGION ID(DeviceTestMaster.init_device) ENABLED START #
        
        self.set_state(tango.DevState.INIT)
        self._health_state = HealthState.UNKNOWN.value
        self._health_state = HealthState.UNKNOWN
        self._admin_mode = AdminMode.NOTFITTED
        self._duration_expected = defaultdict(lambda:10)
        self._cmd_progress = defaultdict(lambda:0)
        
        csp_tango_db = tango.Database()
        # read the CSP memorized attributes from the TANGO DB. 
        # Note: a memorized attribute has defined the attribute
        # property '__value'
        attribute_properties = csp_tango_db.get_device_attribute_property(self.get_name(),
                                                                          {'adminMode': ['__value'],
                                                                           'on': ['__value'],
                                                                           'elementLoggingLevel': ['__value'],
                                                                           'centralLoggingLevel': ['__value'],})

        # start a timer to simulate device intialization
        thread = threading.Timer(1, self.init_subelement)
@@ -135,9 +198,58 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)):

    def read_onCommandProgress(self):
        # PROTECTED REGION ID(DeviceTestMaster.onCommandProgress_read) ENABLED START #
        return 0
        """Return the onCommandProgress attribute."""
        return self._cmd_progress['on']
        # PROTECTED REGION END #    //  DeviceTestMaster.onCommandProgress_read

    def read_offCommandProgress(self):
        # PROTECTED REGION ID(DeviceTestMaster.offCommandProgress_read) ENABLED START #
        """Return the offCommandProgress attribute."""
        return self._cmd_progress['off']
        # PROTECTED REGION END #    //  DeviceTestMaster.offCommandProgress_read

    def read_standbyCommandProgress(self):
        # PROTECTED REGION ID(DeviceTestMaster.standbyCommandProgress_read) ENABLED START #
        """Return the standbyCommandProgress attribute."""
        return self._cmd_progress['standby']
        # PROTECTED REGION END #    //  DeviceTestMaster.standbyCommandProgress_read

    def read_onDurationExpected(self):
        # PROTECTED REGION ID(DeviceTestMaster.onDurationExpected_read) ENABLED START #
        """Return the onDurationExpected attribute."""
        return self._duration_expected['on']
        # PROTECTED REGION END #    //  DeviceTestMaster.onDurationExpected_read

    def write_onDurationExpected(self, value):
        # PROTECTED REGION ID(DeviceTestMaster.onDurationExpected_write) ENABLED START #
        """Set the onDurationExpected attribute."""
        self._duration_expected['on'] = value
        # PROTECTED REGION END #    //  DeviceTestMaster.onDurationExpected_write

    def read_offDurationExpected(self):
        # PROTECTED REGION ID(DeviceTestMaster.offDurationExpected_read) ENABLED START #
        """Return the offDurationExpected attribute."""
        return self._duration_expected['off']
        # PROTECTED REGION END #    //  DeviceTestMaster.offDurationExpected_read

    def write_offDurationExpected(self, value):
        # PROTECTED REGION ID(DeviceTestMaster.offDurationExpected_write) ENABLED START #
        """Set the offDurationExpected attribute."""
        self._duration_expected['off'] = value
        # PROTECTED REGION END #    //  DeviceTestMaster.offDurationExpected_write

    def read_standbyDurationExpected(self):
        # PROTECTED REGION ID(DeviceTestMaster.standbyDurationExpected_read) ENABLED START #
        """Return the standbyDurationExpected attribute."""
        return self._duration_expected['standby']
        # PROTECTED REGION END #    //  DeviceTestMaster.standbyDurationExpected_read

    def write_standbyDurationExpected(self, value):
        # PROTECTED REGION ID(DeviceTestMaster.standbyDurationExpected_write) ENABLED START #
        """Set the standbyDurationExpected attribute."""
        elf._duration_expected['standby'] = value
        # PROTECTED REGION END #    //  DeviceTestMaster.standbyDurationExpected_write

    # --------
    # Commands
    # --------
@@ -148,7 +260,7 @@ class DeviceTestMaster(with_metaclass(DeviceMeta,SKAMaster)):
    @DebugIt()
    def On(self):
        # PROTECTED REGION ID(DeviceTestMaster.On) ENABLED START #
        print("Sono qui")
        print("Processing On command")
        thread = threading.Timer(2, self.on_subelement)
        thread.start()
        # PROTECTED REGION END #    //  DeviceTestMaster.On
+109 −0
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
#
# This file is part of the PssMasterSimulator project
#
# INAF-SKA Telescope
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.

""" PssMaster Class simulator

"""

from __future__ import absolute_import
import sys
from collections import defaultdict
import os
import time

file_path = os.path.dirname(os.path.abspath(__file__))
print(file_path)
module_path = os.path.abspath(os.path.join(file_path, os.pardir)) + "/../utils"
print(module_path)
sys.path.insert(0, module_path)

# Tango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device, DeviceMeta
from tango.server import attribute, command
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(DeviceTestMaster.additionnal_import) ENABLED START #
from future.utils import with_metaclass
import threading
from cspcommons import HealthState, AdminMode
from DeviceTestMaster import DeviceTestMaster
# PROTECTED REGION END #    //  DeviceTestMaster.additionnal_import

__all__ = ["PssMasterSimulator", "main"]


class PssMasterSimulator(with_metaclass(DeviceMeta,DeviceTestMaster)):
    """

    **Properties:**

    - Device Property
    """
    # PROTECTED REGION ID(PssMasterSimulator.class_variable) ENABLED START #
    # PROTECTED REGION END #    //  PssMasterSimulator.class_variable

    # -----------------
    # Device Properties
    # -----------------

    # ----------
    # Attributes
    # ----------


    # ---------------
    # General methods
    # ---------------

    def init_device(self):
        """Initialises the attributes and properties of the PssMasterSimulator."""
        DeviceTestMaster.init_device(self)
        # PROTECTED REGION ID(PssMasterSimulator.init_device) ENABLED START #
        # PROTECTED REGION END #    //  PssMasterSimulator.init_device

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""
        # PROTECTED REGION ID(PssMasterSimulator.always_executed_hook) ENABLED START #
        # PROTECTED REGION END #    //  PssMasterSimulator.always_executed_hook

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.
        """
        # PROTECTED REGION ID(PssMasterSimulator.delete_device) ENABLED START #
        # PROTECTED REGION END #    //  PssMasterSimulator.delete_device
    # ------------------
    # Attributes methods
    # ------------------


    # --------
    # Commands
    # --------

# ----------
# Run server
# ----------


def main(args=None, **kwargs):
    # PROTECTED REGION ID(PssMasterSimulator.main) ENABLED START #
    return run((PssMasterSimulator,), args=args, **kwargs)
    # PROTECTED REGION END #    //  PssMasterSimulator.main

if __name__ == '__main__':
    main()
+292 −0
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
#
# This file is part of the DeviceTestMaster project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.

""" CSP.LMC subelement Test Master Tango device prototype

Test TANGO device class to test connection with the CSPMaster prototype.
It simulates the CbfMaster sub-element.
"""

# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device, DeviceMeta
from tango.server import attribute, command
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
from SKAMaster import SKAMaster
# Additional import
# PROTECTED REGION ID(DeviceTestMaster.additionnal_import) ENABLED START #
# PROTECTED REGION END #    //  DeviceTestMaster.additionnal_import

__all__ = ["DeviceTestMaster", "main"]


class DeviceTestMaster(SKAMaster):
    """
    Test TANGO device class to test connection with the CSPMaster prototype.
    It simulates the CbfMaster sub-element.

    **Properties:**

    - Device Property
    
    
    
    
    
    
    
    
    """
    __metaclass__ = DeviceMeta
    # PROTECTED REGION ID(DeviceTestMaster.class_variable) ENABLED START #
    # PROTECTED REGION END #    //  DeviceTestMaster.class_variable

    # -----------------
    # Device Properties
    # -----------------









    # ----------
    # Attributes
    # ----------















    onCommandProgress = attribute(
        dtype='DevUShort',
        label="Command progress percentage",
        polling_period=1000,
        abs_change=5,
        max_value=100,
        min_value=0,
        doc="Percentage progress implemented for commands that  result in state/mode transitions for a large \nnumber of components and/or are executed in stages (e.g power up, power down)",
    )

    offCommandProgress = attribute(
        dtype='DevDouble',
        polling_period=1000,
        abs_change=5,
    )

    standbyCommandProgress = attribute(
        dtype='DevDouble',
        polling_period=1000,
        abs_change=5,
    )

    onDurationExpected = attribute(
        dtype='DevUShort',
        access=AttrWriteType.READ_WRITE,
        memorized=True,
    )

    offDurationExpected = attribute(
        dtype='DevUShort',
        access=AttrWriteType.READ_WRITE,
        memorized=True,
    )

    standbyDurationExpected = attribute(
        dtype='DevUShort',
        access=AttrWriteType.READ_WRITE,
        memorized=True,
    )



    # ---------------
    # General methods
    # ---------------

    def init_device(self):
        """Initialises the attributes and properties of the DeviceTestMaster."""
        SKAMaster.init_device(self)
        # PROTECTED REGION ID(DeviceTestMaster.init_device) ENABLED START #
        # PROTECTED REGION END #    //  DeviceTestMaster.init_device

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""
        # PROTECTED REGION ID(DeviceTestMaster.always_executed_hook) ENABLED START #
        # PROTECTED REGION END #    //  DeviceTestMaster.always_executed_hook

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.
        """
        # PROTECTED REGION ID(DeviceTestMaster.delete_device) ENABLED START #
        # PROTECTED REGION END #    //  DeviceTestMaster.delete_device
    # ------------------
    # Attributes methods
    # ------------------

    def read_onCommandProgress(self):
        # PROTECTED REGION ID(DeviceTestMaster.onCommandProgress_read) ENABLED START #
        """Return the onCommandProgress attribute."""
        return 0
        # PROTECTED REGION END #    //  DeviceTestMaster.onCommandProgress_read

    def read_offCommandProgress(self):
        # PROTECTED REGION ID(DeviceTestMaster.offCommandProgress_read) ENABLED START #
        """Return the offCommandProgress attribute."""
        return 0.0
        # PROTECTED REGION END #    //  DeviceTestMaster.offCommandProgress_read

    def read_standbyCommandProgress(self):
        # PROTECTED REGION ID(DeviceTestMaster.standbyCommandProgress_read) ENABLED START #
        """Return the standbyCommandProgress attribute."""
        return 0.0
        # PROTECTED REGION END #    //  DeviceTestMaster.standbyCommandProgress_read

    def read_onDurationExpected(self):
        # PROTECTED REGION ID(DeviceTestMaster.onDurationExpected_read) ENABLED START #
        """Return the onDurationExpected attribute."""
        return 0
        # PROTECTED REGION END #    //  DeviceTestMaster.onDurationExpected_read

    def write_onDurationExpected(self, value):
        # PROTECTED REGION ID(DeviceTestMaster.onDurationExpected_write) ENABLED START #
        """Set the onDurationExpected attribute."""
        pass
        # PROTECTED REGION END #    //  DeviceTestMaster.onDurationExpected_write

    def read_offDurationExpected(self):
        # PROTECTED REGION ID(DeviceTestMaster.offDurationExpected_read) ENABLED START #
        """Return the offDurationExpected attribute."""
        return 0
        # PROTECTED REGION END #    //  DeviceTestMaster.offDurationExpected_read

    def write_offDurationExpected(self, value):
        # PROTECTED REGION ID(DeviceTestMaster.offDurationExpected_write) ENABLED START #
        """Set the offDurationExpected attribute."""
        pass
        # PROTECTED REGION END #    //  DeviceTestMaster.offDurationExpected_write

    def read_standbyDurationExpected(self):
        # PROTECTED REGION ID(DeviceTestMaster.standbyDurationExpected_read) ENABLED START #
        """Return the standbyDurationExpected attribute."""
        return 0
        # PROTECTED REGION END #    //  DeviceTestMaster.standbyDurationExpected_read

    def write_standbyDurationExpected(self, value):
        # PROTECTED REGION ID(DeviceTestMaster.standbyDurationExpected_write) ENABLED START #
        """Set the standbyDurationExpected attribute."""
        pass
        # PROTECTED REGION END #    //  DeviceTestMaster.standbyDurationExpected_write


    # --------
    # Commands
    # --------

    @command(
        dtype_in='DevVarStringArray',
        doc_in="If the array length is0, the command apllies to the whole\nCSP Element.\nIf the array length is > 1, each array element specifies the FQDN of the\nCSP SubElement to switch ON.",
    )
    @DebugIt()
    def On(self, argin):
        # PROTECTED REGION ID(DeviceTestMaster.On) ENABLED START #
        """
        Transit CSP or one or more CSP SubElements fromSTANDBY to ON

        :param argin: 'DevVarStringArray'
        
            If the array length is0, the command apllies to the whole
            CSP Element.
            If the array length is > 1, each array element specifies the FQDN of the
            CSP SubElement to switch ON.

        :return:None
        """
        pass
        # PROTECTED REGION END #    //  DeviceTestMaster.On

    @command(
        dtype_in='DevVarStringArray',
        doc_in="If the array length is0, the command apllies to the whole\nCSP Element.\nIf the array length is > 1, each array element specifies the FQDN of the\nCSP SubElement to switch OFF.",
    )
    @DebugIt()
    def Off(self, argin):
        # PROTECTED REGION ID(DeviceTestMaster.Off) ENABLED START #
        """
        Transit CSP or one or more CSP SubElements from ON to OFF.

        :param argin: 'DevVarStringArray'
        
            If the array length is0, the command apllies to the whole
            CSP Element.
            If the array length is > 1, each array element specifies the FQDN of the
            CSP SubElement to switch OFF.

        :return:None
        """
        pass
        # PROTECTED REGION END #    //  DeviceTestMaster.Off

    @command(
        dtype_in='DevVarStringArray',
        doc_in="If the array length is0, the command apllies to the whole\nCSP Element.\nIf the array length is > 1, each array element specifies the FQDN of the\nCSP SubElement to put in STANDBY mode.",
    )
    @DebugIt()
    def Standby(self, argin):
        # PROTECTED REGION ID(DeviceTestMaster.Standby) ENABLED START #
        """
        
            Transit CSP or one or more CSP SubElements from ON/OFF to 
            STANDBY.

        :param argin: 'DevVarStringArray'
        
            If the array length is0, the command apllies to the whole
            CSP Element.
            If the array length is > 1, each array element specifies the FQDN of the
            CSP SubElement to put in STANDBY mode.

        :return:None
        """
        pass
        # PROTECTED REGION END #    //  DeviceTestMaster.Standby

# ----------
# Run server
# ----------


def main(args=None, **kwargs):
    # PROTECTED REGION ID(DeviceTestMaster.main) ENABLED START #
    return run((DeviceTestMaster,), args=args, **kwargs)
    # PROTECTED REGION END #    //  DeviceTestMaster.main

if __name__ == '__main__':
    main()
+23 −61

File changed.

Preview size limit exceeded, changes collapsed.

+132 −0
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
#
# This file is part of the PssMasterSimulator project
#
# INAF-SKA Telescope
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.

""" PssMaster Class simulator

"""

# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device, DeviceMeta
from tango.server import attribute, command
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
from DeviceTestMaster import DeviceTestMaster
# Additional import
# PROTECTED REGION ID(PssMasterSimulator.additionnal_import) ENABLED START #
# PROTECTED REGION END #    //  PssMasterSimulator.additionnal_import

__all__ = ["PssMasterSimulator", "main"]


class PssMasterSimulator(DeviceTestMaster):
    """

    **Properties:**

    - Device Property
    
    
    
    
    
    
    
    
    """
    __metaclass__ = DeviceMeta
    # PROTECTED REGION ID(PssMasterSimulator.class_variable) ENABLED START #
    # PROTECTED REGION END #    //  PssMasterSimulator.class_variable

    # -----------------
    # Device Properties
    # -----------------









    # ----------
    # Attributes
    # ----------























    # ---------------
    # General methods
    # ---------------

    def init_device(self):
        """Initialises the attributes and properties of the PssMasterSimulator."""
        DeviceTestMaster.init_device(self)
        # PROTECTED REGION ID(PssMasterSimulator.init_device) ENABLED START #
        # PROTECTED REGION END #    //  PssMasterSimulator.init_device

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""
        # PROTECTED REGION ID(PssMasterSimulator.always_executed_hook) ENABLED START #
        # PROTECTED REGION END #    //  PssMasterSimulator.always_executed_hook

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.
        """
        # PROTECTED REGION ID(PssMasterSimulator.delete_device) ENABLED START #
        # PROTECTED REGION END #    //  PssMasterSimulator.delete_device
    # ------------------
    # Attributes methods
    # ------------------


    # --------
    # Commands
    # --------

# ----------
# Run server
# ----------


def main(args=None, **kwargs):
    # PROTECTED REGION ID(PssMasterSimulator.main) ENABLED START #
    return run((PssMasterSimulator,), args=args, **kwargs)
    # PROTECTED REGION END #    //  PssMasterSimulator.main

if __name__ == '__main__':
    main()
Loading