Loading autocnet/graph/edge.py +15 −21 Original line number Diff line number Diff line Loading @@ -69,7 +69,9 @@ class Edge(dict, MutableMapping): # state, dynamically draw the mask from the object. for c in self._masks.columns: if c in mask_lookup: self._masks[c] = getattr(self, mask_lookup[c]).mask truncated_mask = getattr(self, mask_lookup[c]).mask self._masks[c] = False self._masks[c].iloc[truncated_mask.index] = truncated_mask return self._masks @masks.setter Loading Loading @@ -99,7 +101,9 @@ class Edge(dict, MutableMapping): self.matches = matches else: df = self.matches self.matches = df.append(matches, ignore_index=True) self.matches = df.append(matches, ignore_index=True, verify_integrity=True) def symmetry_check(self): if hasattr(self, 'matches'): Loading @@ -123,7 +127,7 @@ class Edge(dict, MutableMapping): else: raise AttributeError('No matches have been computed for this edge.') def compute_fundamental_matrix(self, clean_keys=[], method='linear', **kwargs): def compute_fundamental_matrix(self, clean_keys=[], **kwargs): """ Estimate the fundamental matrix (F) using the correspondences tagged to this edge. Loading Loading @@ -153,27 +157,16 @@ class Edge(dict, MutableMapping): d_keypoints = self.destination.get_keypoint_coordinates(index=matches['destination_idx'], homogeneous=True) transformation_matrix, fundam_mask = od.compute_fundamental_matrix(s_keypoints.values, d_keypoints.values, **kwargs) try: fundam_mask = fundam_mask.ravel() except: return # Replace the index with the matches index. s_keypoints.index = matches.index d_keypoints.index = matches.index self.fundamental_matrix = FundamentalMatrix(np.zeros((3,3)), index=matches.index) self.fundamental_matrix.compute(s_keypoints, d_keypoints, **kwargs) # Convert the truncated RANSAC mask back into a full length mask mask[mask] = fundam_mask # Pass in the truncated mask to the fundamental matrix. These are # only those points that have pased previous outlier checks self.fundamental_matrix = FundamentalMatrix(transformation_matrix, s_keypoints, d_keypoints, mask=mask, local_mask=fundam_mask) if method != 'linear': self.fundamental_matrix.refine_with_mle(method=method) mask[mask] = self.fundamental_matrix.mask # Subscribe the health watcher to the fundamental matrix observable self.fundamental_matrix.subscribe(self._health.update) Loading @@ -182,6 +175,7 @@ class Edge(dict, MutableMapping): # Set the initial state of the fundamental mask in the masks self.masks = ('fundamental', mask) def add_putative_matches(self): if not hasattr(self, 'fundamental_matrix'): raise(ValueError, 'Fundamental matric has not been computed') Loading autocnet/graph/node.py +3 −3 Original line number Diff line number Diff line Loading @@ -144,9 +144,9 @@ class Node(dict, MutableMapping): """ if hasattr(self, '_keypoints'): try: return self._keypoints.iloc[index] except: if index is not None: return self._keypoints.loc[index] else: return self._keypoints else: return None Loading autocnet/matcher/outlier_detector.py +10 −4 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ class DistanceRatio(Observable): def nvalid(self): return self.mask.sum() def compute(self, ratio=0.95, mask=None, mask_name=None, single=False): def compute(self, ratio=0.8, mask=None, mask_name=None, single=False): """ Compute and return a mask for a matches dataframe using Lowe's ratio test. If keypoints have a single Loading Loading @@ -87,10 +87,14 @@ class DistanceRatio(Observable): self.single = single if mask is not None: self.mask = mask.copy() new_mask = self.matches[mask].groupby('source_idx')['distance'].transform(func).astype('bool') self.mask[mask == True] = new_mask mask_s = self.matches[mask].groupby('source_idx')['distance'].transform(func).astype('bool') mask_d = self.matches[mask].groupby('destination_idx')['distance'].transform(func).astype('bool') self.mask[mask] = mask_s & mask_d else: self.mask = self.matches.groupby('source_idx')['distance'].transform(func).astype('bool') mask_s = self.matches.groupby('source_idx')['distance'].transform(func).astype('bool') mask_d = self.matches.groupby('destination_idx')['distance'].transformat(func).astype('bool') self.mask = (mask_s & mask_d) state_package = {'ratio': ratio, 'mask': self.mask.copy(), Loading Loading @@ -354,6 +358,8 @@ def compute_fundamental_matrix(kp1, kp2, method='ransac', reproj_threshold=5.0, method_ = cv2.FM_LMEDS elif method == 'normal': method_ = cv2.FM_7POINT elif method == '8point': method_ = cv2.FM_8POINT else: raise ValueError("Unknown outlier detection method. Choices are: 'ransac', 'lmeds', or 'normal'.") Loading autocnet/transformation/transformations.py +150 −35 Original line number Diff line number Diff line import abc import math from collections import deque import warnings import cv2 import numpy as np import pandas as pd import pysal as ps Loading @@ -20,24 +22,14 @@ class TransformationMatrix(np.ndarray): __metaclass__ = abc.ABCMeta @abc.abstractmethod def __new__(cls, inputarr, x1, x2, mask, local_mask=None): if not isinstance(inputarr, np.ndarray): raise TypeError('The transformation must be an ndarray') obj = np.asarray(inputarr).view(cls) obj.x1 = x1 obj.x2 = x2 obj.mask = mask obj.local_mask = local_mask def __new__(cls, ndarray, index): obj = np.asarray(ndarray).view(cls) obj.index = index obj._action_stack = deque(maxlen=10) obj._current_action_stack = 0 obj._observers = set() # Seed the state package with the initial creation state if mask is not None: state_package = {'arr': obj.copy(), 'mask': obj.mask.copy()} else: state_package = {'arr': obj.copy(), 'mask': None} obj._action_stack.append(state_package) Loading @@ -48,21 +40,29 @@ class TransformationMatrix(np.ndarray): def __array_finalize__(self, obj): if obj is None: return self.x1 = getattr(obj, 'x1', None) self.x2 = getattr(obj, 'x2', None) self.mask = getattr(obj, 'mask', None) self._action_stack = getattr(obj, '_action_stack', None) self._current_action_stack = getattr(obj, '_current_action_stack', None) self._observers = getattr(obj, '_observers', None) self.x1 = getattr(obj, 'x1', None) self.x2 = getattr(obj, 'x2', None) self.index = getattr(obj, 'index', None) self.mask = getattr(obj, 'mask', None) @abc.abstractproperty def determinant(self): if not getattr(self, '_determinant', None): self._determinant = np.linalg.det(self) return self._determinant return np.linalg.det(self) @abc.abstractproperty def rank(self): return np.linalg.matrix_rank(self) @abc.abstractproperty def condition(self): """ The condition is a measure of the numerical stability of the solution to a set of linear equations. """ if not getattr(self, '_condition', None): s = np.linalg.svd(self, compute_uv=False) self._condition = s[0] / s[1] Loading Loading @@ -166,6 +166,16 @@ class FundamentalMatrix(TransformationMatrix): Attributes ---------- x1 : ndarray (n,2) array of point correspondences used to compute F x2 : ndarray (n,2) array of point correspondences used to compute F mask : ndarray (n, 2) boolean array indicating whether a correspondence is considered an inliner determinant : float The determinant of the matrix Loading @@ -177,17 +187,6 @@ class FundamentalMatrix(TransformationMatrix): compute this homography """ @property def rank(self): """ A valid fundamental matrix should be rank 2. Hartley & Zisserman p. 280 """ rank = np.linalg.matrix_rank(self) if rank != 2: warnings.warn('F rank not equal to 2. This indicates a poor F matrix.') return rank def refine_with_mle(self, **kwargs): """ Given a linear approximation of F, refine using Maximum Liklihood estimation Loading Loading @@ -305,8 +304,8 @@ class FundamentalMatrix(TransformationMatrix): -------- compute_error : The method called to compute element-wise error. """ x = self.x1.loc[self.mask] x1 = self.x2.loc[self.mask] x = self.x1[self.mask] x1 = self.x2[self.mask] return self.compute_error(self.x1, self.x2) def compute_error(self, x, x1): Loading Loading @@ -345,6 +344,122 @@ class FundamentalMatrix(TransformationMatrix): return F_error def _normalize(self, a): """ Normalize a set of coordinates such that the origin is translated to the center and then scaled isotropically such that the average distance from the origin is $\sqrt{2}$. Parameters ---------- a : DataFrame (n,3) of homogeneous coordinates Returns ------- normalized : ndarray (3,3) tranformation matrix """ # Compute the normalization matrix centroid = a[['x', 'y']].mean() dist = np.sqrt(np.sum(((a[['x', 'y']] - centroid)**2).values, axis=1)) mean_dist = np.mean(dist) sq2 = math.sqrt(2) normalizer = np.array([[sq2 / mean_dist, 0, -sq2 / mean_dist * centroid[0]], [0, sq2 / mean_dist, -sq2 / mean_dist * centroid[1]], [0, 0, 1]]) return normalizer def compute(self, kp1, kp2, method='ransac', reproj_threshold=2.0, confidence=0.99): """ Given two arrays of keypoints compute the fundamental matrix Parameters ---------- kp1 : ndarray (n, 2) of coordinates from the source image kp2 : ndarray (n, 2) of coordinates from the destination image method : {'ransac', 'lmeds', 'normal', '8point'} The openCV algorithm to use for outlier detection reproj_threshold : float The maximum distances in pixels a reprojected points can be from the epipolar line to be considered an inlier confidence : float [0, 1] that the estimated matrix is correct Notes ----- While the method is user definable, if the number of input points is < 7, normal outlier detection is automatically used, if 7 > n > 15, least medians is used, and if 7 > 15, ransac can be used. """ if method == 'ransac': method_ = cv2.FM_RANSAC elif method == 'lmeds': method_ = cv2.FM_LMEDS elif method == 'normal': method_ = cv2.FM_7POINT elif method == '8point': method_ = cv2.FM_8POINT else: raise ValueError("Unknown outlier detection method. Choices are: 'ransac', 'lmeds', '8point', or 'normal'.") #x1_norm = self._normalize(kp1) #x2_norm = self._normalize(kp2) #kp1_norm = kp1.values.dot(x1_norm) #kp2_norm = kp2.values.dot(x2_norm) F, mask = cv2.findFundamentalMat(kp1.values, kp2.values, method_, param1=reproj_threshold, param2=confidence) try: mask = mask.astype(bool).ravel() # Enforce dimensionality except: pass # pragma: no cover # Ensure that the singularity constraint is met self._enforce_singularity_constraint() # Set instance variables to inputs self.x1 = kp1 self.x2 = kp2 self.mask = pd.Series(mask, index=self.index) # Denormalize F_Prime to F and set #f = x2_norm.T.dot(transformation_matrix).dot(x1_norm) self[:] = F def _enforce_singularity_constraint(self): """ The fundamental matrix should be rank 2. In instances when it is not, the singularity constraint should be enforced. This is forces epipolar lines to be conincident. References ---------- .. [Hartley2003] """ if self.rank != 2: u, d, vt = np.linalg.svd(self) f1 = u.dot(np.diag([d[0], d[1], 0])).dot(vt) self[:] = f1 def recompute_matrix(self): raise NotImplementedError Loading autocnet/utils/tests/test_utils.py +11 −0 Original line number Diff line number Diff line import unittest import numpy as np import pandas as pd from .. import utils Loading Loading @@ -116,3 +118,12 @@ class TestUtils(unittest.TestCase): y = utils.normalize_vector(x) truth = np.tile(np.array([ 0.70710678, 0.70710678, 0.70710678]), 4).reshape(4,3) np.testing.assert_array_almost_equal(truth, y) def test_slope(self): x1 = pd.DataFrame({'x': np.arange(1, 11), 'y': np.arange(1, 11)}) x2 = pd.DataFrame({'x': np.arange(6, 16), 'y': np.arange(11, 21)}) slope = utils.calculate_slope(x1, x2) self.assertEqual(slope[0], 2) No newline at end of file Loading
autocnet/graph/edge.py +15 −21 Original line number Diff line number Diff line Loading @@ -69,7 +69,9 @@ class Edge(dict, MutableMapping): # state, dynamically draw the mask from the object. for c in self._masks.columns: if c in mask_lookup: self._masks[c] = getattr(self, mask_lookup[c]).mask truncated_mask = getattr(self, mask_lookup[c]).mask self._masks[c] = False self._masks[c].iloc[truncated_mask.index] = truncated_mask return self._masks @masks.setter Loading Loading @@ -99,7 +101,9 @@ class Edge(dict, MutableMapping): self.matches = matches else: df = self.matches self.matches = df.append(matches, ignore_index=True) self.matches = df.append(matches, ignore_index=True, verify_integrity=True) def symmetry_check(self): if hasattr(self, 'matches'): Loading @@ -123,7 +127,7 @@ class Edge(dict, MutableMapping): else: raise AttributeError('No matches have been computed for this edge.') def compute_fundamental_matrix(self, clean_keys=[], method='linear', **kwargs): def compute_fundamental_matrix(self, clean_keys=[], **kwargs): """ Estimate the fundamental matrix (F) using the correspondences tagged to this edge. Loading Loading @@ -153,27 +157,16 @@ class Edge(dict, MutableMapping): d_keypoints = self.destination.get_keypoint_coordinates(index=matches['destination_idx'], homogeneous=True) transformation_matrix, fundam_mask = od.compute_fundamental_matrix(s_keypoints.values, d_keypoints.values, **kwargs) try: fundam_mask = fundam_mask.ravel() except: return # Replace the index with the matches index. s_keypoints.index = matches.index d_keypoints.index = matches.index self.fundamental_matrix = FundamentalMatrix(np.zeros((3,3)), index=matches.index) self.fundamental_matrix.compute(s_keypoints, d_keypoints, **kwargs) # Convert the truncated RANSAC mask back into a full length mask mask[mask] = fundam_mask # Pass in the truncated mask to the fundamental matrix. These are # only those points that have pased previous outlier checks self.fundamental_matrix = FundamentalMatrix(transformation_matrix, s_keypoints, d_keypoints, mask=mask, local_mask=fundam_mask) if method != 'linear': self.fundamental_matrix.refine_with_mle(method=method) mask[mask] = self.fundamental_matrix.mask # Subscribe the health watcher to the fundamental matrix observable self.fundamental_matrix.subscribe(self._health.update) Loading @@ -182,6 +175,7 @@ class Edge(dict, MutableMapping): # Set the initial state of the fundamental mask in the masks self.masks = ('fundamental', mask) def add_putative_matches(self): if not hasattr(self, 'fundamental_matrix'): raise(ValueError, 'Fundamental matric has not been computed') Loading
autocnet/graph/node.py +3 −3 Original line number Diff line number Diff line Loading @@ -144,9 +144,9 @@ class Node(dict, MutableMapping): """ if hasattr(self, '_keypoints'): try: return self._keypoints.iloc[index] except: if index is not None: return self._keypoints.loc[index] else: return self._keypoints else: return None Loading
autocnet/matcher/outlier_detector.py +10 −4 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ class DistanceRatio(Observable): def nvalid(self): return self.mask.sum() def compute(self, ratio=0.95, mask=None, mask_name=None, single=False): def compute(self, ratio=0.8, mask=None, mask_name=None, single=False): """ Compute and return a mask for a matches dataframe using Lowe's ratio test. If keypoints have a single Loading Loading @@ -87,10 +87,14 @@ class DistanceRatio(Observable): self.single = single if mask is not None: self.mask = mask.copy() new_mask = self.matches[mask].groupby('source_idx')['distance'].transform(func).astype('bool') self.mask[mask == True] = new_mask mask_s = self.matches[mask].groupby('source_idx')['distance'].transform(func).astype('bool') mask_d = self.matches[mask].groupby('destination_idx')['distance'].transform(func).astype('bool') self.mask[mask] = mask_s & mask_d else: self.mask = self.matches.groupby('source_idx')['distance'].transform(func).astype('bool') mask_s = self.matches.groupby('source_idx')['distance'].transform(func).astype('bool') mask_d = self.matches.groupby('destination_idx')['distance'].transformat(func).astype('bool') self.mask = (mask_s & mask_d) state_package = {'ratio': ratio, 'mask': self.mask.copy(), Loading Loading @@ -354,6 +358,8 @@ def compute_fundamental_matrix(kp1, kp2, method='ransac', reproj_threshold=5.0, method_ = cv2.FM_LMEDS elif method == 'normal': method_ = cv2.FM_7POINT elif method == '8point': method_ = cv2.FM_8POINT else: raise ValueError("Unknown outlier detection method. Choices are: 'ransac', 'lmeds', or 'normal'.") Loading
autocnet/transformation/transformations.py +150 −35 Original line number Diff line number Diff line import abc import math from collections import deque import warnings import cv2 import numpy as np import pandas as pd import pysal as ps Loading @@ -20,24 +22,14 @@ class TransformationMatrix(np.ndarray): __metaclass__ = abc.ABCMeta @abc.abstractmethod def __new__(cls, inputarr, x1, x2, mask, local_mask=None): if not isinstance(inputarr, np.ndarray): raise TypeError('The transformation must be an ndarray') obj = np.asarray(inputarr).view(cls) obj.x1 = x1 obj.x2 = x2 obj.mask = mask obj.local_mask = local_mask def __new__(cls, ndarray, index): obj = np.asarray(ndarray).view(cls) obj.index = index obj._action_stack = deque(maxlen=10) obj._current_action_stack = 0 obj._observers = set() # Seed the state package with the initial creation state if mask is not None: state_package = {'arr': obj.copy(), 'mask': obj.mask.copy()} else: state_package = {'arr': obj.copy(), 'mask': None} obj._action_stack.append(state_package) Loading @@ -48,21 +40,29 @@ class TransformationMatrix(np.ndarray): def __array_finalize__(self, obj): if obj is None: return self.x1 = getattr(obj, 'x1', None) self.x2 = getattr(obj, 'x2', None) self.mask = getattr(obj, 'mask', None) self._action_stack = getattr(obj, '_action_stack', None) self._current_action_stack = getattr(obj, '_current_action_stack', None) self._observers = getattr(obj, '_observers', None) self.x1 = getattr(obj, 'x1', None) self.x2 = getattr(obj, 'x2', None) self.index = getattr(obj, 'index', None) self.mask = getattr(obj, 'mask', None) @abc.abstractproperty def determinant(self): if not getattr(self, '_determinant', None): self._determinant = np.linalg.det(self) return self._determinant return np.linalg.det(self) @abc.abstractproperty def rank(self): return np.linalg.matrix_rank(self) @abc.abstractproperty def condition(self): """ The condition is a measure of the numerical stability of the solution to a set of linear equations. """ if not getattr(self, '_condition', None): s = np.linalg.svd(self, compute_uv=False) self._condition = s[0] / s[1] Loading Loading @@ -166,6 +166,16 @@ class FundamentalMatrix(TransformationMatrix): Attributes ---------- x1 : ndarray (n,2) array of point correspondences used to compute F x2 : ndarray (n,2) array of point correspondences used to compute F mask : ndarray (n, 2) boolean array indicating whether a correspondence is considered an inliner determinant : float The determinant of the matrix Loading @@ -177,17 +187,6 @@ class FundamentalMatrix(TransformationMatrix): compute this homography """ @property def rank(self): """ A valid fundamental matrix should be rank 2. Hartley & Zisserman p. 280 """ rank = np.linalg.matrix_rank(self) if rank != 2: warnings.warn('F rank not equal to 2. This indicates a poor F matrix.') return rank def refine_with_mle(self, **kwargs): """ Given a linear approximation of F, refine using Maximum Liklihood estimation Loading Loading @@ -305,8 +304,8 @@ class FundamentalMatrix(TransformationMatrix): -------- compute_error : The method called to compute element-wise error. """ x = self.x1.loc[self.mask] x1 = self.x2.loc[self.mask] x = self.x1[self.mask] x1 = self.x2[self.mask] return self.compute_error(self.x1, self.x2) def compute_error(self, x, x1): Loading Loading @@ -345,6 +344,122 @@ class FundamentalMatrix(TransformationMatrix): return F_error def _normalize(self, a): """ Normalize a set of coordinates such that the origin is translated to the center and then scaled isotropically such that the average distance from the origin is $\sqrt{2}$. Parameters ---------- a : DataFrame (n,3) of homogeneous coordinates Returns ------- normalized : ndarray (3,3) tranformation matrix """ # Compute the normalization matrix centroid = a[['x', 'y']].mean() dist = np.sqrt(np.sum(((a[['x', 'y']] - centroid)**2).values, axis=1)) mean_dist = np.mean(dist) sq2 = math.sqrt(2) normalizer = np.array([[sq2 / mean_dist, 0, -sq2 / mean_dist * centroid[0]], [0, sq2 / mean_dist, -sq2 / mean_dist * centroid[1]], [0, 0, 1]]) return normalizer def compute(self, kp1, kp2, method='ransac', reproj_threshold=2.0, confidence=0.99): """ Given two arrays of keypoints compute the fundamental matrix Parameters ---------- kp1 : ndarray (n, 2) of coordinates from the source image kp2 : ndarray (n, 2) of coordinates from the destination image method : {'ransac', 'lmeds', 'normal', '8point'} The openCV algorithm to use for outlier detection reproj_threshold : float The maximum distances in pixels a reprojected points can be from the epipolar line to be considered an inlier confidence : float [0, 1] that the estimated matrix is correct Notes ----- While the method is user definable, if the number of input points is < 7, normal outlier detection is automatically used, if 7 > n > 15, least medians is used, and if 7 > 15, ransac can be used. """ if method == 'ransac': method_ = cv2.FM_RANSAC elif method == 'lmeds': method_ = cv2.FM_LMEDS elif method == 'normal': method_ = cv2.FM_7POINT elif method == '8point': method_ = cv2.FM_8POINT else: raise ValueError("Unknown outlier detection method. Choices are: 'ransac', 'lmeds', '8point', or 'normal'.") #x1_norm = self._normalize(kp1) #x2_norm = self._normalize(kp2) #kp1_norm = kp1.values.dot(x1_norm) #kp2_norm = kp2.values.dot(x2_norm) F, mask = cv2.findFundamentalMat(kp1.values, kp2.values, method_, param1=reproj_threshold, param2=confidence) try: mask = mask.astype(bool).ravel() # Enforce dimensionality except: pass # pragma: no cover # Ensure that the singularity constraint is met self._enforce_singularity_constraint() # Set instance variables to inputs self.x1 = kp1 self.x2 = kp2 self.mask = pd.Series(mask, index=self.index) # Denormalize F_Prime to F and set #f = x2_norm.T.dot(transformation_matrix).dot(x1_norm) self[:] = F def _enforce_singularity_constraint(self): """ The fundamental matrix should be rank 2. In instances when it is not, the singularity constraint should be enforced. This is forces epipolar lines to be conincident. References ---------- .. [Hartley2003] """ if self.rank != 2: u, d, vt = np.linalg.svd(self) f1 = u.dot(np.diag([d[0], d[1], 0])).dot(vt) self[:] = f1 def recompute_matrix(self): raise NotImplementedError Loading
autocnet/utils/tests/test_utils.py +11 −0 Original line number Diff line number Diff line import unittest import numpy as np import pandas as pd from .. import utils Loading Loading @@ -116,3 +118,12 @@ class TestUtils(unittest.TestCase): y = utils.normalize_vector(x) truth = np.tile(np.array([ 0.70710678, 0.70710678, 0.70710678]), 4).reshape(4,3) np.testing.assert_array_almost_equal(truth, y) def test_slope(self): x1 = pd.DataFrame({'x': np.arange(1, 11), 'y': np.arange(1, 11)}) x2 = pd.DataFrame({'x': np.arange(6, 16), 'y': np.arange(11, 21)}) slope = utils.calculate_slope(x1, x2) self.assertEqual(slope[0], 2) No newline at end of file