Loading Common/Servers/PyDewarPositioner/src/DewarPositioner/DewarPositionerImpl.py +5 −3 Original line number Diff line number Diff line Loading @@ -359,6 +359,7 @@ class DewarPositionerImpl(POA, cc, services, lcycle): def checkUpdating(self, stime, axis, sector, az, el, ra, dec): print('In checkUpdating()') try: return self.positioner.checkUpdating( stime, axis, sector, az, el, ra, dec Loading @@ -374,9 +375,10 @@ class DewarPositionerImpl(POA, cc, services, lcycle): exc.setReason(ex.message) raise exc.getComponentErrorsEx() except Exception, ex: logger.logError(ex.message) exc = ComponentErrorsImpl.UnexpectedExImpl(ex.message) exc.setReason(ex.message) reason = ex.getReason() if hasattr(ex, 'getReason') else ex.message exc = ComponentErrorsImpl.OperationErrorExImpl() logger.logError(reason) exc.setReason(reason) raise exc.getComponentErrorsEx() Loading Common/Servers/PyDewarPositioner/src/DewarPositioner/positioner.py +7 −3 Original line number Diff line number Diff line Loading @@ -185,10 +185,10 @@ class Positioner(object): future_time = 0 ready = True speed = self.device.getSpeed() if self.isRewinding(): # rdistance -> rewinding distance, rtime -> rewinding time rdistance = abs(self.getPosition() - self.getCmdPosition()) speed = self.device.getSpeed() # Safety factor of 1.2 rtime_distance = round((1.2 * rdistance)/speed, 4) rtime = getTimeStamp().value + int(rtime_distance * 10**7) Loading @@ -203,10 +203,14 @@ class Positioner(object): distance = abs(self.getPosition() - target) # Safety factor of 1.2 time_distance = round((1.2 * distance)/speed, 4) future_time = getTimeStamp().value + int(time_distance) future_time = getTimeStamp().value + int(time_distance * 10**7) if stime == 0 and (future_time - getTimeStamp().value) < 3*10**7: # 3 seconds tolerance = 2*10**7 # 2 seconds now = getTimeStamp().value if stime == 0 and (future_time - now) < tolerance: ready = True elif stime == 0 and (future_time - now) >= tolerance: ready = False elif stime >= future_time: ready = True else: Loading Common/Servers/PyDewarPositioner/test/functional/test_checkUpdating.py +105 −3 Original line number Diff line number Diff line from __future__ import with_statement from __future__ import with_statement, division import unittest import time from Antenna import ANT_HORIZONTAL, ANT_NORTH, ANT_SOUTH from Management import MNG_TRACK, MNG_BEAMPARK from DewarPositioner.posgenerator import PosGenerator from Acspy.Clients.SimpleClient import PySimpleClient from Acspy.Common.TimeHelper import getTimeStamp Loading @@ -13,6 +14,8 @@ class TestCheckUpdating(unittest.TestCase): """Test the DewarPositioner.startUpdating() end-to-end method""" def setUp(self): self.client = PySimpleClient() self.device = self.client.getComponent('RECEIVERS/SRTKBandDerotator') self.antenna = self.client.getComponent('ANTENNA/Boss') self.positioner = self.client.getComponent('RECEIVERS/DewarPositioner') self.positioner.setup('KKG') self.positioner.setConfiguration('custom') Loading @@ -27,7 +30,7 @@ class TestCheckUpdating(unittest.TestCase): self.client.releaseComponent('RECEIVERS/DewarPositioner') def test_system_not_ready(self): def _test_system_not_ready(self): """Raise ComponentErrorsEx when the system is not ready""" with self.assertRaisesRegexp(ComponentErrorsEx, 'positioner not configured'): self.positioner.park() Loading @@ -42,7 +45,7 @@ class TestCheckUpdating(unittest.TestCase): ) def test_system_not_ready(self): def _test_mng_beampark(self): """Return (True, starting_time) when axis is MNG_BEAMPARK""" starting_time = getTimeStamp().value + 100000000 az, el, ra, dec = 0, 0, 0, 0 Loading @@ -56,6 +59,105 @@ class TestCheckUpdating(unittest.TestCase): self.assertEqual(feasible_time, starting_time) def test_starting_time_0_feasible_time_less_than_tolerance(self): """We have starting_time == 0 and the system is ready (because it requires less than 2 seconds to reach the position). In this case it has to return True and a feasible_time less than current time plus the tolerance.""" starting_time = 0 az, el, ra, dec = 0.6109, 0.6109, 0, 0 # 0.6109 -> 35 degrees self.antenna.setOffsets(az, el, ANT_HORIZONTAL) self.positioner.startUpdating(MNG_TRACK, ANT_NORTH, az, el, ra, dec) time.sleep(0.5) self.positioner.stopUpdating() time.sleep(0.5) timestamp = getTimeStamp().value value, feasible_time = self.positioner.checkUpdating( starting_time, MNG_TRACK, ANT_NORTH, az, el, ra, dec ) self.assertEqual(value, True) self.assertGreater(feasible_time, timestamp) self.assertLess(feasible_time, timestamp + 10**7) # 1 sec def test_starting_time_0_feasible_time_greater_than_tolerance(self): """We have starting_time == 0 but the system is not ready (because it requires more than 2 seconds to reach the position). In this case it has to return False and a feasible_time greater or equal to the time required to reach the posizion.""" starting_time = 0 az, el = 1.2217, 0.6109 # 70 degrees, 35 degrees p0 = 0 self.positioner.setPosition(p0) p1 = PosGenerator.getParallacticAngle(self.lat, az, el) # -63 degrees speed = self.device.getSpeed() required_time = abs(p1 - p0)/speed * 10**7 timestamp = getTimeStamp().value value, feasible_time = self.positioner.checkUpdating( starting_time, MNG_TRACK, ANT_NORTH, az, el, 0, 0 ) self.assertEqual(value, False) minimum_time = timestamp + int(required_time) self.assertGreater(feasible_time, minimum_time) safety_time = int(1.3*minimum_time) self.assertLess(feasible_time, safety_time) def test_starting_time_not_0_feasible_time_less_than_starting_time(self): """We have starting_time greater than 0 and feasible_time less than starting_time. In this case it has to return True and a feasible_time greater than now and less than starting_time.""" az, el = 1.2217, 0.6109 # 70 degrees, 35 degrees p0 = 0 self.positioner.setPosition(p0) p1 = PosGenerator.getParallacticAngle(self.lat, az, el) # -63 degrees speed = self.device.getSpeed() required_time = abs(p1 - p0)/speed * 10**7 timestamp = getTimeStamp().value # Starting time is 2 seconds more than the required time starting_time = timestamp + int(required_time*1.2) + 2*10**7 value, feasible_time = self.positioner.checkUpdating( starting_time, MNG_TRACK, ANT_NORTH, az, el, 0, 0 ) self.assertEqual(value, True) self.assertLess(feasible_time, starting_time) self.assertGreater(feasible_time, required_time) def test_starting_time_not_0_feasible_time_greater_than_starting_time(self): """We have starting_time greater than 0 and feasible_time greater than starting_time. In this case it has to return False and a feasible_time greater than starting_time.""" az, el = 1.2217, 0.6109 # 70 degrees, 35 degrees p0 = 0 self.positioner.setPosition(p0) p1 = PosGenerator.getParallacticAngle(self.lat, az, el) # -63 degrees speed = self.device.getSpeed() required_time = abs(p1 - p0)/speed * 10**7 timestamp = getTimeStamp().value # Starting time is 2 seconds more than the required time starting_time = timestamp + int(required_time) value, feasible_time = self.positioner.checkUpdating( starting_time, MNG_TRACK, ANT_NORTH, az, el, 0, 0 ) self.assertEqual(value, False) self.assertGreater(feasible_time, starting_time) safety_time = timestamp + int(required_time*1.2) + 2*10**7 self.assertLess(feasible_time, safety_time) if __name__ == '__main__': unittest.main() Loading
Common/Servers/PyDewarPositioner/src/DewarPositioner/DewarPositionerImpl.py +5 −3 Original line number Diff line number Diff line Loading @@ -359,6 +359,7 @@ class DewarPositionerImpl(POA, cc, services, lcycle): def checkUpdating(self, stime, axis, sector, az, el, ra, dec): print('In checkUpdating()') try: return self.positioner.checkUpdating( stime, axis, sector, az, el, ra, dec Loading @@ -374,9 +375,10 @@ class DewarPositionerImpl(POA, cc, services, lcycle): exc.setReason(ex.message) raise exc.getComponentErrorsEx() except Exception, ex: logger.logError(ex.message) exc = ComponentErrorsImpl.UnexpectedExImpl(ex.message) exc.setReason(ex.message) reason = ex.getReason() if hasattr(ex, 'getReason') else ex.message exc = ComponentErrorsImpl.OperationErrorExImpl() logger.logError(reason) exc.setReason(reason) raise exc.getComponentErrorsEx() Loading
Common/Servers/PyDewarPositioner/src/DewarPositioner/positioner.py +7 −3 Original line number Diff line number Diff line Loading @@ -185,10 +185,10 @@ class Positioner(object): future_time = 0 ready = True speed = self.device.getSpeed() if self.isRewinding(): # rdistance -> rewinding distance, rtime -> rewinding time rdistance = abs(self.getPosition() - self.getCmdPosition()) speed = self.device.getSpeed() # Safety factor of 1.2 rtime_distance = round((1.2 * rdistance)/speed, 4) rtime = getTimeStamp().value + int(rtime_distance * 10**7) Loading @@ -203,10 +203,14 @@ class Positioner(object): distance = abs(self.getPosition() - target) # Safety factor of 1.2 time_distance = round((1.2 * distance)/speed, 4) future_time = getTimeStamp().value + int(time_distance) future_time = getTimeStamp().value + int(time_distance * 10**7) if stime == 0 and (future_time - getTimeStamp().value) < 3*10**7: # 3 seconds tolerance = 2*10**7 # 2 seconds now = getTimeStamp().value if stime == 0 and (future_time - now) < tolerance: ready = True elif stime == 0 and (future_time - now) >= tolerance: ready = False elif stime >= future_time: ready = True else: Loading
Common/Servers/PyDewarPositioner/test/functional/test_checkUpdating.py +105 −3 Original line number Diff line number Diff line from __future__ import with_statement from __future__ import with_statement, division import unittest import time from Antenna import ANT_HORIZONTAL, ANT_NORTH, ANT_SOUTH from Management import MNG_TRACK, MNG_BEAMPARK from DewarPositioner.posgenerator import PosGenerator from Acspy.Clients.SimpleClient import PySimpleClient from Acspy.Common.TimeHelper import getTimeStamp Loading @@ -13,6 +14,8 @@ class TestCheckUpdating(unittest.TestCase): """Test the DewarPositioner.startUpdating() end-to-end method""" def setUp(self): self.client = PySimpleClient() self.device = self.client.getComponent('RECEIVERS/SRTKBandDerotator') self.antenna = self.client.getComponent('ANTENNA/Boss') self.positioner = self.client.getComponent('RECEIVERS/DewarPositioner') self.positioner.setup('KKG') self.positioner.setConfiguration('custom') Loading @@ -27,7 +30,7 @@ class TestCheckUpdating(unittest.TestCase): self.client.releaseComponent('RECEIVERS/DewarPositioner') def test_system_not_ready(self): def _test_system_not_ready(self): """Raise ComponentErrorsEx when the system is not ready""" with self.assertRaisesRegexp(ComponentErrorsEx, 'positioner not configured'): self.positioner.park() Loading @@ -42,7 +45,7 @@ class TestCheckUpdating(unittest.TestCase): ) def test_system_not_ready(self): def _test_mng_beampark(self): """Return (True, starting_time) when axis is MNG_BEAMPARK""" starting_time = getTimeStamp().value + 100000000 az, el, ra, dec = 0, 0, 0, 0 Loading @@ -56,6 +59,105 @@ class TestCheckUpdating(unittest.TestCase): self.assertEqual(feasible_time, starting_time) def test_starting_time_0_feasible_time_less_than_tolerance(self): """We have starting_time == 0 and the system is ready (because it requires less than 2 seconds to reach the position). In this case it has to return True and a feasible_time less than current time plus the tolerance.""" starting_time = 0 az, el, ra, dec = 0.6109, 0.6109, 0, 0 # 0.6109 -> 35 degrees self.antenna.setOffsets(az, el, ANT_HORIZONTAL) self.positioner.startUpdating(MNG_TRACK, ANT_NORTH, az, el, ra, dec) time.sleep(0.5) self.positioner.stopUpdating() time.sleep(0.5) timestamp = getTimeStamp().value value, feasible_time = self.positioner.checkUpdating( starting_time, MNG_TRACK, ANT_NORTH, az, el, ra, dec ) self.assertEqual(value, True) self.assertGreater(feasible_time, timestamp) self.assertLess(feasible_time, timestamp + 10**7) # 1 sec def test_starting_time_0_feasible_time_greater_than_tolerance(self): """We have starting_time == 0 but the system is not ready (because it requires more than 2 seconds to reach the position). In this case it has to return False and a feasible_time greater or equal to the time required to reach the posizion.""" starting_time = 0 az, el = 1.2217, 0.6109 # 70 degrees, 35 degrees p0 = 0 self.positioner.setPosition(p0) p1 = PosGenerator.getParallacticAngle(self.lat, az, el) # -63 degrees speed = self.device.getSpeed() required_time = abs(p1 - p0)/speed * 10**7 timestamp = getTimeStamp().value value, feasible_time = self.positioner.checkUpdating( starting_time, MNG_TRACK, ANT_NORTH, az, el, 0, 0 ) self.assertEqual(value, False) minimum_time = timestamp + int(required_time) self.assertGreater(feasible_time, minimum_time) safety_time = int(1.3*minimum_time) self.assertLess(feasible_time, safety_time) def test_starting_time_not_0_feasible_time_less_than_starting_time(self): """We have starting_time greater than 0 and feasible_time less than starting_time. In this case it has to return True and a feasible_time greater than now and less than starting_time.""" az, el = 1.2217, 0.6109 # 70 degrees, 35 degrees p0 = 0 self.positioner.setPosition(p0) p1 = PosGenerator.getParallacticAngle(self.lat, az, el) # -63 degrees speed = self.device.getSpeed() required_time = abs(p1 - p0)/speed * 10**7 timestamp = getTimeStamp().value # Starting time is 2 seconds more than the required time starting_time = timestamp + int(required_time*1.2) + 2*10**7 value, feasible_time = self.positioner.checkUpdating( starting_time, MNG_TRACK, ANT_NORTH, az, el, 0, 0 ) self.assertEqual(value, True) self.assertLess(feasible_time, starting_time) self.assertGreater(feasible_time, required_time) def test_starting_time_not_0_feasible_time_greater_than_starting_time(self): """We have starting_time greater than 0 and feasible_time greater than starting_time. In this case it has to return False and a feasible_time greater than starting_time.""" az, el = 1.2217, 0.6109 # 70 degrees, 35 degrees p0 = 0 self.positioner.setPosition(p0) p1 = PosGenerator.getParallacticAngle(self.lat, az, el) # -63 degrees speed = self.device.getSpeed() required_time = abs(p1 - p0)/speed * 10**7 timestamp = getTimeStamp().value # Starting time is 2 seconds more than the required time starting_time = timestamp + int(required_time) value, feasible_time = self.positioner.checkUpdating( starting_time, MNG_TRACK, ANT_NORTH, az, el, 0, 0 ) self.assertEqual(value, False) self.assertGreater(feasible_time, starting_time) safety_time = timestamp + int(required_time*1.2) + 2*10**7 self.assertLess(feasible_time, safety_time) if __name__ == '__main__': unittest.main()