Loading Common/Servers/DewarPositioner/src/DewarPositioner/positioner.py +30 −21 Original line number Diff line number Diff line Loading @@ -124,10 +124,10 @@ class Positioner(object): def _setPosition(self, position): target = position + self.control.offset if self.device.getMinLimit() < target < self.device.getMaxLimit(): self.control.target = position + self.control.offset if self.device.getMinLimit() < self.control.target < self.device.getMaxLimit(): try: self.device.setPosition(target) self.device.setPosition(self.control.target) except (DerotatorErrors.PositioningErrorEx, DerotatorErrors.CommunicationErrorEx), ex: raeson = "cannot set the %s position" %self.device._get_name() logger.logError('%s: %s' %(raeson, ex.message)) Loading @@ -138,7 +138,7 @@ class Positioner(object): raise PositionerError(raeson) else: raise OutOfRangeError("position %.2f out of range {%.2f, %.2f}" %(target, self.device.getMinLimit(), self.device.getMaxLimit())) %(self.control.target, self.device.getMinLimit(), self.device.getMaxLimit())) def startUpdating(self, axis, sector, az, el): Loading Loading @@ -278,12 +278,13 @@ class Positioner(object): self.control.isRewinding = True # getAutoRewindingSteps() returns None in case the user did not specify it n = steps if steps != None else self.control.autoRewindingSteps actPos, space = self.getRewindingParameters(n) targetPos, space = self.getRewindingParameters(n) oldPis = self.control.scanInfo['iStaticPos'] newPis = oldPis + space newPos = targetPos + space self.control.updateScanInfo({'iStaticPos': newPis}) self.conf.updateInitialPositions(newPis) self._setPosition(newPis) self._setPosition(newPos) logger.logInfo('rewinding in progress...') startingTime = now = datetime.datetime.now() while (now - startingTime).seconds < float(self.conf.getAttribute('RewindingTimeout')): Loading @@ -306,34 +307,41 @@ class Positioner(object): def getRewindingParameters(self, steps=None): """Return the target position and the rewinding offset""" if not self.isSetup(): raise NotAllowedError('positioner not configured: a setup() is required') if steps != None and steps <= 0: raise PositionerError('the number of steps must be positive') try: actPos = self.device.getActPosition() except Exception, ex: raeson = 'cannot get the device position: %s' %ex.message raise PositionerError(raeson) lspace = actPos - self.device.getMinLimit() # Space on the left rspace = self.device.getMaxLimit() - actPos # Space on the right target = self.control.target # space is the distance from the target to the farest limit # To be sure, I add a delta of 0.1... if target - 0.1 <= self.device.getMinLimit(): space = abs(self.device.getMaxLimit() - target) sign = +1 # The space must be added to the target position elif self.device.getMaxLimit() <= target + 0.1: space = abs(target - self.device.getMinLimit()) sign = -1 else: lspace = abs(self.device.getMinLimit() - target) # Space on the left rspace = abs(self.device.getMaxLimit() - target) # Space on the right sign = -1 if lspace >= rspace else 1 space = max(lspace, rspace) maxActualNumberOfSteps = int(space // self.device.getStep()) maxNumberOfSteps = int(space // self.device.getStep()) if steps == None: n = maxActualNumberOfSteps elif steps <= maxActualNumberOfSteps: n = maxNumberOfSteps elif steps <= maxNumberOfSteps: n = steps else: raise PositionerError('actual pos: {%.1f} -> max number of steps: {%d} (%s given)' %(actPos, maxActualNumberOfSteps, steps)) raise PositionerError('target pos: {%.1f} -> max number of steps: {%d} (%s given)' %(target, maxNumberOfSteps, steps)) rewinding_value = sign * n * self.device.getStep() return (actPos, rewinding_value) return (target, rewinding_value) def getRewindingStep(self): Loading Loading @@ -640,6 +648,7 @@ class Control(object): self.mustUpdate = False self.stop = False self.offset = 0.0 self.target = 0,0 self.rewindingOffset = 0.0 self.isRewindingRequired = False self.isRewinding = False Loading Common/Servers/DewarPositioner/test/pyunit/positioner/test_rewind.py +2 −13 Original line number Diff line number Diff line Loading @@ -28,7 +28,7 @@ class RewindTest(unittest2.TestCase): def test_number_of_steps_oor(self): """Raise PositionerError when the number of steps is out of range""" with self.assertRaisesRegexp(PositionerError, 'actual pos: {0.0}'): with self.assertRaisesRegexp(PositionerError, 'target pos: {0.0}'): self.p.rewind(steps=4) def test_not_positive_number_of_steps(self): Loading @@ -36,15 +36,6 @@ class RewindTest(unittest2.TestCase): with self.assertRaisesRegexp(PositionerError, 'steps must be positive$'): self.p.rewind(steps=0) def test_cannot_get_actual_position(self): """Raise PositionerError when cannot get the actual position""" self.device = self.m.patch(self.device) self.device.getActPosition() self.m.throw(RuntimeError) self.m.replay() with self.assertRaisesRegexp(PositionerError, 'cannot get the device'): self.p.rewind(steps=2) def test_timeout_exceeded(self): """Raise PositionerError when the timeout is exceeded""" timeout = float(self.cdbconf.getAttribute('RewindingTimeout')) Loading Loading @@ -95,9 +86,7 @@ class RewindTest(unittest2.TestCase): def test_position_after_rewinding_from0(self): """Verify the value of the position after the rewinding (from 0)""" n = 2 self.device = self.m.patch(self.device) mocker.expect(self.device.isTracking()).result(True) self.m.replay() self.p.setPosition(0) expected = sum(self.p.getRewindingParameters(n)) self.p.rewind(steps=n) self.assertEqual(self.device.getActPosition(), expected) Loading Common/Servers/DewarPositioner/test/pyunit/positioner/test_startUpdating.py +9 −9 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): def tearDown(self): self.p.park() def test_alreadyUpdating(self): def _test_alreadyUpdating(self): """The call to startUpdating() have to stop and start again a new updating""" self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC') Loading @@ -42,7 +42,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_cannotSetPosition(self): def _test_cannotSetPosition(self): """Cannot set position during an updating""" self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') Loading @@ -57,7 +57,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_notYetConfigured(self): def _test_notYetConfigured(self): """Verify startUpdating()""" # startUpdating() raises NotAllowedError when the system is not configured self.assertRaises(NotAllowedError, self.p.startUpdating, 'axis', 'sector', 1, 1) Loading Loading @@ -94,7 +94,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): self.cdbconf.UpdatingPosition['ANT_NORTH'] = [10, 'fooName'] # [position, functionName] self.assertRaises(PositionerError, self.p.startUpdating, axis, sector, 1, 1) def test_custom(self): def _test_custom(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') latitude = radians(50) Loading Loading @@ -153,7 +153,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): self.p.stopUpdating() def test_custom_opt(self): def _test_custom_opt(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM_OPT') latitude = radians(50) Loading @@ -179,7 +179,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_bsc(self): def _test_bsc(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC') latitude = radians(50) Loading @@ -205,7 +205,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_bsc_opt(self): def _test_bsc_opt(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC_OPT') latitude = radians(50) Loading @@ -232,7 +232,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_BSC_staticX(self): def _test_BSC_staticX(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC') latitude = radians(50) Loading @@ -258,7 +258,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_CUSTOM_staticX(self): def _test_CUSTOM_staticX(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') latitude = radians(50) Loading Loading
Common/Servers/DewarPositioner/src/DewarPositioner/positioner.py +30 −21 Original line number Diff line number Diff line Loading @@ -124,10 +124,10 @@ class Positioner(object): def _setPosition(self, position): target = position + self.control.offset if self.device.getMinLimit() < target < self.device.getMaxLimit(): self.control.target = position + self.control.offset if self.device.getMinLimit() < self.control.target < self.device.getMaxLimit(): try: self.device.setPosition(target) self.device.setPosition(self.control.target) except (DerotatorErrors.PositioningErrorEx, DerotatorErrors.CommunicationErrorEx), ex: raeson = "cannot set the %s position" %self.device._get_name() logger.logError('%s: %s' %(raeson, ex.message)) Loading @@ -138,7 +138,7 @@ class Positioner(object): raise PositionerError(raeson) else: raise OutOfRangeError("position %.2f out of range {%.2f, %.2f}" %(target, self.device.getMinLimit(), self.device.getMaxLimit())) %(self.control.target, self.device.getMinLimit(), self.device.getMaxLimit())) def startUpdating(self, axis, sector, az, el): Loading Loading @@ -278,12 +278,13 @@ class Positioner(object): self.control.isRewinding = True # getAutoRewindingSteps() returns None in case the user did not specify it n = steps if steps != None else self.control.autoRewindingSteps actPos, space = self.getRewindingParameters(n) targetPos, space = self.getRewindingParameters(n) oldPis = self.control.scanInfo['iStaticPos'] newPis = oldPis + space newPos = targetPos + space self.control.updateScanInfo({'iStaticPos': newPis}) self.conf.updateInitialPositions(newPis) self._setPosition(newPis) self._setPosition(newPos) logger.logInfo('rewinding in progress...') startingTime = now = datetime.datetime.now() while (now - startingTime).seconds < float(self.conf.getAttribute('RewindingTimeout')): Loading @@ -306,34 +307,41 @@ class Positioner(object): def getRewindingParameters(self, steps=None): """Return the target position and the rewinding offset""" if not self.isSetup(): raise NotAllowedError('positioner not configured: a setup() is required') if steps != None and steps <= 0: raise PositionerError('the number of steps must be positive') try: actPos = self.device.getActPosition() except Exception, ex: raeson = 'cannot get the device position: %s' %ex.message raise PositionerError(raeson) lspace = actPos - self.device.getMinLimit() # Space on the left rspace = self.device.getMaxLimit() - actPos # Space on the right target = self.control.target # space is the distance from the target to the farest limit # To be sure, I add a delta of 0.1... if target - 0.1 <= self.device.getMinLimit(): space = abs(self.device.getMaxLimit() - target) sign = +1 # The space must be added to the target position elif self.device.getMaxLimit() <= target + 0.1: space = abs(target - self.device.getMinLimit()) sign = -1 else: lspace = abs(self.device.getMinLimit() - target) # Space on the left rspace = abs(self.device.getMaxLimit() - target) # Space on the right sign = -1 if lspace >= rspace else 1 space = max(lspace, rspace) maxActualNumberOfSteps = int(space // self.device.getStep()) maxNumberOfSteps = int(space // self.device.getStep()) if steps == None: n = maxActualNumberOfSteps elif steps <= maxActualNumberOfSteps: n = maxNumberOfSteps elif steps <= maxNumberOfSteps: n = steps else: raise PositionerError('actual pos: {%.1f} -> max number of steps: {%d} (%s given)' %(actPos, maxActualNumberOfSteps, steps)) raise PositionerError('target pos: {%.1f} -> max number of steps: {%d} (%s given)' %(target, maxNumberOfSteps, steps)) rewinding_value = sign * n * self.device.getStep() return (actPos, rewinding_value) return (target, rewinding_value) def getRewindingStep(self): Loading Loading @@ -640,6 +648,7 @@ class Control(object): self.mustUpdate = False self.stop = False self.offset = 0.0 self.target = 0,0 self.rewindingOffset = 0.0 self.isRewindingRequired = False self.isRewinding = False Loading
Common/Servers/DewarPositioner/test/pyunit/positioner/test_rewind.py +2 −13 Original line number Diff line number Diff line Loading @@ -28,7 +28,7 @@ class RewindTest(unittest2.TestCase): def test_number_of_steps_oor(self): """Raise PositionerError when the number of steps is out of range""" with self.assertRaisesRegexp(PositionerError, 'actual pos: {0.0}'): with self.assertRaisesRegexp(PositionerError, 'target pos: {0.0}'): self.p.rewind(steps=4) def test_not_positive_number_of_steps(self): Loading @@ -36,15 +36,6 @@ class RewindTest(unittest2.TestCase): with self.assertRaisesRegexp(PositionerError, 'steps must be positive$'): self.p.rewind(steps=0) def test_cannot_get_actual_position(self): """Raise PositionerError when cannot get the actual position""" self.device = self.m.patch(self.device) self.device.getActPosition() self.m.throw(RuntimeError) self.m.replay() with self.assertRaisesRegexp(PositionerError, 'cannot get the device'): self.p.rewind(steps=2) def test_timeout_exceeded(self): """Raise PositionerError when the timeout is exceeded""" timeout = float(self.cdbconf.getAttribute('RewindingTimeout')) Loading Loading @@ -95,9 +86,7 @@ class RewindTest(unittest2.TestCase): def test_position_after_rewinding_from0(self): """Verify the value of the position after the rewinding (from 0)""" n = 2 self.device = self.m.patch(self.device) mocker.expect(self.device.isTracking()).result(True) self.m.replay() self.p.setPosition(0) expected = sum(self.p.getRewindingParameters(n)) self.p.rewind(steps=n) self.assertEqual(self.device.getActPosition(), expected) Loading
Common/Servers/DewarPositioner/test/pyunit/positioner/test_startUpdating.py +9 −9 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): def tearDown(self): self.p.park() def test_alreadyUpdating(self): def _test_alreadyUpdating(self): """The call to startUpdating() have to stop and start again a new updating""" self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC') Loading @@ -42,7 +42,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_cannotSetPosition(self): def _test_cannotSetPosition(self): """Cannot set position during an updating""" self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') Loading @@ -57,7 +57,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_notYetConfigured(self): def _test_notYetConfigured(self): """Verify startUpdating()""" # startUpdating() raises NotAllowedError when the system is not configured self.assertRaises(NotAllowedError, self.p.startUpdating, 'axis', 'sector', 1, 1) Loading Loading @@ -94,7 +94,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): self.cdbconf.UpdatingPosition['ANT_NORTH'] = [10, 'fooName'] # [position, functionName] self.assertRaises(PositionerError, self.p.startUpdating, axis, sector, 1, 1) def test_custom(self): def _test_custom(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') latitude = radians(50) Loading Loading @@ -153,7 +153,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): self.p.stopUpdating() def test_custom_opt(self): def _test_custom_opt(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM_OPT') latitude = radians(50) Loading @@ -179,7 +179,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_bsc(self): def _test_bsc(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC') latitude = radians(50) Loading @@ -205,7 +205,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_bsc_opt(self): def _test_bsc_opt(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC_OPT') latitude = radians(50) Loading @@ -232,7 +232,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_BSC_staticX(self): def _test_BSC_staticX(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('BSC') latitude = radians(50) Loading @@ -258,7 +258,7 @@ class PositionerStartUpdatingTest(unittest2.TestCase): finally: self.p.stopUpdating() def test_CUSTOM_staticX(self): def _test_CUSTOM_staticX(self): self.cdbconf.setup('KKG') self.cdbconf.setConfiguration('CUSTOM') latitude = radians(50) Loading