Loading .readthedocs.yaml +4 −8 Original line number Diff line number Diff line # .readthedocs.yaml # Read the Docs configuration file # See https://docs.readthedocs.io/en/stable/config-file/v2.html for details # Required version: 2 # Build documentation in the docs/ directory with Sphinx build: os: ubuntu-20.04 tools: python: mambaforge-4.10 sphinx: configuration: docs/conf.py conda: environment: docs/environment.yml CHANGELOG.md +9 −0 Original line number Diff line number Diff line Loading @@ -34,6 +34,10 @@ release. --> ## [Unreleased] ## [0.7.0]() ### Added - Added a mutual information matcher [#559](https://github.com/USGS-Astrogeology/autocnet/pull/559) - Added residual column information to the Points model Loading @@ -43,6 +47,11 @@ release. - `geom_match_simple` defaults to a 3rd order warp for interpolation - Speed improvements for place_points_from_cnet dependent on COPY method instead of ORM update - License from custom to CC0. Fixes [#607](https://github.com/USGS-Astrogeology/autocnet/issues/607) - `place_points_in_overlap` now properly handles ISIS sensor exceptions - Complex geometries that failed to find valid, in-geometry points now fall back to using a random point distribution method to ensure points are added. - Point and Image deletion from the DB now CASCADE to the measures table making modifications via measures easier to manage. ### Fixed - `update_from_jigsaw` failures due to stale code. 
Now uses a context on the engine to ensure closure Loading autocnet/cg/cg.py +16 −0 Original line number Diff line number Diff line from math import isclose, ceil import random import warnings import pandas as pd Loading Loading @@ -313,6 +314,18 @@ def xy_in_polygon(x,y, geom): """ return geom.contains(Point(x, y)) def generate_random(number, polygon): points = [] minx, miny, maxx, maxy = polygon.bounds i = 0 while len(points) < number and i < 1000: pnt = Point(random.uniform(minx, maxx), random.uniform(miny, maxy)) if polygon.contains(pnt): print(pnt.x, pnt.y) points.append([pnt.x, pnt.y]) i += 1 return np.asarray(points) def distribute_points_classic(geom, nspts, ewpts, use_mrr=True, **kwargs): """ This is a decision tree that attempts to perform a Loading Loading @@ -378,6 +391,9 @@ def distribute_points_classic(geom, nspts, ewpts, use_mrr=True, **kwargs): points = np.vstack(points) # Perform a spatial intersection check to eject points that are not valid valid = [p for p in points if xy_in_polygon(p[0], p[1], original_geom)] # The standard method failed. Attempt random placement within the geometry if not valid: valid = generate_random(ewpts * nspts, original_geom) return valid def distribute_points_new(geom, nspts, ewpts, Session): Loading autocnet/graph/asynchronous_funcs.py +33 −6 Original line number Diff line number Diff line import json import time import warnings from sqlalchemy import insert from sqlalchemy.sql.expression import bindparam Loading Loading @@ -50,9 +51,15 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee A threading.Event object with set and is_set members. This is the poison pill that can be set to terminate the thread. 
""" # Use a pipe for efficient reads pipe = queue.pipe() while not stop_event.is_set(): # Determine the read length of objects to pull from the cache # Cap the read length at 250 messages to improve memory usage and throughput read_length = int(queue.get(counter_name)) if read_length > 250: read_length = 250 # Pull the objects from the cache points = [] measures = [] Loading @@ -61,8 +68,18 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee rect_srid = Points.rectangular_srid lat_srid = Points.latitudinal_srid try: # Pop the messages off the queue for i in range(0, read_length): msg = json.loads(queue.lpop(queue_name), object_hook=object_hook) pipe.lpop(queue_name) pipe.decr(counter_name) msgs = pipe.execute() except: msgs = [] time.sleep(5) for msg in msgs: msg = json.loads(msg, object_hook=object_hook) if isinstance(msg, dict): # A NULL id is not allowable, so pop if a NULL ID exists if msg['id'] == None: Loading @@ -87,10 +104,11 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee measures.append(point_measures) points.append(msg) # The message was successfully read, so atomically deincrement the counter queue.decr(counter_name) if points: if not points: continue try: # Write the cached objects into the database with engine.connect() as conn: resp = conn.execute( Loading @@ -109,7 +127,16 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee measures = [measure for sublist in measures for measure in sublist] conn.execute( insert(Measures.__table__), measures) time.sleep(sleep_time) except: try: # Pop the messages off the queue for i in range(0, read_length): pipe.rpush(queue_name, msgs[i]) pipe.incr(counter_name) msgs = pipe.execute() except: warnings.warn('Failed to push to DB and failed to repopulate queue.') time.sleep(5) def watch_update_queue(queue, queue_name, counter_name, engine, stop_event, sleep_time=5): """ Loading autocnet/graph/cluster_submit.py +9 −5 
Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ import copy import os import json import sys import warnings import logging from io import StringIO from contextlib import redirect_stdout Loading @@ -20,6 +20,8 @@ from autocnet.utils.utils import import_func from autocnet.utils.serializers import JsonEncoder, object_hook from autocnet.io.db.model import JobsHistory log = logging.getLogger(__name__) def parse_args(): # pragma: no cover parser = argparse.ArgumentParser() parser.add_argument('-r', '--host', help='The host URL for the redis queue to to pull messages from.') Loading Loading @@ -178,7 +180,7 @@ def manage_messages(args, queue): if msg is None: if args['queue'] == False: warnings.warn('Expected to process a cluster job, but the message queue is empty.') log.warning('Expected to process a cluster job, but the message queue is empty.') return elif args['queue'] == True: print(f'Completed processing from queue: {queue}.') Loading @@ -191,19 +193,19 @@ def manage_messages(args, queue): #Convert the message from binary into a dict msgdict = json.loads(msg, object_hook=object_hook) # should replace this with some logging logic later # rather than redirecting std out stdout = StringIO() with redirect_stdout(stdout): # Apply the algorithm response = process(msgdict) # Should go to a logger someday! # Should go to a logger someday! (today is that day!) print(response) out = stdout.getvalue() # print to get everything on the logs in the directory print(out) sys.stdout.flush() stdout.flush() Loading @@ -226,6 +228,8 @@ def manage_messages(args, queue): def main(): # pragma: no cover args = vars(parse_args()) # set up the logger logging.basicConfig(level=os.environ.get("AUTOCNET_LOGLEVEL", "INFO")) # Get the message queue = StrictRedis(host=args['host'], port=args['port'], db=0) manage_messages(args, queue) Loading Loading
.readthedocs.yaml +4 −8 Original line number Diff line number Diff line # .readthedocs.yaml # Read the Docs configuration file # See https://docs.readthedocs.io/en/stable/config-file/v2.html for details # Required version: 2 # Build documentation in the docs/ directory with Sphinx build: os: ubuntu-20.04 tools: python: mambaforge-4.10 sphinx: configuration: docs/conf.py conda: environment: docs/environment.yml
CHANGELOG.md +9 −0 Original line number Diff line number Diff line Loading @@ -34,6 +34,10 @@ release. --> ## [Unreleased] ## [0.7.0]() ### Added - Added a mutual information matcher [#559](https://github.com/USGS-Astrogeology/autocnet/pull/559) - Added residual column information to the Points model Loading @@ -43,6 +47,11 @@ release. - `geom_match_simple` defaults to a 3rd order warp for interpolation - Speed improvements for place_points_from_cnet dependent on COPY method instead of ORM update - License from custom to CC0. Fixes [#607](https://github.com/USGS-Astrogeology/autocnet/issues/607) - `place_points_in_overlap` now properly handles ISIS sensor exceptions - Complex geometries that failed to find valid, in-geometry points now fall back to using a random point distribution method to ensure points are added. - Point and Image deletion from the DB now CASCADE to the measures table making modifications via measures easier to manage. ### Fixed - `update_from_jigsaw` failures due to stale code. Now uses a context on the engine to ensure closure Loading
autocnet/cg/cg.py +16 −0 Original line number Diff line number Diff line from math import isclose, ceil import random import warnings import pandas as pd Loading Loading @@ -313,6 +314,18 @@ def xy_in_polygon(x,y, geom): """ return geom.contains(Point(x, y)) def generate_random(number, polygon): points = [] minx, miny, maxx, maxy = polygon.bounds i = 0 while len(points) < number and i < 1000: pnt = Point(random.uniform(minx, maxx), random.uniform(miny, maxy)) if polygon.contains(pnt): print(pnt.x, pnt.y) points.append([pnt.x, pnt.y]) i += 1 return np.asarray(points) def distribute_points_classic(geom, nspts, ewpts, use_mrr=True, **kwargs): """ This is a decision tree that attempts to perform a Loading Loading @@ -378,6 +391,9 @@ def distribute_points_classic(geom, nspts, ewpts, use_mrr=True, **kwargs): points = np.vstack(points) # Perform a spatial intersection check to eject points that are not valid valid = [p for p in points if xy_in_polygon(p[0], p[1], original_geom)] # The standard method failed. Attempt random placement within the geometry if not valid: valid = generate_random(ewpts * nspts, original_geom) return valid def distribute_points_new(geom, nspts, ewpts, Session): Loading
autocnet/graph/asynchronous_funcs.py +33 −6 Original line number Diff line number Diff line import json import time import warnings from sqlalchemy import insert from sqlalchemy.sql.expression import bindparam Loading Loading @@ -50,9 +51,15 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee A threading.Event object with set and is_set members. This is the poison pill that can be set to terminate the thread. """ # Use a pipe for efficient reads pipe = queue.pipe() while not stop_event.is_set(): # Determine the read length of objects to pull from the cache # Cap the read length at 250 messages to improve memory usage and throughput read_length = int(queue.get(counter_name)) if read_length > 250: read_length = 250 # Pull the objects from the cache points = [] measures = [] Loading @@ -61,8 +68,18 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee rect_srid = Points.rectangular_srid lat_srid = Points.latitudinal_srid try: # Pop the messages off the queue for i in range(0, read_length): msg = json.loads(queue.lpop(queue_name), object_hook=object_hook) pipe.lpop(queue_name) pipe.decr(counter_name) msgs = pipe.execute() except: msgs = [] time.sleep(5) for msg in msgs: msg = json.loads(msg, object_hook=object_hook) if isinstance(msg, dict): # A NULL id is not allowable, so pop if a NULL ID exists if msg['id'] == None: Loading @@ -87,10 +104,11 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee measures.append(point_measures) points.append(msg) # The message was successfully read, so atomically deincrement the counter queue.decr(counter_name) if points: if not points: continue try: # Write the cached objects into the database with engine.connect() as conn: resp = conn.execute( Loading @@ -109,7 +127,16 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee measures = [measure for sublist in measures for measure in sublist] conn.execute( 
insert(Measures.__table__), measures) time.sleep(sleep_time) except: try: # Pop the messages off the queue for i in range(0, read_length): pipe.rpush(queue_name, msgs[i]) pipe.incr(counter_name) msgs = pipe.execute() except: warnings.warn('Failed to push to DB and failed to repopulate queue.') time.sleep(5) def watch_update_queue(queue, queue_name, counter_name, engine, stop_event, sleep_time=5): """ Loading
autocnet/graph/cluster_submit.py +9 −5 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ import copy import os import json import sys import warnings import logging from io import StringIO from contextlib import redirect_stdout Loading @@ -20,6 +20,8 @@ from autocnet.utils.utils import import_func from autocnet.utils.serializers import JsonEncoder, object_hook from autocnet.io.db.model import JobsHistory log = logging.getLogger(__name__) def parse_args(): # pragma: no cover parser = argparse.ArgumentParser() parser.add_argument('-r', '--host', help='The host URL for the redis queue to to pull messages from.') Loading Loading @@ -178,7 +180,7 @@ def manage_messages(args, queue): if msg is None: if args['queue'] == False: warnings.warn('Expected to process a cluster job, but the message queue is empty.') log.warning('Expected to process a cluster job, but the message queue is empty.') return elif args['queue'] == True: print(f'Completed processing from queue: {queue}.') Loading @@ -191,19 +193,19 @@ def manage_messages(args, queue): #Convert the message from binary into a dict msgdict = json.loads(msg, object_hook=object_hook) # should replace this with some logging logic later # rather than redirecting std out stdout = StringIO() with redirect_stdout(stdout): # Apply the algorithm response = process(msgdict) # Should go to a logger someday! # Should go to a logger someday! (today is that day!) print(response) out = stdout.getvalue() # print to get everything on the logs in the directory print(out) sys.stdout.flush() stdout.flush() Loading @@ -226,6 +228,8 @@ def manage_messages(args, queue): def main(): # pragma: no cover args = vars(parse_args()) # set up the logger logging.basicConfig(level=os.environ.get("AUTOCNET_LOGLEVEL", "INFO")) # Get the message queue = StrictRedis(host=args['host'], port=args['port'], db=0) manage_messages(args, queue) Loading