Loading knoten/bundle.py +179 −0 Original line number Diff line number Diff line Loading @@ -373,3 +373,182 @@ def compute_residuals(network, sensors): V = V.reshape(num_meas*2) return V def compute_sigma(V, W_parameters, W_observations): """ Computes the resulting standard deviation of the residuals for the current state of the bundle network. Parameters ---------- V : np.array The control network dataframe with updated ground points W_parameters : ndarray The parameter weight matrix (i.e.: sensor parameters and point weights) W_observations : ndarray The observation weight matrix (i.e.: point weights) Returns ------- : float64 Standard deviation of the residuals """ num_parameters = W_parameters.shape[0] num_observations = W_observations.shape[0] dof = num_observations - num_parameters VTPV = (V.dot(W_observations).dot(V)) sigma0 = np.sqrt(VTPV/dof) return sigma0 def bundle_iteration(J, V, W_parameters, W_observations): """ Parameters ---------- J : ndarray The control network as a dataframe generated by plio. V : np.array The control network dataframe with updated ground points W_parameters : ndarray The parameter weight matrix (i.e.: sensor parameters and point weights) W_observations : ndarray The observation weight matrix (i.e.: measure weights) Returns ------- N : """ N = J.T.dot(W_observations).dot(J) + W_parameters C = J.T.dot(W_observations).dot(V) dX = np.linalg.inv(N).dot(C) return N, dX # For data snooping we need to calculate updated residuals def compute_normalized_residual(J, V, N, W_parameters, W_observations): """ Computes the normalized residual statistic for the data snooping method. Method derived from Forstner 1985 "The Reliability of Block Triangulation" Parameters ---------- V : np.array The control network dataframe with updated ground points N : W_parameters : ndarray The parameter weight matrix (i.e.: sensor parameters and point weights) W_observations : ndarray The observation weight matrix (i.e.: point weights) Returns ------- : np.array Normalized residual statistic for the data snooping """ sigma0 = compute_sigma(V, W_parameters, W_observations) Qxx = np.linalg.inv(N) Qvv = np.linalg.inv(W_observations) - J.dot(Qxx).dot(J.T) qvv = np.diagonal(Qvv) sigma_vi = sigma0*np.sqrt(qvv) wi = -V/sigma_vi return wi def check_network(network): """ Check that all control points in a network have at least 2 remaining measures. Parameters ---------- network : DataFrame The control network as a dataframe generated by plio Returns ------- : list List of measure indices that were masked out for being the only measure on a point. """ bad_measures = [] for point_id, group in network.groupby('id'): if len(group) < 2: for measure_index, _ in group.iterrows(): bad_measures.append(measure_index) return bad_measures def data_snooping(network, sensors, parameters, k=3.29, verbose=True): """ Parameters ---------- network : DataFrame The control network as a dataframe generated by plio sensors : dict A dictionary that maps ISIS serial numbers to CSM sensors parameters : list The list of CsmParameter to compute the partials W.R.T. k : float64 Critical value used for rejection criteria; defaults to Forstner's 3.29 (or Baarda's 4.1??) verbose : bool If status prints should happen Returns ------- : list Indices of the network DataFrame that were rejected during data snooping """ net = network net['mask'] = False rejected_indices = [] awi = np.array([5, 5, 5, 5]) #initialize larger than k so you get into first iteration while (awi > k).any(): # weight matrices coefficient_columns = compute_coefficient_columns(net[~net['mask']], sensors, parameters) num_parameters = max(col_range[1] for col_range in coefficient_columns.values()) W_parameters = compute_parameter_weights(net[~net['mask']], sensors, parameters, coefficient_columns) num_observations = 2 * len(net[~net['mask']]) W_observations = np.eye(num_observations) # bundle iteration (and set up) V = compute_residuals(net[~net['mask']], sensors) J = compute_jacobian(net[~net['mask']], sensors, parameters, coefficient_columns) sigma0 = compute_sigma(V, W_parameters, W_observations) N, dX = bundle_iteration(J, V, W_parameters, W_observations) # calculate test statistic wi = compute_normalized_residual(J, V, N, W_parameters, W_observations) awi = abs(wi) #find maximum imax = np.argmax(awi) if verbose: print(f'max wi = {awi[imax]}') # display if awi[imax] <= k: if verbose: print('Data Snooping Outlier Rejection Complete') break reject_index = floor(imax/2) reject = net.index[~net['mask']][reject_index] net.loc[reject, ['mask']] = True rejected_indices.append(reject) if verbose: print(f'max wi index = {imax}') print(f'max wi measure index = {reject_index}') print(f'rejecting measure {net.loc[reject, ["id", "serialnumber"]].values}') not_enough_measures = check_network(net[~net['mask']]) if (not_enough_measures): for measure_index in not_enough_measures: if verbose: print(f'single measure point {net.loc[measure_index, "id"]}') print(f'rejecting measure {net.loc[measure_index, ["id", "serialnumber"]].values}') net.loc[measure_index, ['mask']] = True if verbose: print('') return rejected_indices Loading
knoten/bundle.py +179 −0 Original line number Diff line number Diff line Loading @@ -373,3 +373,182 @@ def compute_residuals(network, sensors): V = V.reshape(num_meas*2) return V def compute_sigma(V, W_parameters, W_observations): """ Computes the resulting standard deviation of the residuals for the current state of the bundle network. Parameters ---------- V : np.array The control network dataframe with updated ground points W_parameters : ndarray The parameter weight matrix (i.e.: sensor parameters and point weights) W_observations : ndarray The observation weight matrix (i.e.: point weights) Returns ------- : float64 Standard deviation of the residuals """ num_parameters = W_parameters.shape[0] num_observations = W_observations.shape[0] dof = num_observations - num_parameters VTPV = (V.dot(W_observations).dot(V)) sigma0 = np.sqrt(VTPV/dof) return sigma0 def bundle_iteration(J, V, W_parameters, W_observations): """ Parameters ---------- J : ndarray The control network as a dataframe generated by plio. V : np.array The control network dataframe with updated ground points W_parameters : ndarray The parameter weight matrix (i.e.: sensor parameters and point weights) W_observations : ndarray The observation weight matrix (i.e.: measure weights) Returns ------- N : """ N = J.T.dot(W_observations).dot(J) + W_parameters C = J.T.dot(W_observations).dot(V) dX = np.linalg.inv(N).dot(C) return N, dX # For data snooping we need to calculate updated residuals def compute_normalized_residual(J, V, N, W_parameters, W_observations): """ Computes the normalized residual statistic for the data snooping method. Method derived from Forstner 1985 "The Reliability of Block Triangulation" Parameters ---------- V : np.array The control network dataframe with updated ground points N : W_parameters : ndarray The parameter weight matrix (i.e.: sensor parameters and point weights) W_observations : ndarray The observation weight matrix (i.e.: point weights) Returns ------- : np.array Normalized residual statistic for the data snooping """ sigma0 = compute_sigma(V, W_parameters, W_observations) Qxx = np.linalg.inv(N) Qvv = np.linalg.inv(W_observations) - J.dot(Qxx).dot(J.T) qvv = np.diagonal(Qvv) sigma_vi = sigma0*np.sqrt(qvv) wi = -V/sigma_vi return wi def check_network(network): """ Check that all control points in a network have at least 2 remaining measures. Parameters ---------- network : DataFrame The control network as a dataframe generated by plio Returns ------- : list List of measure indices that were masked out for being the only measure on a point. """ bad_measures = [] for point_id, group in network.groupby('id'): if len(group) < 2: for measure_index, _ in group.iterrows(): bad_measures.append(measure_index) return bad_measures def data_snooping(network, sensors, parameters, k=3.29, verbose=True): """ Parameters ---------- network : DataFrame The control network as a dataframe generated by plio sensors : dict A dictionary that maps ISIS serial numbers to CSM sensors parameters : list The list of CsmParameter to compute the partials W.R.T. k : float64 Critical value used for rejection criteria; defaults to Forstner's 3.29 (or Baarda's 4.1??) verbose : bool If status prints should happen Returns ------- : list Indices of the network DataFrame that were rejected during data snooping """ net = network net['mask'] = False rejected_indices = [] awi = np.array([5, 5, 5, 5]) #initialize larger than k so you get into first iteration while (awi > k).any(): # weight matrices coefficient_columns = compute_coefficient_columns(net[~net['mask']], sensors, parameters) num_parameters = max(col_range[1] for col_range in coefficient_columns.values()) W_parameters = compute_parameter_weights(net[~net['mask']], sensors, parameters, coefficient_columns) num_observations = 2 * len(net[~net['mask']]) W_observations = np.eye(num_observations) # bundle iteration (and set up) V = compute_residuals(net[~net['mask']], sensors) J = compute_jacobian(net[~net['mask']], sensors, parameters, coefficient_columns) sigma0 = compute_sigma(V, W_parameters, W_observations) N, dX = bundle_iteration(J, V, W_parameters, W_observations) # calculate test statistic wi = compute_normalized_residual(J, V, N, W_parameters, W_observations) awi = abs(wi) #find maximum imax = np.argmax(awi) if verbose: print(f'max wi = {awi[imax]}') # display if awi[imax] <= k: if verbose: print('Data Snooping Outlier Rejection Complete') break reject_index = floor(imax/2) reject = net.index[~net['mask']][reject_index] net.loc[reject, ['mask']] = True rejected_indices.append(reject) if verbose: print(f'max wi index = {imax}') print(f'max wi measure index = {reject_index}') print(f'rejecting measure {net.loc[reject, ["id", "serialnumber"]].values}') not_enough_measures = check_network(net[~net['mask']]) if (not_enough_measures): for measure_index in not_enough_measures: if verbose: print(f'single measure point {net.loc[measure_index, "id"]}') print(f'rejecting measure {net.loc[measure_index, ["id", "serialnumber"]].values}') net.loc[measure_index, ['mask']] = True if verbose: print('') return rejected_indices