Loading Common/Interfaces/MinorServoInterface/test/functional/_test_checkScan.py +21 −11 Original line number Diff line number Diff line Loading @@ -23,9 +23,10 @@ class CheckScanTest(unittest2.TestCase): else: client = PySimpleClient() self.boss = client.getComponent('MINORSERVO/Boss') if self.boss.getActualSetup() != code: self.boss.setup(code) t0 = datetime.now() while not self.boss.isReady() and (datetime.now() - t0).seconds < 60*5): while not self.boss.isReady() and (datetime.now() - t0).seconds < 60*5: time.sleep(2) self.assertTrue(self.boss.isReady()) # Timeout expired? Loading @@ -48,15 +49,24 @@ class CheckScanTest(unittest2.TestCase): axis=Management.MNG_TRACK, timeToStop=0) self.msInfo = MinorServo.TRunTimeParameters( # TODO: remove me startEpoch=0, onTheFly=False, centerScan=0, scanAxis='SRP_TZ', timeToStop=0 ) self.boss.setElevationTracking('OFF') self.boss.setASConfiguration('OFF') axes, units = self.boss.getAxesInfo() self.idx = axes.index('SRP_TZ') def testStartASAP(self): """Check the behavior when the scan must start as soon as possible""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] center = self.boss.getAxesPosition(getTimeStamp().value)[self.idx] startTime = 0 msInfo = MinorServo.TRunTimeParameters res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo) self.assertTrue(res) # Cannot know the startEpoch time... Loading @@ -67,7 +77,7 @@ class CheckScanTest(unittest2.TestCase): # Cannot know the timeToStop time... self.assertGreater(msInfo.timeToStop, getTimeStamp.now().value+50000000) def testStartAtGivenTime(self): def _testStartAtGivenTime(self): """Check the behavior when the scan must start at a given time""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now Loading @@ -82,7 +92,7 @@ class CheckScanTest(unittest2.TestCase): # Cannot know the timeToStop time... self.assertGreater(msInfo.timeToStop, getTimeStamp.now().value+50000000) def testTooFast(self): def _testTooFast(self): """The result must be False when the total time is not enough""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now Loading @@ -91,7 +101,7 @@ class CheckScanTest(unittest2.TestCase): res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) def testOutOfRange(self): def _testOutOfRange(self): """The result must be False in case of scan out of range""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now Loading @@ -100,7 +110,7 @@ class CheckScanTest(unittest2.TestCase): res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) # does it currently throw an exception? def testTooCloseToNow(self): def _testTooCloseToNow(self): """The result must be False when the starting time is too close to now""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 2 # Start in 2 seconds from now Loading @@ -108,7 +118,7 @@ class CheckScanTest(unittest2.TestCase): res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) # does it currently throw an exception? def testStartASAPEmptyScan(self): def _testStartASAPEmptyScan(self): """An empty scan when that must start as soon as possible""" self.scan = MinorServo.MinorServoScan( range=0, Loading @@ -128,7 +138,7 @@ class CheckScanTest(unittest2.TestCase): self.assertEqual(msInfo.scanAxis, '') self.assertEqual(msInfo.timeToStop, 0) def testStartAtGivenTimeEmptyScan(self): def _testStartAtGivenTimeEmptyScan(self): """An empty scan that must start at a given time""" self.scan = MinorServo.MinorServoScan( range=0, Loading Common/Interfaces/MinorServoInterface/test/functional/test_checkEmptyScan.py 0 → 100644 +208 −0 Original line number Diff line number Diff line import math import time import os from datetime import datetime import unittest2 # https://pypi.python.org/pypi/unittest2 import Management import MinorServo import Antenna from Acspy.Common.TimeHelper import getTimeStamp from Acspy.Clients.SimpleClient import PySimpleClient from Acspy.Util import ACSCorba __author__ = "Marco Buttu <mbuttu@oa-cagliari.inaf.it>" class CheckEmptyScanTest(unittest2.TestCase): """Test the MinorServoBoss.checkScan() when is_empty_scan is True""" def setUp(self): code = 'KKG' if os.getenv('TARGETSYS') == 'SRT' else 'KKC' if hasattr(self, 'boss') and self.boss.isReady(): pass else: client = PySimpleClient() self.boss = client.getComponent('MINORSERVO/Boss') if self.boss.getActualSetup() != code: self.boss.setup(code) t0 = datetime.now() while not self.boss.isReady() and (datetime.now() - t0).seconds < 60*5: time.sleep(2) self.assertTrue(self.boss.isReady()) # Timeout expired? self.assertEqual(self.boss.getActualSetup(), code) self.scan = MinorServo.MinorServoScan( range=50, total_time=500000000, # 50 seconds axis_code='SRP_TZ', is_empty_scan=True) self.antennaInfo = Antenna.TRunTimeParameters( targetName='dummy', azimuth=math.pi, elevation=math.pi/8, startEpoch=getTimeStamp().value + 100000000, onTheFly=False, slewingTime=100000000, section=Antenna.ANT_SOUTH, axis=Management.MNG_TRACK, timeToStop=0) self.msInfo = MinorServo.TRunTimeParameters( # TODO: remove me startEpoch=0, onTheFly=False, centerScan=0, scanAxis='SRP_TZ', timeToStop=0 ) self.boss.setElevationTracking('OFF') self.boss.setASConfiguration('OFF') axes, units = self.boss.getAxesInfo() self.idx = axes.index('SRP_TZ') def test_start_ASAP_without_startTime(self): """The starting time is unknown and the scan must start ASAP""" startTime = 0 getPosition = getattr(self, 'get%sPosition' %os.getenv('TARGETSYS')) centerScanPosition = getPosition( self.boss.getActualSetup(), 'SRP', math.degrees(self.antennaInfo.elevation)) centerScan = centerScanPosition[self.idx] res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo) self.assertTrue(res) self.assertAlmostEqual(msInfo.centerScan, centerScan, delta=0.01) self.assertFalse(msInfo.onTheFly) self.assertGreater(msInfo.startEpoch, getTimeStamp().value) self.assertEqual(msInfo.scanAxis, 'SRP_TZ') self.assertEqual( msInfo.timeToStop, msInfo.startEpoch + self.scan.total_time) def _testStartAtGivenTime(self): """Check the behavior when the scan must start at a given time""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now msInfo = MinorServo.TRunTimeParameters res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertTrue(res) self.assertEqual(msInfo.startEpoch, startTime) self.assertTrue(msInfo.onTheFly) self.assertEqual(msInfo.centerScan, center) self.assertEqual(msInfo.scanAxis, 'SRP_TZ') # Cannot know the timeToStop time... self.assertGreater(msInfo.timeToStop, getTimeStamp.now().value+50000000) def _testTooFast(self): """The result must be False when the total time is not enough""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now msInfo = MinorServo.TRunTimeParameters self.scan.total_time = 500000 # 50 ms res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) def _testOutOfRange(self): """The result must be False in case of scan out of range""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now msInfo = MinorServo.TRunTimeParameters self.scan.range = 5000 # 5 meters... res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) # does it currently throw an exception? def _testTooCloseToNow(self): """The result must be False when the starting time is too close to now""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 2 # Start in 2 seconds from now msInfo = MinorServo.TRunTimeParameters res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) # does it currently throw an exception? def _testStartASAPEmptyScan(self): """An empty scan when that must start as soon as possible""" self.scan = MinorServo.MinorServoScan( range=0, total_time=0, axis_code='', is_empty_scan=True) center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = 0 msInfo = MinorServo.TRunTimeParameters res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertTrue(res) # Cannot know the startEpoch time... self.assertGreater(msInfo.startEpoch, getTimeStamp.now().value) self.assertFalse(msInfo.onTheFly) self.assertEqual(msInfo.centerScan, center) self.assertEqual(msInfo.scanAxis, '') self.assertEqual(msInfo.timeToStop, 0) def _testStartAtGivenTimeEmptyScan(self): """An empty scan that must start at a given time""" self.scan = MinorServo.MinorServoScan( range=0, total_time=0, axis_code='', is_empty_scan=True) center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now msInfo = MinorServo.TRunTimeParameters res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertTrue(res) self.assertEqual(msInfo.startEpoch, startTime) self.assertFalse(msInfo.onTheFly) self.assertEqual(msInfo.centerScan, center) self.assertEqual(msInfo.scanAxis, '') self.assertGreater(msInfo.timeToStop, 0) def getSRTPosition(self, conf_code, servo_name, elevation): """Return the servo position related to the elevation. Parameters: - conf_code: value returned by getActualSetup() (CCB, CCB_ASACTIVE,...) - servo_name: SRP, GFR, M3R, PFP - elevation: the antenna elevation, in degrees """ dal = ACSCorba.cdb() dao = dal.get_DAO_Servant('alma/MINORSERVO/Boss') body = dao.get_field_data(conf_code) configurations = body.strip().split('@') servos_conf = {} for conf in configurations: if conf: name, value = conf.split(':') servos_conf[name.strip()] = value.strip() srp_conf = servos_conf[servo_name] srp_items = [item.strip() for item in srp_conf.split(';')] srp_axes = [] for item in srp_items: if '=' in item: name, value = item.split('=') srp_axes.append(value.strip()) position = [] for axis in srp_axes: axis = axis.lstrip('(') axis = axis.rstrip(')') # At this point, axis is something like '-0.23, 0.01, 3.2' coeffs = [float(item) for item in axis.split(',')] value = 0 for idx, coeff in enumerate(coeffs): value += coeff * elevation**idx position.append(value) return position if __name__ == '__main__': unittest2.main() SRT/Servers/SRTMinorServo/include/MSBossConfiguration.h +8 −0 Original line number Diff line number Diff line Loading @@ -129,6 +129,14 @@ public: bool wasElevationTrackingEn; }; /** * Return a doubleSeq of positions to set * @param comp_name string component name * @param elevation the elevation (radians) of the antenna at the beginning of the scan * @return doubleSeq of positions to set */ ACS::doubleSeq getPositionFromElevation(string comp_name, double elevation) throw (ManagementErrors::ConfigurationErrorExImpl); /** * Return a doubleSeq of positions to set Loading SRT/Servers/SRTMinorServo/include/MinorServoBossImpl.h +16 −11 Original line number Diff line number Diff line Loading @@ -217,18 +217,18 @@ public: * Check if the scan is achievable (IDL interface) * * @param starting_time the time the scan will start * @param scan structure containing the description of the scan to be executed * @param msScanInfo structure containing the description of the scan to be executed * @param antennaInfo auxiliary information from the antenna * @param runTime auxiliary information computed at run time by the subsystem * @param msParameters auxiliary information computed at run time by the subsystem * * @return true if the scan is achievable * @throw ManagementErrors::ConfigurationErrorEx, ManagementErrors::SubscanErrorEx */ virtual CORBA::Boolean checkScan( const ACS::Time starting_time, const MinorServo::MinorServoScan & scan, const MinorServo::MinorServoScan & msScanInfo, const Antenna::TRunTimeParameters & antennaInfo, MinorServo::TRunTimeParameters_out runTime MinorServo::TRunTimeParameters_out msParameters ) throw (MinorServoErrors::MinorServoErrorsEx, ComponentErrors::ComponentErrorsEx); Loading @@ -236,9 +236,9 @@ public: * Check if the scan is achievable (implementation) * * @param starting_time the time the scan will start * @param range the total axis movement in mm (centered in the actual position) * @param total_time the duration of axis movement * @param axis_code the identification code of the axis * @param msScanInfo structure containing the description of the scan to be executed * @param antennaInfo auxiliary information from the antenna * @param msParameters auxiliary information computed at run time by the subsystem * * @return true if the scan is achievable * @throw MinorServoErrors::MinorServoErrorsEx, Loading @@ -246,9 +246,9 @@ public: */ virtual CORBA::Boolean checkScanImpl( const ACS::Time starting_time, double range, const ACS::Time total_time, const string axis_code const MinorServo::MinorServoScan & msScanInfo, const Antenna::TRunTimeParameters & antennaInfo, MinorServo::TRunTimeParameters_out msParameters ) throw (MinorServoErrors::MinorServoErrorsEx, ComponentErrors::ComponentErrorsEx); Loading Loading @@ -548,7 +548,12 @@ private: throw (MinorServoErrors::OperationNotPermittedEx); /** Return the minumun starting time **/ ACS::Time getMinScanStartingTime(double range, const string axis_code, double & acceleration, double & max_speed) ACS::Time getMinScanStartingTime( double & range, const string axis_code, const double elevation, double & acceleration, double & max_speed) throw (ManagementErrors::ConfigurationErrorExImpl, ManagementErrors::SubscanErrorExImpl); void operator=(const MinorServoBossImpl &); Loading SRT/Servers/SRTMinorServo/src/MSBossConfiguration.cpp +34 −17 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ using namespace IRA; using namespace std; static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t position_mutex = PTHREAD_MUTEX_INITIALIZER; MSBossConfiguration::MSBossConfiguration(maci::ContainerServices *Services, MinorServoBossImpl * bossImpl_ptr) { Loading Loading @@ -296,18 +297,15 @@ void MSBossConfiguration::setScan( } ACS::doubleSeq MSBossConfiguration::getPosition(string comp_name, ACS::Time time) throw (ManagementErrors::ConfigurationErrorExImpl) { double elevation = 45.0; double elevation = 45.0 / DR2D; // From decimal to radians double azimuth; if(isElevationTrackingEn()) { try { if(!CORBA::is_nil(m_antennaBoss)) { m_antennaBoss->getRawCoordinates(time, azimuth, elevation); elevation = elevation * DR2D; // From radians to decimal (be sure we want to use decimal...) elevation = (elevation > MIN_ELEVATION) ? elevation : 45.0; m_isElevationTracking = true; m_antennaBossError = false; } Loading Loading @@ -375,9 +373,20 @@ ACS::doubleSeq MSBossConfiguration::getPosition(string comp_name, ACS::Time time m_isElevationTracking = false; } } return getPositionFromElevation(comp_name, elevation); } ACS::doubleSeq MSBossConfiguration::getPositionFromElevation(string comp_name, double elevation) throw (ManagementErrors::ConfigurationErrorExImpl) { ACS::doubleSeq positions; if(m_servosCoefficients.count(comp_name)) { try { pthread_mutex_lock(&position_mutex); elevation = elevation * DR2D; // From radians to decimal (be sure we want to use decimal...) vector<vector<double> > vcoeff = m_servosCoefficients[comp_name]; positions.length(vcoeff.size()); size_t position_idx(0); Loading @@ -391,11 +400,19 @@ ACS::doubleSeq MSBossConfiguration::getPosition(string comp_name, ACS::Time time positions[position_idx] = axis_value; position_idx++; } pthread_mutex_unlock(&position_mutex); } catch (...) { pthread_mutex_unlock(&position_mutex); THROW_EX(ManagementErrors, ConfigurationErrorEx, "cannot compute the" + comp_name + "position", false); } } else { THROW_EX(ManagementErrors, ConfigurationErrorEx, comp_name + " has no coefficients", false); } return positions; } Loading
Common/Interfaces/MinorServoInterface/test/functional/_test_checkScan.py +21 −11 Original line number Diff line number Diff line Loading @@ -23,9 +23,10 @@ class CheckScanTest(unittest2.TestCase): else: client = PySimpleClient() self.boss = client.getComponent('MINORSERVO/Boss') if self.boss.getActualSetup() != code: self.boss.setup(code) t0 = datetime.now() while not self.boss.isReady() and (datetime.now() - t0).seconds < 60*5): while not self.boss.isReady() and (datetime.now() - t0).seconds < 60*5: time.sleep(2) self.assertTrue(self.boss.isReady()) # Timeout expired? Loading @@ -48,15 +49,24 @@ class CheckScanTest(unittest2.TestCase): axis=Management.MNG_TRACK, timeToStop=0) self.msInfo = MinorServo.TRunTimeParameters( # TODO: remove me startEpoch=0, onTheFly=False, centerScan=0, scanAxis='SRP_TZ', timeToStop=0 ) self.boss.setElevationTracking('OFF') self.boss.setASConfiguration('OFF') axes, units = self.boss.getAxesInfo() self.idx = axes.index('SRP_TZ') def testStartASAP(self): """Check the behavior when the scan must start as soon as possible""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] center = self.boss.getAxesPosition(getTimeStamp().value)[self.idx] startTime = 0 msInfo = MinorServo.TRunTimeParameters res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo) self.assertTrue(res) # Cannot know the startEpoch time... Loading @@ -67,7 +77,7 @@ class CheckScanTest(unittest2.TestCase): # Cannot know the timeToStop time... self.assertGreater(msInfo.timeToStop, getTimeStamp.now().value+50000000) def testStartAtGivenTime(self): def _testStartAtGivenTime(self): """Check the behavior when the scan must start at a given time""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now Loading @@ -82,7 +92,7 @@ class CheckScanTest(unittest2.TestCase): # Cannot know the timeToStop time... self.assertGreater(msInfo.timeToStop, getTimeStamp.now().value+50000000) def testTooFast(self): def _testTooFast(self): """The result must be False when the total time is not enough""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now Loading @@ -91,7 +101,7 @@ class CheckScanTest(unittest2.TestCase): res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) def testOutOfRange(self): def _testOutOfRange(self): """The result must be False in case of scan out of range""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now Loading @@ -100,7 +110,7 @@ class CheckScanTest(unittest2.TestCase): res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) # does it currently throw an exception? def testTooCloseToNow(self): def _testTooCloseToNow(self): """The result must be False when the starting time is too close to now""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 2 # Start in 2 seconds from now Loading @@ -108,7 +118,7 @@ class CheckScanTest(unittest2.TestCase): res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) # does it currently throw an exception? def testStartASAPEmptyScan(self): def _testStartASAPEmptyScan(self): """An empty scan when that must start as soon as possible""" self.scan = MinorServo.MinorServoScan( range=0, Loading @@ -128,7 +138,7 @@ class CheckScanTest(unittest2.TestCase): self.assertEqual(msInfo.scanAxis, '') self.assertEqual(msInfo.timeToStop, 0) def testStartAtGivenTimeEmptyScan(self): def _testStartAtGivenTimeEmptyScan(self): """An empty scan that must start at a given time""" self.scan = MinorServo.MinorServoScan( range=0, Loading
Common/Interfaces/MinorServoInterface/test/functional/test_checkEmptyScan.py 0 → 100644 +208 −0 Original line number Diff line number Diff line import math import time import os from datetime import datetime import unittest2 # https://pypi.python.org/pypi/unittest2 import Management import MinorServo import Antenna from Acspy.Common.TimeHelper import getTimeStamp from Acspy.Clients.SimpleClient import PySimpleClient from Acspy.Util import ACSCorba __author__ = "Marco Buttu <mbuttu@oa-cagliari.inaf.it>" class CheckEmptyScanTest(unittest2.TestCase): """Test the MinorServoBoss.checkScan() when is_empty_scan is True""" def setUp(self): code = 'KKG' if os.getenv('TARGETSYS') == 'SRT' else 'KKC' if hasattr(self, 'boss') and self.boss.isReady(): pass else: client = PySimpleClient() self.boss = client.getComponent('MINORSERVO/Boss') if self.boss.getActualSetup() != code: self.boss.setup(code) t0 = datetime.now() while not self.boss.isReady() and (datetime.now() - t0).seconds < 60*5: time.sleep(2) self.assertTrue(self.boss.isReady()) # Timeout expired? self.assertEqual(self.boss.getActualSetup(), code) self.scan = MinorServo.MinorServoScan( range=50, total_time=500000000, # 50 seconds axis_code='SRP_TZ', is_empty_scan=True) self.antennaInfo = Antenna.TRunTimeParameters( targetName='dummy', azimuth=math.pi, elevation=math.pi/8, startEpoch=getTimeStamp().value + 100000000, onTheFly=False, slewingTime=100000000, section=Antenna.ANT_SOUTH, axis=Management.MNG_TRACK, timeToStop=0) self.msInfo = MinorServo.TRunTimeParameters( # TODO: remove me startEpoch=0, onTheFly=False, centerScan=0, scanAxis='SRP_TZ', timeToStop=0 ) self.boss.setElevationTracking('OFF') self.boss.setASConfiguration('OFF') axes, units = self.boss.getAxesInfo() self.idx = axes.index('SRP_TZ') def test_start_ASAP_without_startTime(self): """The starting time is unknown and the scan must start ASAP""" startTime = 0 getPosition = getattr(self, 'get%sPosition' %os.getenv('TARGETSYS')) centerScanPosition = getPosition( self.boss.getActualSetup(), 'SRP', math.degrees(self.antennaInfo.elevation)) centerScan = centerScanPosition[self.idx] res, msInfo = self.boss.checkScan(startTime, self.scan, self.antennaInfo) self.assertTrue(res) self.assertAlmostEqual(msInfo.centerScan, centerScan, delta=0.01) self.assertFalse(msInfo.onTheFly) self.assertGreater(msInfo.startEpoch, getTimeStamp().value) self.assertEqual(msInfo.scanAxis, 'SRP_TZ') self.assertEqual( msInfo.timeToStop, msInfo.startEpoch + self.scan.total_time) def _testStartAtGivenTime(self): """Check the behavior when the scan must start at a given time""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now msInfo = MinorServo.TRunTimeParameters res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertTrue(res) self.assertEqual(msInfo.startEpoch, startTime) self.assertTrue(msInfo.onTheFly) self.assertEqual(msInfo.centerScan, center) self.assertEqual(msInfo.scanAxis, 'SRP_TZ') # Cannot know the timeToStop time... self.assertGreater(msInfo.timeToStop, getTimeStamp.now().value+50000000) def _testTooFast(self): """The result must be False when the total time is not enough""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now msInfo = MinorServo.TRunTimeParameters self.scan.total_time = 500000 # 50 ms res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) def _testOutOfRange(self): """The result must be False in case of scan out of range""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now msInfo = MinorServo.TRunTimeParameters self.scan.range = 5000 # 5 meters... res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) # does it currently throw an exception? def _testTooCloseToNow(self): """The result must be False when the starting time is too close to now""" center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 2 # Start in 2 seconds from now msInfo = MinorServo.TRunTimeParameters res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertFalse(res) # does it currently throw an exception? def _testStartASAPEmptyScan(self): """An empty scan when that must start as soon as possible""" self.scan = MinorServo.MinorServoScan( range=0, total_time=0, axis_code='', is_empty_scan=True) center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = 0 msInfo = MinorServo.TRunTimeParameters res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertTrue(res) # Cannot know the startEpoch time... self.assertGreater(msInfo.startEpoch, getTimeStamp.now().value) self.assertFalse(msInfo.onTheFly) self.assertEqual(msInfo.centerScan, center) self.assertEqual(msInfo.scanAxis, '') self.assertEqual(msInfo.timeToStop, 0) def _testStartAtGivenTimeEmptyScan(self): """An empty scan that must start at a given time""" self.scan = MinorServo.MinorServoScan( range=0, total_time=0, axis_code='', is_empty_scan=True) center = self.boss.getAxesPosition(getTimeStamp().value)['SRP_TZ'] startTime = getTimeStamp().value + 60*10**7 # Start in a minute from now msInfo = MinorServo.TRunTimeParameters res = self.boss.checkScan(startTime, self.scan, self.antennaInfo, msInfo) self.assertTrue(res) self.assertEqual(msInfo.startEpoch, startTime) self.assertFalse(msInfo.onTheFly) self.assertEqual(msInfo.centerScan, center) self.assertEqual(msInfo.scanAxis, '') self.assertGreater(msInfo.timeToStop, 0) def getSRTPosition(self, conf_code, servo_name, elevation): """Return the servo position related to the elevation. Parameters: - conf_code: value returned by getActualSetup() (CCB, CCB_ASACTIVE,...) - servo_name: SRP, GFR, M3R, PFP - elevation: the antenna elevation, in degrees """ dal = ACSCorba.cdb() dao = dal.get_DAO_Servant('alma/MINORSERVO/Boss') body = dao.get_field_data(conf_code) configurations = body.strip().split('@') servos_conf = {} for conf in configurations: if conf: name, value = conf.split(':') servos_conf[name.strip()] = value.strip() srp_conf = servos_conf[servo_name] srp_items = [item.strip() for item in srp_conf.split(';')] srp_axes = [] for item in srp_items: if '=' in item: name, value = item.split('=') srp_axes.append(value.strip()) position = [] for axis in srp_axes: axis = axis.lstrip('(') axis = axis.rstrip(')') # At this point, axis is something like '-0.23, 0.01, 3.2' coeffs = [float(item) for item in axis.split(',')] value = 0 for idx, coeff in enumerate(coeffs): value += coeff * elevation**idx position.append(value) return position if __name__ == '__main__': unittest2.main()
SRT/Servers/SRTMinorServo/include/MSBossConfiguration.h +8 −0 Original line number Diff line number Diff line Loading @@ -129,6 +129,14 @@ public: bool wasElevationTrackingEn; }; /** * Return a doubleSeq of positions to set * @param comp_name string component name * @param elevation the elevation (radians) of the antenna at the beginning of the scan * @return doubleSeq of positions to set */ ACS::doubleSeq getPositionFromElevation(string comp_name, double elevation) throw (ManagementErrors::ConfigurationErrorExImpl); /** * Return a doubleSeq of positions to set Loading
SRT/Servers/SRTMinorServo/include/MinorServoBossImpl.h +16 −11 Original line number Diff line number Diff line Loading @@ -217,18 +217,18 @@ public: * Check if the scan is achievable (IDL interface) * * @param starting_time the time the scan will start * @param scan structure containing the description of the scan to be executed * @param msScanInfo structure containing the description of the scan to be executed * @param antennaInfo auxiliary information from the antenna * @param runTime auxiliary information computed at run time by the subsystem * @param msParameters auxiliary information computed at run time by the subsystem * * @return true if the scan is achievable * @throw ManagementErrors::ConfigurationErrorEx, ManagementErrors::SubscanErrorEx */ virtual CORBA::Boolean checkScan( const ACS::Time starting_time, const MinorServo::MinorServoScan & scan, const MinorServo::MinorServoScan & msScanInfo, const Antenna::TRunTimeParameters & antennaInfo, MinorServo::TRunTimeParameters_out runTime MinorServo::TRunTimeParameters_out msParameters ) throw (MinorServoErrors::MinorServoErrorsEx, ComponentErrors::ComponentErrorsEx); Loading @@ -236,9 +236,9 @@ public: * Check if the scan is achievable (implementation) * * @param starting_time the time the scan will start * @param range the total axis movement in mm (centered in the actual position) * @param total_time the duration of axis movement * @param axis_code the identification code of the axis * @param msScanInfo structure containing the description of the scan to be executed * @param antennaInfo auxiliary information from the antenna * @param msParameters auxiliary information computed at run time by the subsystem * * @return true if the scan is achievable * @throw MinorServoErrors::MinorServoErrorsEx, Loading @@ -246,9 +246,9 @@ public: */ virtual CORBA::Boolean checkScanImpl( const ACS::Time starting_time, double range, const ACS::Time total_time, const string axis_code const MinorServo::MinorServoScan & msScanInfo, const Antenna::TRunTimeParameters & antennaInfo, MinorServo::TRunTimeParameters_out msParameters ) throw (MinorServoErrors::MinorServoErrorsEx, ComponentErrors::ComponentErrorsEx); Loading Loading @@ -548,7 +548,12 @@ private: throw (MinorServoErrors::OperationNotPermittedEx); /** Return the minumun starting time **/ ACS::Time getMinScanStartingTime(double range, const string axis_code, double & acceleration, double & max_speed) ACS::Time getMinScanStartingTime( double & range, const string axis_code, const double elevation, double & acceleration, double & max_speed) throw (ManagementErrors::ConfigurationErrorExImpl, ManagementErrors::SubscanErrorExImpl); void operator=(const MinorServoBossImpl &); Loading
SRT/Servers/SRTMinorServo/src/MSBossConfiguration.cpp +34 −17 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ using namespace IRA; using namespace std; static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t position_mutex = PTHREAD_MUTEX_INITIALIZER; MSBossConfiguration::MSBossConfiguration(maci::ContainerServices *Services, MinorServoBossImpl * bossImpl_ptr) { Loading Loading @@ -296,18 +297,15 @@ void MSBossConfiguration::setScan( } ACS::doubleSeq MSBossConfiguration::getPosition(string comp_name, ACS::Time time) throw (ManagementErrors::ConfigurationErrorExImpl) { double elevation = 45.0; double elevation = 45.0 / DR2D; // From decimal to radians double azimuth; if(isElevationTrackingEn()) { try { if(!CORBA::is_nil(m_antennaBoss)) { m_antennaBoss->getRawCoordinates(time, azimuth, elevation); elevation = elevation * DR2D; // From radians to decimal (be sure we want to use decimal...) elevation = (elevation > MIN_ELEVATION) ? elevation : 45.0; m_isElevationTracking = true; m_antennaBossError = false; } Loading Loading @@ -375,9 +373,20 @@ ACS::doubleSeq MSBossConfiguration::getPosition(string comp_name, ACS::Time time m_isElevationTracking = false; } } return getPositionFromElevation(comp_name, elevation); } ACS::doubleSeq MSBossConfiguration::getPositionFromElevation(string comp_name, double elevation) throw (ManagementErrors::ConfigurationErrorExImpl) { ACS::doubleSeq positions; if(m_servosCoefficients.count(comp_name)) { try { pthread_mutex_lock(&position_mutex); elevation = elevation * DR2D; // From radians to decimal (be sure we want to use decimal...) vector<vector<double> > vcoeff = m_servosCoefficients[comp_name]; positions.length(vcoeff.size()); size_t position_idx(0); Loading @@ -391,11 +400,19 @@ ACS::doubleSeq MSBossConfiguration::getPosition(string comp_name, ACS::Time time positions[position_idx] = axis_value; position_idx++; } pthread_mutex_unlock(&position_mutex); } catch (...) { pthread_mutex_unlock(&position_mutex); THROW_EX(ManagementErrors, ConfigurationErrorEx, "cannot compute the" + comp_name + "position", false); } } else { THROW_EX(ManagementErrors, ConfigurationErrorEx, comp_name + " has no coefficients", false); } return positions; }