Loading Common/Servers/PyDewarPositioner/src/DewarPositioner/DewarPositionerImpl.py +0 −1 Original line number Diff line number Diff line Loading @@ -359,7 +359,6 @@ class DewarPositionerImpl(POA, cc, services, lcycle): def checkUpdating(self, stime, axis, sector, az, el, ra, dec): print('In checkUpdating()') try: return self.positioner.checkUpdating( stime, axis, sector, az, el, ra, dec Loading Common/Servers/PyDewarPositioner/src/DewarPositioner/positioner.py +9 −9 Original line number Diff line number Diff line Loading @@ -16,6 +16,10 @@ from IRAPy import logger __docformat__ = 'restructuredtext' TOLERANCE = 2*10**7 # 2 seconds SAFETY_FACTOR = 1.2 class Positioner(object): generalLock = threading.Lock() rewindingLock = threading.Lock() Loading Loading @@ -189,27 +193,23 @@ class Positioner(object): if self.isRewinding(): # rdistance -> rewinding distance, rtime -> rewinding time rdistance = abs(self.getPosition() - self.getCmdPosition()) # Safety factor of 1.2 rtime_distance = round((1.2 * rdistance)/speed, 4) rtime_distance = round((SAFETY_FACTOR * rdistance)/speed, 4) rtime = getTimeStamp().value + int(rtime_distance * 10**7) # tdistance -> target distance, ttime -> target time # After rewinding, the derotator has to move to the target tdistance = abs(self.getCmdPosition() - target) # Safety factor of 1.2 ttime_distance = round((1.2 * tdistance)/speed, 4) ttime_distance = round((SAFETY_FACTOR * tdistance)/speed, 4) ttime = getTimeStamp().value + int(ttime_distance) future_time = rtime + ttime else: distance = abs(self.getPosition() - target) # Safety factor of 1.2 time_distance = round((1.2 * distance)/speed, 4) time_distance = round((SAFETY_FACTOR * distance)/speed, 4) future_time = getTimeStamp().value + int(time_distance * 10**7) tolerance = 2*10**7 # 2 seconds now = getTimeStamp().value if stime == 0 and (future_time - now) < tolerance: if stime == 0 and (future_time - now) < TOLERANCE: ready = True elif stime == 0 and (future_time - now) >= tolerance: elif stime == 0 and (future_time - now) >= TOLERANCE: ready = False elif stime >= future_time: ready = True Loading Common/Servers/PyDewarPositioner/test/functional/test_checkUpdating.py +20 −24 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import time from Antenna import ANT_HORIZONTAL, ANT_NORTH, ANT_SOUTH from Management import MNG_TRACK, MNG_BEAMPARK from DewarPositioner.posgenerator import PosGenerator from DewarPositioner.positioner import TOLERANCE, SAFETY_FACTOR from Acspy.Clients.SimpleClient import PySimpleClient from Acspy.Common.TimeHelper import getTimeStamp Loading @@ -11,7 +12,7 @@ from ComponentErrors import ComponentErrorsEx class TestCheckUpdating(unittest.TestCase): """Test the DewarPositioner.startUpdating() end-to-end method""" """Test the DewarPositioner.checkUpdating() end-to-end method""" def setUp(self): self.client = PySimpleClient() self.device = self.client.getComponent('RECEIVERS/SRTKBandDerotator') Loading @@ -30,24 +31,17 @@ class TestCheckUpdating(unittest.TestCase): self.client.releaseComponent('RECEIVERS/DewarPositioner') def _test_system_not_ready(self): def test_system_not_ready(self): """Raise ComponentErrorsEx when the system is not ready""" with self.assertRaisesRegexp(ComponentErrorsEx, 'positioner not configured'): self.positioner.park() time.sleep(0.5) starting_time = getTimeStamp().value + 100000000 az, el, ra, dec = 0, 0, 0, 0 self.positioner.checkUpdating( starting_time, MNG_TRACK, ANT_SOUTH, az, el, ra, dec ) self.positioner.checkUpdating(0, MNG_TRACK, ANT_SOUTH, 0, 0, 0, 0) def _test_mng_beampark(self): def test_mng_beampark(self): """Return (True, starting_time) when axis is MNG_BEAMPARK""" starting_time = getTimeStamp().value + 100000000 starting_time = getTimeStamp().value + 10**7 # 1 sec in the future az, el, ra, dec = 0, 0, 0, 0 value, feasible_time = self.positioner.checkUpdating( starting_time, Loading @@ -61,9 +55,9 @@ class TestCheckUpdating(unittest.TestCase): def test_starting_time_0_feasible_time_less_than_tolerance(self): """We have starting_time == 0 and the system is ready (because it requires less than 2 seconds to reach the position). In this case it has to return True and a feasible_time less than current time plus the tolerance.""" it requires less than a 2 seconds TOLERANCE to reach the position). In this case it has to return True and a feasible_time less than current time plus the TOLERANCE.""" starting_time = 0 az, el, ra, dec = 0.6109, 0.6109, 0, 0 # 0.6109 -> 35 degrees self.antenna.setOffsets(az, el, ANT_HORIZONTAL) Loading @@ -80,14 +74,15 @@ class TestCheckUpdating(unittest.TestCase): ) self.assertEqual(value, True) self.assertGreater(feasible_time, timestamp) self.assertLess(feasible_time, timestamp + 10**7) # 1 sec self.assertLess(feasible_time, timestamp + TOLERANCE) def test_starting_time_0_feasible_time_greater_than_tolerance(self): """We have starting_time == 0 but the system is not ready (because it requires more than 2 seconds to reach the position). In this case it has to return False and a feasible_time greater or equal to the time required to reach the posizion.""" it requires more than 2 seconds TOLERANCE to reach the position). In this case it has to return False and a feasible_time greater than the time required to reach the posizion, and less than the required time moltiplied for a safety factor.""" starting_time = 0 az, el = 1.2217, 0.6109 # 70 degrees, 35 degrees p0 = 0 Loading @@ -105,7 +100,7 @@ class TestCheckUpdating(unittest.TestCase): self.assertEqual(value, False) minimum_time = timestamp + int(required_time) self.assertGreater(feasible_time, minimum_time) safety_time = int(1.3*minimum_time) safety_time = int(SAFETY_FACTOR * minimum_time) self.assertLess(feasible_time, safety_time) Loading @@ -130,7 +125,7 @@ class TestCheckUpdating(unittest.TestCase): ) self.assertEqual(value, True) self.assertLess(feasible_time, starting_time) self.assertGreater(feasible_time, required_time) self.assertGreater(feasible_time, timestamp) def test_starting_time_not_0_feasible_time_greater_than_starting_time(self): Loading @@ -144,7 +139,8 @@ class TestCheckUpdating(unittest.TestCase): speed = self.device.getSpeed() required_time = abs(p1 - p0)/speed * 10**7 timestamp = getTimeStamp().value # Starting time is 2 seconds more than the required time # The required_time does not add the SAFETY_FACTOR, so the # feasbile_time is greater than starting_time starting_time = timestamp + int(required_time) value, feasible_time = self.positioner.checkUpdating( starting_time, Loading @@ -154,7 +150,7 @@ class TestCheckUpdating(unittest.TestCase): ) self.assertEqual(value, False) self.assertGreater(feasible_time, starting_time) safety_time = timestamp + int(required_time*1.2) + 2*10**7 safety_time = timestamp + int(required_time * SAFETY_FACTOR) self.assertLess(feasible_time, safety_time) Loading Common/Servers/PyDewarPositioner/test/pyunit/positioner/test_startUpdating.py +12 −0 Original line number Diff line number Diff line Loading @@ -22,9 +22,11 @@ class PositionerStartUpdatingTest(unittest.TestCase): self.cdbconf.attributes['UpdatingTime'] = 0.1 self.p = Positioner(self.cdbconf) def tearDown(self): self.p.park() def test_alreadyUpdating(self): """The call to startUpdating() have to stop and start again a new updating""" self.cdbconf.setup('KKG') Loading @@ -43,6 +45,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_cannotSetPosition(self): """Cannot set position during an updating""" self.cdbconf.setup('KKG') Loading @@ -58,6 +61,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_do_nothing_with_axis_MNG_BEAMPARK(self): """Do nothing in case the axis is MNG_BEAMPARK""" self.cdbconf.setup('KKG') Loading @@ -73,6 +77,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_notYetConfigured(self): """Verify startUpdating()""" # startUpdating() raises NotAllowedError when the system is not configured Loading Loading @@ -110,6 +115,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): self.cdbconf.UpdatingPosition['ANT_NORTH'] = [10, 'fooName'] # [position, functionName] self.assertRaises(PositionerError, self.p.startUpdating, axis, sector, 1, 1, None, None) def test_custom(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') Loading @@ -135,6 +141,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_change_of_sign_negative_to_positive(self): "The parallactic angle do not have to change from -180 to 180 degrees" site_info = {'latitude': radians(39.49)} Loading @@ -153,6 +160,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): self.assertLess(delta, 180) p0 = angle def test_change_of_sign_positive_to_negative(self): "The parallactic angle do not have to change from 180 to -180 degrees" site_info = {'latitude': radians(39.49)} Loading Loading @@ -296,6 +304,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_bsc(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC') Loading @@ -322,6 +331,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_bsc_opt(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC_OPT') Loading Loading @@ -349,6 +359,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_BSC_staticX(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC') Loading @@ -375,6 +386,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_CUSTOM_staticX(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') Loading Loading
Common/Servers/PyDewarPositioner/src/DewarPositioner/DewarPositionerImpl.py +0 −1 Original line number Diff line number Diff line Loading @@ -359,7 +359,6 @@ class DewarPositionerImpl(POA, cc, services, lcycle): def checkUpdating(self, stime, axis, sector, az, el, ra, dec): print('In checkUpdating()') try: return self.positioner.checkUpdating( stime, axis, sector, az, el, ra, dec Loading
Common/Servers/PyDewarPositioner/src/DewarPositioner/positioner.py +9 −9 Original line number Diff line number Diff line Loading @@ -16,6 +16,10 @@ from IRAPy import logger __docformat__ = 'restructuredtext' TOLERANCE = 2*10**7 # 2 seconds SAFETY_FACTOR = 1.2 class Positioner(object): generalLock = threading.Lock() rewindingLock = threading.Lock() Loading Loading @@ -189,27 +193,23 @@ class Positioner(object): if self.isRewinding(): # rdistance -> rewinding distance, rtime -> rewinding time rdistance = abs(self.getPosition() - self.getCmdPosition()) # Safety factor of 1.2 rtime_distance = round((1.2 * rdistance)/speed, 4) rtime_distance = round((SAFETY_FACTOR * rdistance)/speed, 4) rtime = getTimeStamp().value + int(rtime_distance * 10**7) # tdistance -> target distance, ttime -> target time # After rewinding, the derotator has to move to the target tdistance = abs(self.getCmdPosition() - target) # Safety factor of 1.2 ttime_distance = round((1.2 * tdistance)/speed, 4) ttime_distance = round((SAFETY_FACTOR * tdistance)/speed, 4) ttime = getTimeStamp().value + int(ttime_distance) future_time = rtime + ttime else: distance = abs(self.getPosition() - target) # Safety factor of 1.2 time_distance = round((1.2 * distance)/speed, 4) time_distance = round((SAFETY_FACTOR * distance)/speed, 4) future_time = getTimeStamp().value + int(time_distance * 10**7) tolerance = 2*10**7 # 2 seconds now = getTimeStamp().value if stime == 0 and (future_time - now) < tolerance: if stime == 0 and (future_time - now) < TOLERANCE: ready = True elif stime == 0 and (future_time - now) >= tolerance: elif stime == 0 and (future_time - now) >= TOLERANCE: ready = False elif stime >= future_time: ready = True Loading
Common/Servers/PyDewarPositioner/test/functional/test_checkUpdating.py +20 −24 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import time from Antenna import ANT_HORIZONTAL, ANT_NORTH, ANT_SOUTH from Management import MNG_TRACK, MNG_BEAMPARK from DewarPositioner.posgenerator import PosGenerator from DewarPositioner.positioner import TOLERANCE, SAFETY_FACTOR from Acspy.Clients.SimpleClient import PySimpleClient from Acspy.Common.TimeHelper import getTimeStamp Loading @@ -11,7 +12,7 @@ from ComponentErrors import ComponentErrorsEx class TestCheckUpdating(unittest.TestCase): """Test the DewarPositioner.startUpdating() end-to-end method""" """Test the DewarPositioner.checkUpdating() end-to-end method""" def setUp(self): self.client = PySimpleClient() self.device = self.client.getComponent('RECEIVERS/SRTKBandDerotator') Loading @@ -30,24 +31,17 @@ class TestCheckUpdating(unittest.TestCase): self.client.releaseComponent('RECEIVERS/DewarPositioner') def _test_system_not_ready(self): def test_system_not_ready(self): """Raise ComponentErrorsEx when the system is not ready""" with self.assertRaisesRegexp(ComponentErrorsEx, 'positioner not configured'): self.positioner.park() time.sleep(0.5) starting_time = getTimeStamp().value + 100000000 az, el, ra, dec = 0, 0, 0, 0 self.positioner.checkUpdating( starting_time, MNG_TRACK, ANT_SOUTH, az, el, ra, dec ) self.positioner.checkUpdating(0, MNG_TRACK, ANT_SOUTH, 0, 0, 0, 0) def _test_mng_beampark(self): def test_mng_beampark(self): """Return (True, starting_time) when axis is MNG_BEAMPARK""" starting_time = getTimeStamp().value + 100000000 starting_time = getTimeStamp().value + 10**7 # 1 sec in the future az, el, ra, dec = 0, 0, 0, 0 value, feasible_time = self.positioner.checkUpdating( starting_time, Loading @@ -61,9 +55,9 @@ class TestCheckUpdating(unittest.TestCase): def test_starting_time_0_feasible_time_less_than_tolerance(self): """We have starting_time == 0 and the system is ready (because it requires less than 2 seconds to reach the position). In this case it has to return True and a feasible_time less than current time plus the tolerance.""" it requires less than a 2 seconds TOLERANCE to reach the position). In this case it has to return True and a feasible_time less than current time plus the TOLERANCE.""" starting_time = 0 az, el, ra, dec = 0.6109, 0.6109, 0, 0 # 0.6109 -> 35 degrees self.antenna.setOffsets(az, el, ANT_HORIZONTAL) Loading @@ -80,14 +74,15 @@ class TestCheckUpdating(unittest.TestCase): ) self.assertEqual(value, True) self.assertGreater(feasible_time, timestamp) self.assertLess(feasible_time, timestamp + 10**7) # 1 sec self.assertLess(feasible_time, timestamp + TOLERANCE) def test_starting_time_0_feasible_time_greater_than_tolerance(self): """We have starting_time == 0 but the system is not ready (because it requires more than 2 seconds to reach the position). In this case it has to return False and a feasible_time greater or equal to the time required to reach the posizion.""" it requires more than 2 seconds TOLERANCE to reach the position). In this case it has to return False and a feasible_time greater than the time required to reach the posizion, and less than the required time moltiplied for a safety factor.""" starting_time = 0 az, el = 1.2217, 0.6109 # 70 degrees, 35 degrees p0 = 0 Loading @@ -105,7 +100,7 @@ class TestCheckUpdating(unittest.TestCase): self.assertEqual(value, False) minimum_time = timestamp + int(required_time) self.assertGreater(feasible_time, minimum_time) safety_time = int(1.3*minimum_time) safety_time = int(SAFETY_FACTOR * minimum_time) self.assertLess(feasible_time, safety_time) Loading @@ -130,7 +125,7 @@ class TestCheckUpdating(unittest.TestCase): ) self.assertEqual(value, True) self.assertLess(feasible_time, starting_time) self.assertGreater(feasible_time, required_time) self.assertGreater(feasible_time, timestamp) def test_starting_time_not_0_feasible_time_greater_than_starting_time(self): Loading @@ -144,7 +139,8 @@ class TestCheckUpdating(unittest.TestCase): speed = self.device.getSpeed() required_time = abs(p1 - p0)/speed * 10**7 timestamp = getTimeStamp().value # Starting time is 2 seconds more than the required time # The required_time does not add the SAFETY_FACTOR, so the # feasbile_time is greater than starting_time starting_time = timestamp + int(required_time) value, feasible_time = self.positioner.checkUpdating( starting_time, Loading @@ -154,7 +150,7 @@ class TestCheckUpdating(unittest.TestCase): ) self.assertEqual(value, False) self.assertGreater(feasible_time, starting_time) safety_time = timestamp + int(required_time*1.2) + 2*10**7 safety_time = timestamp + int(required_time * SAFETY_FACTOR) self.assertLess(feasible_time, safety_time) Loading
Common/Servers/PyDewarPositioner/test/pyunit/positioner/test_startUpdating.py +12 −0 Original line number Diff line number Diff line Loading @@ -22,9 +22,11 @@ class PositionerStartUpdatingTest(unittest.TestCase): self.cdbconf.attributes['UpdatingTime'] = 0.1 self.p = Positioner(self.cdbconf) def tearDown(self): self.p.park() def test_alreadyUpdating(self): """The call to startUpdating() have to stop and start again a new updating""" self.cdbconf.setup('KKG') Loading @@ -43,6 +45,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_cannotSetPosition(self): """Cannot set position during an updating""" self.cdbconf.setup('KKG') Loading @@ -58,6 +61,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_do_nothing_with_axis_MNG_BEAMPARK(self): """Do nothing in case the axis is MNG_BEAMPARK""" self.cdbconf.setup('KKG') Loading @@ -73,6 +77,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_notYetConfigured(self): """Verify startUpdating()""" # startUpdating() raises NotAllowedError when the system is not configured Loading Loading @@ -110,6 +115,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): self.cdbconf.UpdatingPosition['ANT_NORTH'] = [10, 'fooName'] # [position, functionName] self.assertRaises(PositionerError, self.p.startUpdating, axis, sector, 1, 1, None, None) def test_custom(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') Loading @@ -135,6 +141,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_change_of_sign_negative_to_positive(self): "The parallactic angle do not have to change from -180 to 180 degrees" site_info = {'latitude': radians(39.49)} Loading @@ -153,6 +160,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): self.assertLess(delta, 180) p0 = angle def test_change_of_sign_positive_to_negative(self): "The parallactic angle do not have to change from 180 to -180 degrees" site_info = {'latitude': radians(39.49)} Loading Loading @@ -296,6 +304,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_bsc(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC') Loading @@ -322,6 +331,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_bsc_opt(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC_OPT') Loading Loading @@ -349,6 +359,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_BSC_staticX(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC') Loading @@ -375,6 +386,7 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_CUSTOM_staticX(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') Loading