Commit f026d4c1 authored by Marco Buttu's avatar Marco Buttu
Browse files

checkScan() tests passed

parent 722a56f7
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ import subprocess

def run(test_case):
    # Use the simulators and the testing CDB
    server_name = 'mscu' if os.getenv('TARGETSYS') == 'SRT' else ''
    server_name = 'mscu' if test_case.telescope == 'SRT' else ''
    if not server_name:
        sys.exit(0) 

+253 −0
Original line number Diff line number Diff line
from __future__ import with_statement
import math
import time
import os
import random
from datetime import datetime

import unittest2 # https://pypi.python.org/pypi/unittest2
import Management
import MinorServo
import Antenna

from MinorServoErrors import MinorServoErrorsEx
from Acspy.Common.TimeHelper import getTimeStamp
from Acspy.Clients.SimpleClient import PySimpleClient
from Acspy.Util import ACSCorba

from PyMinorServoTest import simunittest


__author__ = "Marco Buttu <mbuttu@oa-cagliari.inaf.it>"

class CheckEmptyScanTest(unittest2.TestCase):
    """Test the MinorServoBoss.checkScan() when is_empty_scan is True"""

class CheckScanTest(unittest2.TestCase):

    telescope = os.getenv('TARGETSYS')

    @classmethod
    def setUpClass(cls):
        cls.client = PySimpleClient()
        cls.boss = cls.client.getComponent('MINORSERVO/Boss')
        
    @classmethod
    def tearDownClass(cls):
        cls.client.releaseComponent('MINORSERVO/Boss')

    def setUp(self):    
        code = 'KKG' if os.getenv('TARGETSYS') == 'SRT' else 'KKC'
        if hasattr(self, 'boss') and self.boss.isReady():
            pass
        else:
            client = PySimpleClient()
            self.boss = client.getComponent('MINORSERVO/Boss')
        code = 'KKG' if self.telescope == 'SRT' else 'KKC'

        # Wait (maximum one minute) in case the boss is parking
        if self.boss.isParking():
            t0 = datetime.now()
            while self.boss.isParking() and (datetime.now() - t0).seconds < 60:
                time.sleep(2)
            if self.boss.isParking():
                self.fail('The system can not exit form a parking state')

        if self.boss.getActualSetup() != code:
            self.boss.setup(code)
            # Wait (maximum 5 minutes) in case the boss is starting
            t0 = datetime.now()
            while not self.boss.isReady() and (datetime.now() - t0).seconds < 60*5:
                time.sleep(2)
            self.assertTrue(self.boss.isReady()) # Timeout expired?

        self.assertEqual(self.boss.getActualSetup(), code)

        self.scan = MinorServo.MinorServoScan(
            range=50,
            total_time=500000000, # 50 seconds
            axis_code='SRP_TZ',
            is_empty_scan=True)
            if not self.boss.isReady():
                self.fail('The system is not ready for executing the tests')

        self.antennaInfo = Antenna.TRunTimeParameters(
            targetName='dummy',
            azimuth=math.pi,
            elevation=math.pi/8,
            elevation=math.pi/2 * 1/random.randrange(2, 10),
            startEpoch=getTimeStamp().value + 100000000,
            onTheFly=False,
            slewingTime=100000000,
@@ -50,120 +65,102 @@ class CheckEmptyScanTest(unittest2.TestCase):
            axis=Management.MNG_TRACK,
            timeToStop=0)

        self.msInfo = MinorServo.TRunTimeParameters( # TODO: remove me
            startEpoch=0,
            onTheFly=False,
            centerScan=0,
            scanAxis='SRP_TZ',
            timeToStop=0
        )
        self.scan = MinorServo.MinorServoScan(
            range=50,
            total_time=500000000, # 50 seconds
            axis_code='SRP_TZ',
            is_empty_scan=True)

        self.boss.setElevationTracking('OFF')
        self.boss.setASConfiguration('OFF')
        axes, units = self.boss.getAxesInfo()
        self.idx = axes.index('SRP_TZ')

    def test_start_ASAP_without_startTime(self):
        """The starting time is unknown and the scan must start ASAP"""
        startTime = 0
        getPosition = getattr(self, 'get%sPosition' %os.getenv('TARGETSYS'))
        getPosition = getattr(self, 'get%sPosition' %self.telescope)
        centerScanPosition = getPosition(
                self.boss.getActualSetup(), 
                'SRP', 
                math.degrees(self.antennaInfo.elevation))
        centerScan = centerScanPosition[self.idx]
        self.centerScan = centerScanPosition[self.idx]

    def test_start_ASAP_without_startTime(self):
        """The starting time is unknown and the scan must start ASAP"""
        startTime = 0

        res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo)
        self.assertTrue(res)
        self.assertAlmostEqual(msInfo.centerScan, centerScan, delta=0.01)
        self.assertFalse(msInfo.onTheFly)
        self.assertAlmostEqual(msInfo.centerScan, self.centerScan, delta=0.01)
        self.assertGreater(msInfo.startEpoch, getTimeStamp().value)
        self.assertEqual(msInfo.scanAxis, 'SRP_TZ')
        self.assertEqual(
                msInfo.timeToStop, 
                msInfo.startEpoch + self.scan.total_time)

    def _testStartAtGivenTime(self):
        """Check the behavior when the scan must start at a given time"""
        center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ']
        startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now
        msInfo = MinorServo.TRunTimeParameters
        res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo)
    def test_not_empty_scan_start_ASAP_without_startTime(self):
        """Scan not empty: starting time unknown, the scan must start ASAP"""
        self.scan.is_empty_scan = False
        startTime = 0

        res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo)
        self.assertTrue(res)
        self.assertEqual(msInfo.startEpoch, startTime)
        self.assertAlmostEqual(msInfo.centerScan, self.centerScan, delta=0.01)
        self.assertTrue(msInfo.onTheFly)
        self.assertEqual(msInfo.centerScan, center)
        self.assertGreater(msInfo.startEpoch, getTimeStamp().value)
        self.assertEqual(msInfo.scanAxis, 'SRP_TZ')
        # Cannot know the timeToStop time...
        self.assertGreater(msInfo.timeToStop, getTimeStamp.now().value+50000000)

    def _testTooFast(self):
        """The result must be False when the total time is not enough"""
        center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ']
        startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now
        msInfo = MinorServo.TRunTimeParameters
        self.scan.total_time = 500000 # 50 ms
        res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo)
        self.assertFalse(res)
        self.assertEqual(
                msInfo.timeToStop, 
                msInfo.startEpoch + self.scan.total_time)

    def _testOutOfRange(self):
        """The result must be False in case of scan out of range"""
        center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ']
        startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now
        msInfo = MinorServo.TRunTimeParameters
        self.scan.range = 5000 # 5 meters...
        res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo)
        self.assertFalse(res) # does it currently throw an exception?

    def _testTooCloseToNow(self):
        """The result must be False when the starting time is too close to now"""
        center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ']
        startTime = getTimeStamp().value + 2 # Start in 2 seconds from now
        msInfo = MinorServo.TRunTimeParameters
        res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo)
        self.assertFalse(res) # does it currently throw an exception?

    def _testStartASAPEmptyScan(self):
        """An empty scan when that must start as soon as possible"""
        self.scan = MinorServo.MinorServoScan(
            range=0,
            total_time=0,
            axis_code='',
            is_empty_scan=True)
        center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ']
        startTime = 0
        msInfo = MinorServo.TRunTimeParameters
        res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo)
    def test_start_ASAP_at_given_time(self):
        """The starting time is known and the scan must start ASAP"""
        startTime = getTimeStamp().value + 60*10**7 # Start in a minute 

        res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo)
        self.assertTrue(res)
        # Cannot know the startEpoch time...
        self.assertGreater(msInfo.startEpoch, getTimeStamp.now().value)
        self.assertAlmostEqual(msInfo.centerScan, self.centerScan, delta=0.01)
        self.assertFalse(msInfo.onTheFly)
        self.assertEqual(msInfo.centerScan, center)
        self.assertEqual(msInfo.scanAxis, '')
        self.assertEqual(msInfo.timeToStop, 0)
        self.assertEqual(msInfo.startEpoch, startTime)
        self.assertEqual(msInfo.scanAxis, 'SRP_TZ')
        self.assertEqual(
                msInfo.timeToStop, 
                msInfo.startEpoch + self.scan.total_time)

    def _testStartAtGivenTimeEmptyScan(self):
        """An empty scan that must start at a given time"""
        self.scan = MinorServo.MinorServoScan(
            range=0,
            total_time=0,
            axis_code='',
            is_empty_scan=True)
        center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ']
        startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now
        msInfo = MinorServo.TRunTimeParameters
        res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo)
    def test_not_empty_scan_start_ASAP_at_given_time(self):
        """Scan not empty: starting time known, the scan must start ASAP"""
        self.scan.is_empty_scan = False
        startTime = getTimeStamp().value + 60*10**7 # Start in a minute 

        res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo)
        self.assertTrue(res)
        self.assertAlmostEqual(msInfo.centerScan, self.centerScan, delta=0.01)
        self.assertTrue(msInfo.onTheFly)
        self.assertEqual(msInfo.startEpoch, startTime)
        self.assertFalse(msInfo.onTheFly)
        self.assertEqual(msInfo.centerScan, center)
        self.assertEqual(msInfo.scanAxis, '')
        self.assertGreater(msInfo.timeToStop, 0)
        self.assertEqual(msInfo.scanAxis, 'SRP_TZ')
        self.assertEqual(
                msInfo.timeToStop, 
                msInfo.startEpoch + self.scan.total_time)

    def test_scan_too_fast(self):
        """Servo not enought fast for accomplishing the scan in total_time"""
        startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now
        self.scan.total_time = 5000000 # 0.5 seconds
        res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo)
        self.assertFalse(res)

    def test_out_of_range(self):
        """The scan goes out of the servo position limits"""
        startTime = 0
        self.scan.range = 1000 # 1 meter
        res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo)
        self.assertFalse(res) 

    def test_start_time_too_close_to_now(self):
        """The starting time is too close to the current time"""
        startTime = getTimeStamp().value + 1 # Start in 1 second from now
        res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo)
        self.assertFalse(res)

    def getSRTPosition(self, conf_code, servo_name, elevation):
    def getSRTPosition(self, conf_code, servo_name, elevation=45):
        """Return the servo position related to the elevation.

        Parameters:
@@ -181,6 +178,9 @@ class CheckEmptyScanTest(unittest2.TestCase):
               name, value = conf.split(':')
               servos_conf[name.strip()] = value.strip()
        
        # Example of servo_conf: 
        #   >>> servos_conf['PFP'] 
        #   'RY(mm)=(-25.75); TX(mm)=(458); TZ(mm)=(-46.2);'
        srp_conf = servos_conf[servo_name]
        srp_items = [item.strip() for item in srp_conf.split(';')]
        srp_axes = []
@@ -189,6 +189,9 @@ class CheckEmptyScanTest(unittest2.TestCase):
               name, value = item.split('=')
               srp_axes.append(value.strip())

        # Example of srp_axes:
        #   >>> srp_axes
        #   ['(-25.75)', '(458)', '(-46.2)']
        position = []
        for axis in srp_axes:
            axis = axis.lstrip('(')
@@ -198,11 +201,53 @@ class CheckEmptyScanTest(unittest2.TestCase):
            value = 0
            for idx, coeff in enumerate(coeffs):
                value += coeff * elevation**idx
            # Value: -25.75*elevation**0 + 485*elevation**1 -46.2*elevation**2
            position.append(value)
        return position


if __name__ == '__main__':
    unittest2.main()
class CheckScanInterfaceTest(unittest2.TestCase):

    telescope = os.getenv('TARGETSYS')

    @classmethod
    def setUpClass(cls):
        cls.client = PySimpleClient()
        cls.boss = cls.client.getComponent('MINORSERVO/Boss')
        if cls.boss.isReady():
            self.fail('Somone has forgot to release the component...')
        
    @classmethod
    def tearDownClass(cls):
        cls.client.releaseComponent('MINORSERVO/Boss')

    def setUp(self):
        self.antennaInfo = Antenna.TRunTimeParameters(
            targetName='dummy',
            azimuth=math.pi,
            elevation=math.pi/2 * 1/random.randrange(2, 10),
            startEpoch=getTimeStamp().value + 100000000,
            onTheFly=False,
            slewingTime=100000000,
            section=Antenna.ANT_SOUTH,
            axis=Management.MNG_TRACK,
            timeToStop=0)

        self.scan = MinorServo.MinorServoScan(
            range=50,
            total_time=500000000, # 50 seconds
            axis_code='SRP_TZ',
            is_empty_scan=True)

    def test_system_not_ready(self):
        """Raise a MinorServoErrorsEx in case the system is not ready"""
        with self.assertRaises(MinorServoErrorsEx):
            t = self.boss.checkScan(0, self.scan, self.antennaInfo) 


if __name__ == '__main__':
    if 'Configuration' in os.getenv('ACS_CDB'):
        unittest2.main() # Real test using the antenna CDB
    else:
        simunittest.run(CheckScanTest)
        simunittest.run(CheckScanInterfaceTest)
+0 −52
Original line number Diff line number Diff line
from __future__ import with_statement

import os
import math
import unittest2

import MinorServo
import Management
import Antenna

from PyMinorServoTest import simunittest
from Acspy.Clients.SimpleClient import PySimpleClient
from MinorServoErrors import MinorServoErrorsEx
from Acspy.Common.TimeHelper import getTimeStamp

__author__ = "Marco Buttu <mbuttu@oa-cagliari.inaf.it>"

class TestCheckScanInterface(unittest2.TestCase):
    """checkScan() must raise the expected exceptions"""

    def setUp(self):
        client = PySimpleClient()
        self.boss = client.getComponent('MINORSERVO/Boss')

        self.scan = MinorServo.MinorServoScan(
            range=50,
            total_time=500000000, # 50 seconds
            axis_code='SRP_TZ',
            is_empty_scan=False)

        self.antennaInfo = Antenna.TRunTimeParameters(
            targetName='dummy',
            azimuth=math.pi,
            elevation=math.pi/8,
            startEpoch=getTimeStamp().value + 100000000,
            onTheFly=False,
            slewingTime=100000000,
            section=Antenna.ANT_SOUTH,
            axis=Management.MNG_TRACK,
            timeToStop=0)

    def test_not_ready(self):
        """Raise a MinorServoErrorsEx in case the system is not ready"""
        with self.assertRaises(MinorServoErrorsEx):
            t = self.boss.checkScan(0, self.scan, self.antennaInfo) 

if __name__ == '__main__':
    if 'Configuration' in os.getenv('ACS_CDB'):
        unittest2.main() # Real test using the antenna CDB
    else:
        simunittest.run(TestCheckScanInterface)