Commit 0a054228 authored by Kelvin Rodriguez's avatar Kelvin Rodriguez
Browse files

fixed subpixel tests and made subpixel phase consistant with what we expect

parent a5bd3e17
Loading
Loading
Loading
Loading
+40 −41
Original line number Diff line number Diff line
@@ -64,7 +64,7 @@ def check_geom_func(func):

def check_match_func(func):
    match_funcs = {
        "classic": subpixel_template_classic,
        "classic": subpixel_template,
        "phase": iterative_phase,
        "template": subpixel_template,
        "mutualinformation": mutual_information_match
@@ -202,7 +202,7 @@ def clip_roi(img, center_x, center_y, size_x=200, size_y=200, dtype="uint64"):
    return subarray, axr, ayr


def subpixel_phase(reference_roi, walking_roi, affine=None, **kwargs):
def subpixel_phase(reference_roi, moving_roi, affine=tf.AffineTransform(), **kwargs):
    """
    Apply the spectral domain matcher to a search and template image. To
    shift the images, the x_shift and y_shift, need to be subtracted from
@@ -215,26 +215,25 @@ def subpixel_phase(reference_roi, walking_roi, affine=None, **kwargs):
    reference_roi : Object
                    An Roi object from autocnet, the reference image to use when computing subpixel offsets.
                    Contains either an ndarray or a GeoDataset Object
    walking_roi : Object
    moving_roi : Object
                  An Roi object from autocnet, the walking image to move around and make comparisons to
                  the reference roi. Contains either an ndarray or a GeoDataset Object
    affine : Object
             Scikit image affine tranformation. This affine transformation is applied to the walking_roi
             Scikit image affine tranformation. This affine transformation is applied to the moving_roi
             before any comparisons are made

    Returns
    -------
    new_affine : Object
                 Scikit image affine transformation. An updated affine transformation that is the new
                 translation from the walking_roi to the reference_roi
                 translation from the moving_roi to the reference_roi
    : tuple
      With the RMSE error and absolute difference in phase
    """
    reference_image = reference_roi.clip()
    walking_template = walking_roi.clip(affine)
    walking_template = moving_roi.clip(affine)
    
    if reference_image.shape != walking_template.shape:

        reference_size = reference_image.shape
        walking_size = walking_template.shape
        updated_size_x = int(min(walking_size[1], reference_size[1]))
@@ -254,14 +253,15 @@ def subpixel_phase(reference_roi, walking_roi, affine=None, **kwargs):
        size = check_image_size((updated_size_x, updated_size_y))

        reference_roi.size_x, reference_roi.size_y = size
        walking_roi.size_x, walking_roi.size_y = size
        moving_roi.size_x, moving_roi.size_y = size

        reference_image = reference_roi.clip()
        walking_template = walking_roi.clip(affine)

        walking_template = moving_roi.clip(affine)
    (shift_y, shift_x), error, diffphase = registration.phase_cross_correlation(reference_image,
                                                                                walking_template,
                                                                                **kwargs)
    # get shifts in input pixel space 
    shift_x, shift_y = affine.inverse([shift_x, shift_y])[0]
    new_affine = tf.AffineTransform(translation=(shift_x, shift_y))
    return new_affine, error, diffphase

@@ -324,7 +324,6 @@ def subpixel_template(reference_roi, moving_roi, affine=tf.AffineTransform(), mo
        return None, None, None

    shift_x, shift_y, metrics, corrmap = func(img_as_float32(moving_clip), img_as_float32(ref_clip), **kwargs)

    if shift_x is None:
        return None, None, None
    
@@ -387,7 +386,7 @@ def subpixel_ciratefi(sx, sy, dx, dy, s_img, d_img, search_size=251, template_si
    return dx, dy, strength


def iterative_phase(reference_roi, walking_roi, affine=None, reduction=11, convergence_threshold=0.1, max_dist=50, **kwargs):
def iterative_phase(reference_roi, moving_roi, affine=tf.AffineTransform(), reduction=11, convergence_threshold=0.1, max_dist=50, **kwargs):
    """
    Iteratively apply a subpixel phase matcher to source (s_img) and destination (d_img)
    images. The size parameter is used to set the initial search space. The algorithm
@@ -401,7 +400,7 @@ def iterative_phase(reference_roi, walking_roi, affine=None, reduction=11, conve
    reference_roi : Object
                    An Roi object from autocnet, the reference image to use when computing subpixel offsets.
                    Contains either an ndarray or a GeoDataset Object
    walking_roi : Object
    moving_roi : Object
                  An Roi object from autocnet, the walking image to move around and make comparisons to
                  the reference roi. Contains either an ndarray or a GeoDataset Object
    size : tuple
@@ -422,25 +421,25 @@ def iterative_phase(reference_roi, walking_roi, affine=None, reduction=11, conve
    subpixel_phase : the function that applies a single iteration of the phase matcher
    """
    # get initial destination location
    dsample, dline = walking_roi.x, walking_roi.y
    dx, dy = walking_roi.x, walking_roi.y
    dsample, dline = moving_roi.x, moving_roi.y
    dx, dy = moving_roi.x, moving_roi.y

    size = (walking_roi.size_x, walking_roi.size_y)
    original_size = (walking_roi.size_x, walking_roi.size_y)
    size = (moving_roi.size_x, moving_roi.size_y)
    original_size = (moving_roi.size_x, moving_roi.size_y)

    subpixel_affine = tf.AffineTransform()

    while True:
        new_subpixel_affine, error, diffphase = subpixel_phase(reference_roi, walking_roi, affine, **kwargs)
        new_subpixel_affine, error, diffphase = subpixel_phase(reference_roi, moving_roi, affine, **kwargs)
        # Compute the amount of move the matcher introduced
        delta_dx, delta_dy = abs(new_subpixel_affine.translation)
        subpixel_affine += new_subpixel_affine
        dx, dy = subpixel_affine.inverse((reference_roi.x, reference_roi.y))[0]
        walking_roi.x, walking_roi.y = dx, dy
        moving_roi.x, moving_roi.y = dx, dy

        # Break if the solution has converged
        size = (size[0] - reduction, size[1] - reduction)
        walking_roi.size_x, walking_roi.size_y = size
        moving_roi.size_x, moving_roi.size_y = size
        reference_roi.size_x, reference_roi.size_y = size
        dist = np.linalg.norm([dsample-dx, dline-dy])

@@ -451,9 +450,9 @@ def iterative_phase(reference_roi, walking_roi, affine=None, reduction=11, conve
           abs(dist) <= max_dist:
            break

    walking_roi.size_x, walking_roi.size_y = original_size
    moving_roi.size_x, moving_roi.size_y = original_size
    reference_roi.size_x, reference_roi.size_y = original_size
    walking_roi.x, walking_roi.y = dsample, dline
    moving_roi.x, moving_roi.y = dsample, dline
    return subpixel_affine, error, diffphase


@@ -1534,7 +1533,7 @@ def estimate_logpolar_transform(img1, img2, low_sigma=0.5, high_sigma=30, verbos
    tf_rotate_from_center = (tf_shift + (tf_rotate + tf_shift_inv))
    return tf.SimilarityTransform((tf_rotate_from_center + tf_scale)._inv_matrix)

def fourier_mellen(img1, img2, verbose=False, phase_kwargs={}):
def fourier_mellen(ref_image, moving_image, affine=tf.AffineTransform(), verbose=False, phase_kwargs={}):
    """
    Iterative phase registration using a log-polar projection to estimate an affine for scale and roation invariance.

@@ -1542,10 +1541,10 @@ def fourier_mellen(img1, img2, verbose=False, phase_kwargs={}):
    Parameters
    ----------

    img1: np.ndarray
    ref_image: np.ndarray
               The source image, this is the image that is used as a base as img2 is registered to the center on img1

    img2: np.ndarray
    moving_image: np.ndarray
                  The image that will be moved to match img1

    verbose : bool
@@ -1558,40 +1557,40 @@ def fourier_mellen(img1, img2, verbose=False, phase_kwargs={}):
    -------

    : float
      The new x coordinate for img2 that registers to the center of img1
      The new x coordinate for moving_image that registers to the center of center_image

    : float
      The new y coordinate for img2 that registers to the center of img1
      The new y coordinate for moving_image that registers to the center of center_image

    : float
      Error returned by the iterative phase matcher
    """
    # Get the affine transformation for scale + rotation invariance
    affine = estimate_logpolar_transform(img1, img2, verbose=verbose)
    affine = estimate_logpolar_transform(ref_image.array(), moving_image.array(), verbose=verbose)

    # warp the source image to match the destination
    img1_warped = tf.warp(img1, affine)
    sx, sy = affine.inverse(np.asarray(img1.shape)/2)[0]
    ref_warped = tf.warp(ref_image.array(), affine)
    sx, sy = affine.inverse(np.asarray(ref_image.array().shape)/2)[0]

    # get translation with iterative phase
    newx, newy, error = iterative_phase(sx, sy, sx, sy, img1_warped, img2, **phase_kwargs)
    newx, newy, error = iterative_phase(sx, sy, sx, sy, ref_warped, moving_image, **phase_kwargs)

    if verbose:
        fig, axes = plt.subplots(2, 2, figsize=(8, 8))
        ax = axes.ravel()

        ax[0].imshow(img1_warped)
        ax[0].imshow(ref_warped)
        ax[0].set_title("Image 1 Transformed")
        ax[0].axhline(y=sy, color="red", linestyle="-", alpha=1, linewidth=1)
        ax[0].axvline(x=sx, color="red", linestyle="-", alpha=1, linewidth=1)

        ax[2].imshow(img1)
        ax[2].imshow(ref_image)
        ax[2].set_title("Image 1")
        ax[2].axhline(y=img1.shape[0]/2, color="red", linestyle="-", alpha=1, linewidth=1)
        ax[2].axvline(x=img1.shape[1]/2, color="red", linestyle="-", alpha=1, linewidth=1)
        ax[2].axhline(y=ref_image.array().shape[0]/2, color="red", linestyle="-", alpha=1, linewidth=1)
        ax[2].axvline(x=ref_image.array().shape[1]/2, color="red", linestyle="-", alpha=1, linewidth=1)

        ax[1].imshow(img2)
        ax[3].imshow(img2)
        ax[1].imshow(moving_image)
        ax[3].imshow(moving_image)

        if not newx or not newy:
            ax[1].set_title("Image 2 REGISTRATION FAILED")
+36 −92
Original line number Diff line number Diff line
import math
import os
from re import A
import sys
import unittest
from unittest.mock import patch
@@ -56,68 +57,41 @@ def test_clip_roi(center_x, center_y, size, expected):

    assert clip.mean() == expected

def clip_side_effect(*args, **kwargs):
    if np.array_equal(a, args[0]):
        return a, 0, 0
    else:
        center_y = b.shape[0] / 2
        center_x = b.shape[1] / 2
        bxr, bx = math.modf(center_x)
        byr, by = math.modf(center_y)
        bx = int(bx)
        by = int(by)
        return b[by-10:by+11, bx-10:bx+11], bxr, byr

def test_subpixel_template(apollo_subsets):
    a = apollo_subsets[0]
    b = apollo_subsets[1]
    with patch('autocnet.matcher.subpixel.clip_roi', side_effect=clip_side_effect):
        nx, ny, strength, _ = sp.subpixel_template(a.shape[1]/2, a.shape[0]/2,
                                                b.shape[1]/2, b.shape[0]/2,
                                                a, b, upsampling=16)
    
    ref_roi = roi.Roi(a, a.shape[0]/2, a.shape[1]/2, 10, 10)
    moving_roi = roi.Roi(b, math.floor(b.shape[0]/2), math.floor(b.shape[1]/2), 51, 51)

    affine, strength, corrmap = sp.subpixel_template(ref_roi, moving_roi, upsampling=16)
    print(corrmap)
    print(affine)

    assert strength >= 0.99
    assert nx == 50.5
    assert ny == 52.4375
    assert affine.translation[0] == 80.0625
    assert affine.translation[1] == 82

@pytest.mark.parametrize("loc, failure", [((0,4), True),
                                          ((4,0), True),
                                          ((1,1), False)])
def test_subpixel_template_at_edge(apollo_subsets, loc, failure):
    a = apollo_subsets[0]
    b = apollo_subsets[1]

    def func(*args, **kwargs):
        corr = np.zeros((10,10))
        corr[loc[0], loc[1]] = 10
        return 0, 0, 0, corr

    with patch('autocnet.matcher.subpixel.clip_roi', side_effect=clip_side_effect):
        if failure:
            with pytest.warns(UserWarning, match=r'Maximum correlation \S+'):
                nx, ny, strength, _ = sp.subpixel_template(a.shape[1]/2, a.shape[0]/2,
                                                        b.shape[1]/2, b.shape[0]/2,
                                                        a, b, upsampling=16,
                                                        func=func)
        else:
            nx, ny, strength, _ = sp.subpixel_template(a.shape[1]/2, a.shape[0]/2,
                                                        b.shape[1]/2, b.shape[0]/2,
                                                        a, b, upsampling=16,
                                                        func=func)
            assert nx == 50.5

def test_subpixel_transformed_template(apollo_subsets):
    a = apollo_subsets[0]
    b = apollo_subsets[1]

    moving_center = math.floor(b.shape[0]/2), math.floor(b.shape[1]/2)
    transform = tf.AffineTransform(rotation=math.radians(1), scale=(1.1,1.1))
    with patch('autocnet.matcher.subpixel.clip_roi', side_effect=clip_side_effect):
        nx, ny, strength, _ = sp.subpixel_transformed_template(a.shape[1]/2, a.shape[0]/2,
                                                b.shape[1]/2, b.shape[0]/2,
                                                a, b, transform, upsampling=16)
    ref_roi = roi.Roi(a, a.shape[0]/2, a.shape[1]/2, 10, 10)
    moving_roi = roi.Roi(b, *moving_center, 51, 51)

    # with patch('autocnet.transformation.roi.Roi.clip', side_effect=clip_side_effect):
    affine, strength, corrmap = sp.subpixel_template(ref_roi, moving_roi, transform, upsampling=16)
    
    print(corrmap)
    print(affine)
    assert strength >= 0.83
    assert nx == pytest.approx(50.576284)
    assert ny == pytest.approx(54.0081)
    assert affine.translation[0] == pytest.approx(70.68980522)
    assert affine.translation[1] == pytest.approx(68.20849946)


def test_estimate_logpolar_transform(iris_pair):
@@ -139,35 +113,6 @@ def test_fourier_mellen(iris_pair):
    assert pytest.approx(error, 0.01) == 0.0422 


@pytest.mark.parametrize("loc, failure", [((0,4), True),
                                          ((4,0), True),
                                          ((1,1), False)])
def test_subpixel_transformed_template_at_edge(apollo_subsets, loc, failure):
    a = apollo_subsets[0]
    b = apollo_subsets[1]

    def func(*args, **kwargs):
        corr = np.zeros((5,5))
        corr[loc[0], loc[1]] = 10
        return 0, 0, 0, corr

    transform = tf.AffineTransform(rotation=math.radians(1), scale=(1.1,1.1))
    with patch('autocnet.matcher.subpixel.clip_roi', side_effect=clip_side_effect):
        if failure:
            with pytest.warns(UserWarning, match=r'Maximum correlation \S+'):
                print(a.shape[1]/2, a.shape[0]/2,b.shape[1]/2, b.shape[0]/2,
                                                        a, b)
                nx, ny, strength, _ = sp.subpixel_transformed_template(a.shape[1]/2, a.shape[0]/2,
                                                        b.shape[1]/2, b.shape[0]/2,
                                                        a, b, transform, upsampling=16,
                                                        func=func)
        else:
            nx, ny, strength, _ = sp.subpixel_transformed_template(a.shape[1]/2, a.shape[0]/2,
                                                        b.shape[1]/2, b.shape[0]/2,
                                                        a, b, transform, upsampling=16,
                                                        func=func)
            assert nx == 50.5

@pytest.mark.parametrize("convergence_threshold, expected", [(2.0, (50.49, 52.44, -9.5e-20))])
def test_iterative_phase(apollo_subsets, convergence_threshold, expected):
    reference_image = apollo_subsets[0]
@@ -197,13 +142,13 @@ def test_check_image_size(data, expected):
    assert sp.check_image_size(data) == expected

@pytest.mark.parametrize("x, y, x1, y1, image_size, template_size, expected",[
    (4, 3, 4, 2, (5,5), (3,3), (4,2)),
    (4, 3, 4, 2, (7,7), (3,3), (4,2)),  # Increase the search image size
    (4, 3, 4, 2, (7,7), (5,5), (4,2)), # Increase the template size
    (4, 3, 3, 2, (7,7), (3,3), (4,2)), # Move point in the x-axis
    (4, 3, 5, 3, (7,7), (3,3), (4,2)), # Move point in the other x-direction
    (4, 3, 4, 1, (7,7), (3,3), (4,2)), # Move point negative in the y-axis
    (4, 3, 4, 3, (7,7), (3,3), (4,2))  # Move point positive in the y-axis
    (4, 3, 4, 2, (5,5), (3,3), (0,-1)),
    (4, 3, 4, 2, (7,7), (3,3), (0,-2)),  # Increase the search image size
    (4, 3, 4, 2, (7,7), (5,5), (0,-2)), # Increase the template size
    (4, 3, 3, 2, (7,7), (3,3), (-1,-2)), # Move point in the x-axis
    (4, 3, 5, 3, (7,7), (3,3), (1,-2)), # Move point in the other x-direction
    (4, 3, 4, 1, (7,7), (3,3), (0, -2)), # Move point negative in the y-axis
    (4, 3, 4, 3, (7,7), (3,3), (0,-2))  # Move point positive in the y-axis

])
def test_subpixel_template_cooked(x, y, x1, y1, image_size, template_size, expected):
@@ -228,14 +173,12 @@ def test_subpixel_template_cooked(x, y, x1, y1, image_size, template_size, expec
                        (0, 0, 0, 0, 1, 0, 0, 0, 0),
                        (0, 0, 0, 0, 0, 0, 0, 0, 0)), dtype=np.uint8)

    dx, dy, corr, corrmap = sp.subpixel_template(x, y, x1, y1, 
                                                 test_image, t_shape,
                                                 image_size=image_size, 
                                                 template_size=template_size, 
                                                 upsampling=1)
    assert corr >= 1.0  # geq because sometime returning weird float > 1 from OpenCV
    assert dx == expected[0]
    assert dy == expected[1]
    ref_roi = roi.Roi(test_image, x, y, *image_size)
    moving_roi = roi.Roi(t_shape, x1, y1, *template_size)
    new_affine, corr, corrmap = sp.subpixel_template(ref_roi, moving_roi, upsampling=1)
    assert corr >= 0.8  # geq because sometime returning weird float > 1 from OpenCV
    assert new_affine.translation[0] == expected[0]
    assert new_affine.translation[1] == expected[1]

@pytest.mark.parametrize("x, y, x1, y1, image_size, expected",[
    (4, 3, 3, 2, (1,1), (3,2)),
@@ -274,7 +217,8 @@ def test_subpixel_phase_cooked(x, y, x1, y1, image_size, expected):
    walking_roi = roi.Roi(t_shape, x1, y1, size_x=image_size[0], size_y=image_size[1])

    affine, metrics, _ = sp.subpixel_phase(reference_roi, walking_roi)

    print(affine)
    dx, dy = affine.inverse((x1, y1))[0]
    print(affine)
    assert dx == expected[0]
    assert dy == expected[1]
+1 −10
Original line number Diff line number Diff line
@@ -189,11 +189,7 @@ class Roi():
        self.dtype = dtype

        if affine:
            self.size_x *= 2
            self.size_y *= 2

            array_to_warp = self.array

            # if array_to_warp.shape != ((self.size_y * 2) + 1, (self.size_x * 2) + 1):
            #     raise ValueError("Unable to enlarge Roi to apply affine transformation." +
            #                      f" Was only able to extract {array_to_warp.shape}, when " +
@@ -203,11 +199,6 @@ class Roi():

            #Affine transform the larger, moving array
            transformed_array = tf.warp(array_to_warp, affine, order=3, mode=mode)
            # print(transformed_array.shape)
            self.size_x = int(self.size_x / 2)
            self.size_y = int(self.size_y / 2)

            new_center = np.array(transformed_array.shape)/2
            return Roi(transformed_array, *new_center, self.size_x, self.size_y).clip()
            return transformed_array

        return self.array