Loading Common/Servers/PyDewarPositioner/src/DewarPositioner/DewarPositionerImpl.py +7 −0 Original line number Diff line number Diff line Loading @@ -180,6 +180,7 @@ class DewarPositionerImpl(POA, cc, services, lcycle): exc.setData('Reason', reason) raise exc.getComponentErrorsEx() def park(self): logger.logNotice('parking the derotator') try: Loading @@ -204,6 +205,11 @@ class DewarPositionerImpl(POA, cc, services, lcycle): logger.logNotice('derotator parked') def clearSource(self): logger.logNotice('cleaning the parallacting angle sign') self.positioner._clearSign() def getPosition(self): try: return self.positioner.getPosition() Loading Loading @@ -273,6 +279,7 @@ class DewarPositionerImpl(POA, cc, services, lcycle): exc.setData('Reason', ex.message) raise exc.getComponentErrorsEx() def getCmdPosition(self): try: return self.positioner.getCmdPosition() Loading Common/Servers/PyDewarPositioner/src/DewarPositioner/posgenerator.py +20 −8 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import datetime import time from math import sin, cos, tan, atan2, degrees import numpy from IRAPy import logger from Acspy.Common.TimeHelper import getTimeStamp Loading @@ -26,7 +27,7 @@ class PosGenerator(object): # TODO: refactoring required, in order to put all the parallactic and # galacticParallactic common code in one place def parallactic(self, source, siteInfo): def parallactic(self, source, siteInfo, initial_sign=None): """Return the parallactic angle""" try: latitude = siteInfo['latitude'] Loading @@ -43,7 +44,8 @@ class PosGenerator(object): t = getTimeStamp().value + 1*10*6 # 100 ms in the future coordinates = source.getApparentCoordinates(t) # Values in radians az, el = coordinates[:2] # The first two elements are (az, el) position = PosGenerator.getParallacticAngle(latitude, az, el) position = PosGenerator.getParallacticAngle( latitude, az, el, initial_sign) yield position last_zerodiv_time = datetime.datetime.now() except ZeroDivisionError: Loading @@ -64,7 +66,7 @@ class PosGenerator(object): logger.logNotice('%s: %s' %(raeson, ex.message)) raise PosGeneratorError(raeson) def galacticParallactic(self, source, siteInfo): def galacticParallactic(self, source, siteInfo, initial_sign=None): """Return the galactic parallactic angle""" try: latitude = siteInfo['latitude'] Loading @@ -81,7 +83,8 @@ class PosGenerator(object): t = getTimeStamp().value + 1*10*6 # 100 ms in the future coordinates = source.getApparentCoordinates(t) # Values in radians az, el, ra, dec = coordinates[:4] pg = PosGenerator.getGalacticParallacticAngle(latitude, az, el, ra, dec) pg = PosGenerator.getGalacticParallacticAngle( latitude, az, el, ra, dec, initial_sign) yield pg last_zerodiv_time = datetime.datetime.now() except ZeroDivisionError: Loading @@ -103,10 +106,19 @@ class PosGenerator(object): raise PosGeneratorError(raeson) @staticmethod def getParallacticAngle(latitude, az, el): def getParallacticAngle(latitude, az, el, initial_sign=None): """Arguments in radians""" p = atan2(-sin(az), tan(latitude)*cos(el) - sin(el)*cos(az)) return degrees(p) p = degrees(p) # Remember the sign of the first scan of the map sign_p = int(numpy.sign(p)) if initial_sign is None or (sign_p == initial_sign) or (sign_p == 0): angle = p elif initial_sign == -1: angle = initial_sign * (180 + 180%p) elif initial_sign == +1: angle = initial_sign * (180 + p%180) return angle @staticmethod def getGalacticAngle(ra, dec): Loading @@ -121,9 +133,9 @@ class PosGenerator(object): return degrees(g) @staticmethod def getGalacticParallacticAngle(latitude, az, el, ra, dec): def getGalacticParallacticAngle(latitude, az, el, ra, dec, initial_sign): """Arguments in radians""" p = PosGenerator.getParallacticAngle(latitude, az, el) p = PosGenerator.getParallacticAngle(latitude, az, el, initial_sign) g = PosGenerator.getGalacticAngle(ra, dec) return p + g Loading Common/Servers/PyDewarPositioner/src/DewarPositioner/positioner.py +15 −4 Original line number Diff line number Diff line Loading @@ -54,6 +54,7 @@ class Positioner(object): self.device = device self.device.setup() self._clearOffset() self._clearSign() self.is_setup = True time.sleep(0.4) # Give the device the time to accomplish the setup self.control.updateScanInfo({'iStaticPos': setupPosition}) Loading Loading @@ -159,6 +160,8 @@ class Positioner(object): raise NotAllowedError('no source available') elif str(sector) not in sectors: raise NotAllowedError('sector %s not in %s' %(sector, sectors)) elif self.isRewinding(): raise NotAllowedError('the positioner is rewinding') else: if self.isUpdating(): self.stopUpdating() Loading @@ -183,19 +186,23 @@ class Positioner(object): else: posgen = getattr(self.posgen, functionName) angle_mapping = self.posgen.mapping[functionName] getAngleFunction = self.posgen.mapping[functionName]['getAngleFunction'] coordinateFrame = self.posgen.mapping[functionName]['coordinateFrame'] getAngleFunction = angle_mapping['getAngleFunction'] coordinateFrame = angle_mapping['coordinateFrame'] lat = self.siteInfo['latitude'] try: if coordinateFrame == 'horizontal': iParallacticPos = getAngleFunction(lat, az, el) iParallacticPos = getAngleFunction(lat, az, el, self.sign) elif coordinateFrame == 'equatorial': iParallacticPos = getAngleFunction(lat, az, el, ra, dec) iParallacticPos = getAngleFunction(lat, az, el, ra, dec, self.sign) else: raise PositionerError('coordinate frame %s unknown' %coordinateFrame) except ZeroDivisionError: raise NotAllowedError('zero division error computing p(%.2f, %.2f)' %(az, el)) # Remember the sign of the first scan of the map if self.sign is not None: self.sign = int(numpy.sign(iParallacticPos)) self.control.setScanInfo( axis=axis, sector=sector, Loading Loading @@ -670,12 +677,16 @@ class Positioner(object): finally: Positioner.generalLock.release() def _clearSign(self): self.sign = None def _setDefault(self): self.t = None self.is_setup = False self.control = Control() self.conf.clearConfiguration() self._clearOffset() self._clearSign() class Control(object): Loading Common/Servers/PyDewarPositioner/test/pyunit/positioner/test_offset.py +3 −3 Original line number Diff line number Diff line Loading @@ -29,7 +29,7 @@ class PositionerOffsetTest(unittest.TestCase): self.p.park() time.sleep(0.2) def _test_set_get(self): def test_set_get(self): """Verify the set and get methods""" # Not allowed when the system is not yet configured self.assertRaises(NotAllowedError, self.p.setOffset, 2) Loading @@ -40,7 +40,7 @@ class PositionerOffsetTest(unittest.TestCase): self.p.clearOffset() self.assertEqual(self.p.getOffset(), 0) def _test_set_new_pos(self): def test_set_new_pos(self): """Vefify the setOffset set a new position.""" self.p.setup(siteInfo={}, source=None, device=self.device) time.sleep(0.3) if self.using_mock else time.sleep(3) Loading Loading @@ -81,7 +81,7 @@ class PositionerOffsetTest(unittest.TestCase): self.p.startUpdating('MNG_TRACK', 'ANT_NORTH', az, el, None, None) time.sleep(0.2) if self.using_mock else time.sleep(3) self.p.setOffset(offset) time.sleep(0.2) if self.using_mock else time.sleep(3) time.sleep(0.2) if self.using_mock else time.sleep(5) self.assertEqual(self.device.getActPosition(), expected) Loading Common/Servers/PyDewarPositioner/test/pyunit/positioner/test_startUpdating.py +51 −0 Original line number Diff line number Diff line Loading @@ -135,6 +135,41 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_change_of_sign_negative_to_positive(self): "The parallactic angle do not have to change from -180 to 180 degrees" site_info = {'latitude': radians(39.49)} posgen = PosGenerator() gen = posgen.parallactic(self.source, site_info, initial_sign=-1) azs = [radians(i) for i in (40, 20, 0, 360, 340, 320)] els = [radians(i) for i in (81, 83, 85, 85, 83, 81)] p0 = None for az, el in zip(azs, els): self.source.setAzimuth(az) self.source.setElevation(el) angle = gen.next() self.assertIsNotNone(angle) if p0: delta = abs(angle - p0) self.assertLess(delta, 180) p0 = angle def test_change_of_sign_positive_to_negative(self): "The parallactic angle do not have to change from 180 to -180 degrees" site_info = {'latitude': radians(39.49)} posgen = PosGenerator() gen = posgen.parallactic(self.source, site_info, initial_sign=+1) azs = reversed([radians(i) for i in (40, 20, 0, 360, 340, 320)]) els = reversed([radians(i) for i in (81, 83, 85, 85, 83, 81)]) p0 = None for az, el in zip(azs, els): self.source.setAzimuth(az) self.source.setElevation(el) angle = gen.next() self.assertIsNotNone(angle) if p0: delta = abs(angle - p0) self.assertLess(delta, 180) p0 = angle def test_custom_auto_rewinding(self): self.cdbconf.setup('KKG') Loading Loading @@ -301,6 +336,22 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_cannotUpdateDuringRewind(self): """Cannot execute startUpdating() when a rewind is in progress.""" self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') latitude, az, el = [radians(50)] * 3 site_info = {'latitude': latitude} self.p.setup(site_info, self.source, self.device) try: self.p.control.isRewinding = True self.assertRaises(NotAllowedError, self.p.startUpdating, MNG_TRACK, ANT_NORTH, az, el, None, None) finally: self.p.stopUpdating() self.p.control.isRewinding = False if __name__ == '__main__': unittest.main() Loading
Common/Servers/PyDewarPositioner/src/DewarPositioner/DewarPositionerImpl.py +7 −0 Original line number Diff line number Diff line Loading @@ -180,6 +180,7 @@ class DewarPositionerImpl(POA, cc, services, lcycle): exc.setData('Reason', reason) raise exc.getComponentErrorsEx() def park(self): logger.logNotice('parking the derotator') try: Loading @@ -204,6 +205,11 @@ class DewarPositionerImpl(POA, cc, services, lcycle): logger.logNotice('derotator parked') def clearSource(self): logger.logNotice('cleaning the parallacting angle sign') self.positioner._clearSign() def getPosition(self): try: return self.positioner.getPosition() Loading Loading @@ -273,6 +279,7 @@ class DewarPositionerImpl(POA, cc, services, lcycle): exc.setData('Reason', ex.message) raise exc.getComponentErrorsEx() def getCmdPosition(self): try: return self.positioner.getCmdPosition() Loading
Common/Servers/PyDewarPositioner/src/DewarPositioner/posgenerator.py +20 −8 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import datetime import time from math import sin, cos, tan, atan2, degrees import numpy from IRAPy import logger from Acspy.Common.TimeHelper import getTimeStamp Loading @@ -26,7 +27,7 @@ class PosGenerator(object): # TODO: refactoring required, in order to put all the parallactic and # galacticParallactic common code in one place def parallactic(self, source, siteInfo): def parallactic(self, source, siteInfo, initial_sign=None): """Return the parallactic angle""" try: latitude = siteInfo['latitude'] Loading @@ -43,7 +44,8 @@ class PosGenerator(object): t = getTimeStamp().value + 1*10*6 # 100 ms in the future coordinates = source.getApparentCoordinates(t) # Values in radians az, el = coordinates[:2] # The first two elements are (az, el) position = PosGenerator.getParallacticAngle(latitude, az, el) position = PosGenerator.getParallacticAngle( latitude, az, el, initial_sign) yield position last_zerodiv_time = datetime.datetime.now() except ZeroDivisionError: Loading @@ -64,7 +66,7 @@ class PosGenerator(object): logger.logNotice('%s: %s' %(raeson, ex.message)) raise PosGeneratorError(raeson) def galacticParallactic(self, source, siteInfo): def galacticParallactic(self, source, siteInfo, initial_sign=None): """Return the galactic parallactic angle""" try: latitude = siteInfo['latitude'] Loading @@ -81,7 +83,8 @@ class PosGenerator(object): t = getTimeStamp().value + 1*10*6 # 100 ms in the future coordinates = source.getApparentCoordinates(t) # Values in radians az, el, ra, dec = coordinates[:4] pg = PosGenerator.getGalacticParallacticAngle(latitude, az, el, ra, dec) pg = PosGenerator.getGalacticParallacticAngle( latitude, az, el, ra, dec, initial_sign) yield pg last_zerodiv_time = datetime.datetime.now() except ZeroDivisionError: Loading @@ -103,10 +106,19 @@ class PosGenerator(object): raise PosGeneratorError(raeson) @staticmethod def getParallacticAngle(latitude, az, el): def getParallacticAngle(latitude, az, el, initial_sign=None): """Arguments in radians""" p = atan2(-sin(az), tan(latitude)*cos(el) - sin(el)*cos(az)) return degrees(p) p = degrees(p) # Remember the sign of the first scan of the map sign_p = int(numpy.sign(p)) if initial_sign is None or (sign_p == initial_sign) or (sign_p == 0): angle = p elif initial_sign == -1: angle = initial_sign * (180 + 180%p) elif initial_sign == +1: angle = initial_sign * (180 + p%180) return angle @staticmethod def getGalacticAngle(ra, dec): Loading @@ -121,9 +133,9 @@ class PosGenerator(object): return degrees(g) @staticmethod def getGalacticParallacticAngle(latitude, az, el, ra, dec): def getGalacticParallacticAngle(latitude, az, el, ra, dec, initial_sign): """Arguments in radians""" p = PosGenerator.getParallacticAngle(latitude, az, el) p = PosGenerator.getParallacticAngle(latitude, az, el, initial_sign) g = PosGenerator.getGalacticAngle(ra, dec) return p + g Loading
Common/Servers/PyDewarPositioner/src/DewarPositioner/positioner.py +15 −4 Original line number Diff line number Diff line Loading @@ -54,6 +54,7 @@ class Positioner(object): self.device = device self.device.setup() self._clearOffset() self._clearSign() self.is_setup = True time.sleep(0.4) # Give the device the time to accomplish the setup self.control.updateScanInfo({'iStaticPos': setupPosition}) Loading Loading @@ -159,6 +160,8 @@ class Positioner(object): raise NotAllowedError('no source available') elif str(sector) not in sectors: raise NotAllowedError('sector %s not in %s' %(sector, sectors)) elif self.isRewinding(): raise NotAllowedError('the positioner is rewinding') else: if self.isUpdating(): self.stopUpdating() Loading @@ -183,19 +186,23 @@ class Positioner(object): else: posgen = getattr(self.posgen, functionName) angle_mapping = self.posgen.mapping[functionName] getAngleFunction = self.posgen.mapping[functionName]['getAngleFunction'] coordinateFrame = self.posgen.mapping[functionName]['coordinateFrame'] getAngleFunction = angle_mapping['getAngleFunction'] coordinateFrame = angle_mapping['coordinateFrame'] lat = self.siteInfo['latitude'] try: if coordinateFrame == 'horizontal': iParallacticPos = getAngleFunction(lat, az, el) iParallacticPos = getAngleFunction(lat, az, el, self.sign) elif coordinateFrame == 'equatorial': iParallacticPos = getAngleFunction(lat, az, el, ra, dec) iParallacticPos = getAngleFunction(lat, az, el, ra, dec, self.sign) else: raise PositionerError('coordinate frame %s unknown' %coordinateFrame) except ZeroDivisionError: raise NotAllowedError('zero division error computing p(%.2f, %.2f)' %(az, el)) # Remember the sign of the first scan of the map if self.sign is not None: self.sign = int(numpy.sign(iParallacticPos)) self.control.setScanInfo( axis=axis, sector=sector, Loading Loading @@ -670,12 +677,16 @@ class Positioner(object): finally: Positioner.generalLock.release() def _clearSign(self): self.sign = None def _setDefault(self): self.t = None self.is_setup = False self.control = Control() self.conf.clearConfiguration() self._clearOffset() self._clearSign() class Control(object): Loading
Common/Servers/PyDewarPositioner/test/pyunit/positioner/test_offset.py +3 −3 Original line number Diff line number Diff line Loading @@ -29,7 +29,7 @@ class PositionerOffsetTest(unittest.TestCase): self.p.park() time.sleep(0.2) def _test_set_get(self): def test_set_get(self): """Verify the set and get methods""" # Not allowed when the system is not yet configured self.assertRaises(NotAllowedError, self.p.setOffset, 2) Loading @@ -40,7 +40,7 @@ class PositionerOffsetTest(unittest.TestCase): self.p.clearOffset() self.assertEqual(self.p.getOffset(), 0) def _test_set_new_pos(self): def test_set_new_pos(self): """Vefify the setOffset set a new position.""" self.p.setup(siteInfo={}, source=None, device=self.device) time.sleep(0.3) if self.using_mock else time.sleep(3) Loading Loading @@ -81,7 +81,7 @@ class PositionerOffsetTest(unittest.TestCase): self.p.startUpdating('MNG_TRACK', 'ANT_NORTH', az, el, None, None) time.sleep(0.2) if self.using_mock else time.sleep(3) self.p.setOffset(offset) time.sleep(0.2) if self.using_mock else time.sleep(3) time.sleep(0.2) if self.using_mock else time.sleep(5) self.assertEqual(self.device.getActPosition(), expected) Loading
Common/Servers/PyDewarPositioner/test/pyunit/positioner/test_startUpdating.py +51 −0 Original line number Diff line number Diff line Loading @@ -135,6 +135,41 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_change_of_sign_negative_to_positive(self): "The parallactic angle do not have to change from -180 to 180 degrees" site_info = {'latitude': radians(39.49)} posgen = PosGenerator() gen = posgen.parallactic(self.source, site_info, initial_sign=-1) azs = [radians(i) for i in (40, 20, 0, 360, 340, 320)] els = [radians(i) for i in (81, 83, 85, 85, 83, 81)] p0 = None for az, el in zip(azs, els): self.source.setAzimuth(az) self.source.setElevation(el) angle = gen.next() self.assertIsNotNone(angle) if p0: delta = abs(angle - p0) self.assertLess(delta, 180) p0 = angle def test_change_of_sign_positive_to_negative(self): "The parallactic angle do not have to change from 180 to -180 degrees" site_info = {'latitude': radians(39.49)} posgen = PosGenerator() gen = posgen.parallactic(self.source, site_info, initial_sign=+1) azs = reversed([radians(i) for i in (40, 20, 0, 360, 340, 320)]) els = reversed([radians(i) for i in (81, 83, 85, 85, 83, 81)]) p0 = None for az, el in zip(azs, els): self.source.setAzimuth(az) self.source.setElevation(el) angle = gen.next() self.assertIsNotNone(angle) if p0: delta = abs(angle - p0) self.assertLess(delta, 180) p0 = angle def test_custom_auto_rewinding(self): self.cdbconf.setup('KKG') Loading Loading @@ -301,6 +336,22 @@ class PositionerStartUpdatingTest(unittest.TestCase): finally: self.p.stopUpdating() def test_cannotUpdateDuringRewind(self): """Cannot execute startUpdating() when a rewind is in progress.""" self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') latitude, az, el = [radians(50)] * 3 site_info = {'latitude': latitude} self.p.setup(site_info, self.source, self.device) try: self.p.control.isRewinding = True self.assertRaises(NotAllowedError, self.p.startUpdating, MNG_TRACK, ANT_NORTH, az, el, None, None) finally: self.p.stopUpdating() self.p.control.isRewinding = False if __name__ == '__main__': unittest.main()