Loading plio/io/io_controlnetwork.py +2 −2 Original line number Diff line number Diff line Loading @@ -348,7 +348,7 @@ class IsisStore(object): arr = g.iloc[0][df_attr] if isinstance(arr, np.ndarray): arr = arr.ravel().tolist() if arr: point_spec.aprioriCovar.extend(arr) # If field is repeated you must extend instead of assign elif cnf._CONTROLPOINTFILEENTRYV0002.fields_by_name[attr].label == 3: Loading plio/io/tests/test_io_controlnetwork.py +19 −13 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import unittest from time import strftime, gmtime import pandas as pd import numpy as np import pvl from plio.io import io_controlnetwork Loading Loading @@ -69,12 +70,17 @@ def cnet_dataframe(tmpdir): serial_times = {295: '1971-07-31T01:24:11.754', 296: '1971-07-31T01:24:36.970'} serials = {i:'APOLLO15/METRIC/{}'.format(j) for i, j in enumerate(serial_times.values())} columns = ['id', 'pointType', 'serialnumber', 'measureType', 'sample', 'line', 'image_index', 'pointLog', 'measureLog'] columns = ['id', 'pointType', 'serialnumber', 'measureType', 'sample', 'line', 'image_index', 'pointLog', 'measureLog', 'aprioriCovar'] data = [] for i in range(npts): data.append((i, 2, serials[0], 2, 0, 0, 0, [], [])) data.append((i, 2, serials[1], 2, 0, 0, 1, [], [io_controlnetwork.MeasureLog(2, 0.5)])) aprioriCovar = None if i == npts - 1: aprioriCovar = np.ones((2,3)) data.append((i, 2, serials[0], 2, 0, 0, 0, [], [], aprioriCovar)) data.append((i, 2, serials[1], 2, 0, 0, 1, [], [io_controlnetwork.MeasureLog(2, 0.5)],aprioriCovar)) df = pd.DataFrame(data, columns=columns) Loading @@ -85,7 +91,7 @@ def cnet_dataframe(tmpdir): df.header_message_size = 78 df.point_start_byte = 65614 # 66949 df.npts = npts df.measure_size = 149 # Size of each measure in bytes df.measure_size = [149, 149, 149, 149, 200] # Size of each measure in bytes df.serials = serials return df Loading @@ -104,22 +110,23 @@ def test_create_buffer_header(cnet_dataframe, tmpdir): assert 'None' == header_protocol.description assert cnet_dataframe.modified_date == header_protocol.lastModified #Repeating assert [cnet_dataframe.measure_size] * cnet_dataframe.npts == header_protocol.pointMessageSizes assert cnet_dataframe.measure_size == header_protocol.pointMessageSizes def test_create_point(cnet_dataframe, tmpdir): with open(tmpdir.join('test.net'), 'rb') as f: f.seek(cnet_dataframe.point_start_byte) for i, length in enumerate([cnet_dataframe.measure_size] * cnet_dataframe.npts): for i, length in enumerate(cnet_dataframe.measure_size): point_protocol = cnf.ControlPointFileEntryV0002() raw_point = f.read(length) point_protocol.ParseFromString(raw_point) assert str(i) == point_protocol.id assert 2 == point_protocol.type print(len(point_protocol.measures)) for i, m in enumerate(point_protocol.measures): if i == cnet_dataframe.npts - 1: assert point_protocol.aprioriCovar == [1.0, 1.0, 1.0, 1.0, 1.0, 1.0] for j, m in enumerate(point_protocol.measures): assert m.serialnumber in cnet_dataframe.serials.values() assert 2 == m.type assert len(m.log) == i assert len(m.log) == j # Only the second measure has a message def test_create_pvl_header(cnet_dataframe, tmpdir): with open(tmpdir.join('test.net'), 'rb') as f: Loading @@ -132,8 +139,7 @@ def test_create_pvl_header(cnet_dataframe, tmpdir): assert 10 == mpoints points_bytes = find_in_dict(pvl_header, 'PointsBytes') assert 745 == points_bytes assert 796 == points_bytes points_start_byte = find_in_dict(pvl_header, 'PointsStartByte') assert cnet_dataframe.point_start_byte == points_start_byte Loading
plio/io/io_controlnetwork.py +2 −2 Original line number Diff line number Diff line Loading @@ -348,7 +348,7 @@ class IsisStore(object): arr = g.iloc[0][df_attr] if isinstance(arr, np.ndarray): arr = arr.ravel().tolist() if arr: point_spec.aprioriCovar.extend(arr) # If field is repeated you must extend instead of assign elif cnf._CONTROLPOINTFILEENTRYV0002.fields_by_name[attr].label == 3: Loading
plio/io/tests/test_io_controlnetwork.py +19 −13 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import unittest from time import strftime, gmtime import pandas as pd import numpy as np import pvl from plio.io import io_controlnetwork Loading Loading @@ -69,12 +70,17 @@ def cnet_dataframe(tmpdir): serial_times = {295: '1971-07-31T01:24:11.754', 296: '1971-07-31T01:24:36.970'} serials = {i:'APOLLO15/METRIC/{}'.format(j) for i, j in enumerate(serial_times.values())} columns = ['id', 'pointType', 'serialnumber', 'measureType', 'sample', 'line', 'image_index', 'pointLog', 'measureLog'] columns = ['id', 'pointType', 'serialnumber', 'measureType', 'sample', 'line', 'image_index', 'pointLog', 'measureLog', 'aprioriCovar'] data = [] for i in range(npts): data.append((i, 2, serials[0], 2, 0, 0, 0, [], [])) data.append((i, 2, serials[1], 2, 0, 0, 1, [], [io_controlnetwork.MeasureLog(2, 0.5)])) aprioriCovar = None if i == npts - 1: aprioriCovar = np.ones((2,3)) data.append((i, 2, serials[0], 2, 0, 0, 0, [], [], aprioriCovar)) data.append((i, 2, serials[1], 2, 0, 0, 1, [], [io_controlnetwork.MeasureLog(2, 0.5)],aprioriCovar)) df = pd.DataFrame(data, columns=columns) Loading @@ -85,7 +91,7 @@ def cnet_dataframe(tmpdir): df.header_message_size = 78 df.point_start_byte = 65614 # 66949 df.npts = npts df.measure_size = 149 # Size of each measure in bytes df.measure_size = [149, 149, 149, 149, 200] # Size of each measure in bytes df.serials = serials return df Loading @@ -104,22 +110,23 @@ def test_create_buffer_header(cnet_dataframe, tmpdir): assert 'None' == header_protocol.description assert cnet_dataframe.modified_date == header_protocol.lastModified #Repeating assert [cnet_dataframe.measure_size] * cnet_dataframe.npts == header_protocol.pointMessageSizes assert cnet_dataframe.measure_size == header_protocol.pointMessageSizes def test_create_point(cnet_dataframe, tmpdir): with open(tmpdir.join('test.net'), 'rb') as f: f.seek(cnet_dataframe.point_start_byte) for i, length in enumerate([cnet_dataframe.measure_size] * cnet_dataframe.npts): for i, length in enumerate(cnet_dataframe.measure_size): point_protocol = cnf.ControlPointFileEntryV0002() raw_point = f.read(length) point_protocol.ParseFromString(raw_point) assert str(i) == point_protocol.id assert 2 == point_protocol.type print(len(point_protocol.measures)) for i, m in enumerate(point_protocol.measures): if i == cnet_dataframe.npts - 1: assert point_protocol.aprioriCovar == [1.0, 1.0, 1.0, 1.0, 1.0, 1.0] for j, m in enumerate(point_protocol.measures): assert m.serialnumber in cnet_dataframe.serials.values() assert 2 == m.type assert len(m.log) == i assert len(m.log) == j # Only the second measure has a message def test_create_pvl_header(cnet_dataframe, tmpdir): with open(tmpdir.join('test.net'), 'rb') as f: Loading @@ -132,8 +139,7 @@ def test_create_pvl_header(cnet_dataframe, tmpdir): assert 10 == mpoints points_bytes = find_in_dict(pvl_header, 'PointsBytes') assert 745 == points_bytes assert 796 == points_bytes points_start_byte = find_in_dict(pvl_header, 'PointsStartByte') assert cnet_dataframe.point_start_byte == points_start_byte