Commit 2323e321 authored by Elisabetta Giani's avatar Elisabetta Giani
Browse files

k8s-configuration: added plot to show CSP and CBF obsState transitions in the same

window.
parent a47fb4c9
Loading
Loading
Loading
Loading
+138 −0
Original line number Diff line number Diff line
from threading import Thread
import tango
import time
import collections
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.widgets import Button
#import struct
from tango import EventType
 
 
class attributePlot:
    def __init__(self, plotLength = 100, device='mid_csp/elt/subarray_01'):
        self.plotMaxLength = plotLength
        self.data = collections.deque([0] * plotLength, maxlen=plotLength)
        self.cbf_data = collections.deque([0] * plotLength, maxlen=plotLength)
        self.test_number = 0
        self.previous_test_num = 0
        self.plotTimer = 0
        self.previousTimer = 0
        self.proxy = 0
        self.proxy_cbf = 0
        self.device = device
        self.obs_state = 0
        self.cbf_obs_state = 0
        self.anim = None
        try:
            self.proxy = tango.DeviceProxy("mid_csp/elt/subarray_01")
            self.cbf_proxy = tango.DeviceProxy("mid_csp_cbf/sub_elt/subarray_01")
            self.proxy.testNumber = 0
            print('Connected to device {}'.format(device))
        except:
            print("Failed to connect to device {}".format(device) )

    def readAttributeStart(self):
        if self.proxy:
            self.proxy.subscribe_event("obsState", tango.EventType.CHANGE_EVENT,
                                      self.attributes_change_evt_cb,
                                      stateless=False)
            self.proxy.subscribe_event("testNumber", tango.EventType.CHANGE_EVENT,
                                      self.attributes_change_evt_cb,
                                      stateless=False)
        if self.cbf_proxy:
            self.cbf_proxy.subscribe_event("obsState", tango.EventType.CHANGE_EVENT,
                                      self.attributes_change_evt_cb,
                                      stateless=False)

    def getAttributeData(self, frame, lines, cbf_lines, lineValueText, lineLabel, timeText):
        currentTimer = time.perf_counter()
        self.plotTimer = int((currentTimer - self.previousTimer) * 1000)     # the first reading will be erroneous
        self.previousTimer = currentTimer
        #timeText.set_text('Plot Interval = ' + str(self.plotTimer) + 'ms')
        self.data.append(self.obs_state)    # we get the latest data point and append it to our array
        self.cbf_data.append(self.cbf_obs_state)  
        lines.set_data(range(self.plotMaxLength), self.data)
        cbf_lines.set_data(range(self.plotMaxLength), self.cbf_data)
        #lineValueText.set_text('[' + lineLabel + '] = ' + str(self.obs_state))
        if self.test_number != self.previous_test_num:
            lineValueText.set_text('[Test number] = ' + str(self.test_number))

    def attributes_change_evt_cb(self, evt):
        dev_name = evt.device.dev_name()
        if not evt.err:
            try: 
                if evt.attr_value.name.lower() == "obsstate": 
                    if 'cbf' in dev_name:
                        self.cbf_obs_state = evt.attr_value.value
                        self.cbf_data.append(self.cbf_obs_state)
                        self.cbf_data.append(self.cbf_obs_state)
                        self.data.append(self.obs_state) 
                        self.data.append(self.obs_state)
                    else :
                        self.obs_state = evt.attr_value.value
                        # we get the latest data point and append it to our array
                        self.data.append(self.obs_state) 
                        self.data.append(self.obs_state)
                        self.cbf_data.append(self.cbf_obs_state)
                        self.cbf_data.append(self.cbf_obs_state)
                if evt.attr_value.name.lower() == "testnumber": 
                    self.previous_test_num = self.test_number
                    self.test_number = evt.attr_value.value
                    #print("Received event on {}/{}: {}".format(dev_name, 
                    #                                      str(evt.attr_value.name),
                    #                                      str(evt.attr_value.value)))
            except tango.DevFailed as df:
                self.logger.error(str(df.args[0].desc))
            except Exception as except_occurred:
                self.logger.error(str(except_occurred))
        else:
            for item in evt.errors:
                if item.reason == "API_EventTimeout":
                    print("API_EventTimeout")
    def on_press(self, event): 
        if event.key == 'x':
            self.anim.event_source.stop()
        if event.key == 'z':
            self.anim.event_source.start()
        if event.key == 'escape':
            exit()

def main():
    maxPlotLength = 200
    s = attributePlot(maxPlotLength, 'mid_csp/elt/subarray_01')   # initializes all required variables
    s.readAttributeStart()                                               # starts background thread
 
    # plotting starts below
    pltInterval = 300    # Period at which the plot animation updates [ms]
    xmin = 0
    xmax = maxPlotLength
    ymin = -1 
    ymax = 12
    fig = plt.figure("MID CSP Subarray ADR-8 transitions")
    fig.set_size_inches(18.5, 8.5)
    ax = plt.axes(xlim=(xmin, xmax), ylim=(ymin, ymax ))
    plt.grid()
    ax.set_title('obsState real-time graph')
    ax.set_xlabel("time")
    ax.set_ylabel("obsState")
    plt.yticks(range(11), ('EMPTY', 'RESOURCING', 'IDLE', 'CONFIGURING', 'READY', 'SCANNING', 'ABORTING', 'ABORTED', 'RESETTING', 'FAULT', 'RESTARTING'))
 
    lineLabel = 'CSP obsState'
    cbflineLabel = 'CBF obsState'
    timeText = ax.text(0.50, 0.95, '', transform=ax.transAxes)
    lines = ax.plot([], [], label=lineLabel)[0]
    lineValueText = ax.text(0.50, 0.90, '', transform=ax.transAxes)
    cbf_lines = ax.plot([], [], 'r-', label=cbflineLabel)[0]
    fig.canvas.mpl_connect('key_press_event', s.on_press)
    #s.anim = animation.FuncAnimation(fig, s.getAttributeData, fargs=(lines, lineValueText, lineLabel, timeText), interval=pltInterval)    # fargs has to be a tuple
    s.anim = animation.FuncAnimation(fig, s.getAttributeData, fargs=(lines, cbf_lines, lineValueText, lineLabel, ''), interval=pltInterval)    # fargs has to be a tuple
    plt.legend(loc="upper left")
    plt.show()
    exit()
 
 
 
if __name__ == '__main__':
    main()