Unverified Commit 25427b05 authored by Kelvin Rodriguez's avatar Kelvin Rodriguez Committed by GitHub
Browse files

Isis special pixels (#577)



* Single clean commit for parallel ground

* Adds capability to use a redis write cache to improve performance

* simplify config via redis setup

* Fixes regression in measures

* Initial work on async watchers

* Fixes serialization when a relationship can't be loaded from a detached instance.

* Fixes regression in images

* Removes debug prints

* Fixes backref - need to be single sided

* dtype checking for reads

* Fixes missed merge issues

* added np2isis dtype code

Co-authored-by: default avatarjay <jlaura@usgs.gov>
Co-authored-by: default avatarJay <jlaura@asu.edu>
parent d83087be
Loading
Loading
Loading
Loading
+176 −0
Original line number Diff line number Diff line
import json
import time

from sqlalchemy import insert, update
from sqlalchemy.sql.expression import bindparam

from autocnet.io.db.model import Points, Measures
from autocnet.utils.serializers import object_hook

def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event):
    """
    A worker process to be launched in a thread that will asynchronously insert 
    points (and their associated measures) into the database using dicts pulled 
    from a redis queue. Using this queuing approach many cluster jobs are able 
    to push to the redis queue rapidly and then a single writer process can push 
    the data back to the database.
    
    This function requires that the function called by the asynchronous cluster job INCR
    (increment) the counter_name key in the redis cache. This counter is INCR (incremented) 
    by cluster jobs to track how many messages have been pushed to the queue (queue_name). 
    This func then reads that many messages and DECR (decrements) the counter by that
    many messages. This way this function only reads when data is present and reads can occur 
    asynchronously. This works because the cluster job pushes to the right side of the redis
    list and this function reads n-messages from the left side.
    
    This method uses the sqlalchemy core interface for performance reasons. Therefore, some
    munging of column names is used to ensure that the model to be processed matches the
    database column names.
    
    Parameters
    ----------
    queue : obj
            A Redis or StrictRedis connection instance
    
    queue_name : str
                 The name of the queue to watch
                 
    counter_name : str
                   The name of the incrementing counter to watch. 
    
    engine : obj
              A sqlalchemy engine. 
              
    stop_event : obj
                 A threading.Event object with set and is_set members. This is the
                 poison pill that can be set to terminate the thread.
    """
    while not stop_event.is_set():
        # Determine the read length of objects to pull from the cache. Guard
        # against the counter key being absent (e.g., flushed externally),
        # in which case queue.get returns None.
        raw_count = queue.get(counter_name)
        read_length = int(raw_count) if raw_count is not None else 0

        # Pull the objects from the cache
        points = []
        measures = []  # list-of-lists, kept parallel with points
        
        # Pull the SRID dynamically from the model (database)
        srid = Points.rectangular_srid

        for _ in range(read_length):
            msg = json.loads(queue.lpop(queue_name), object_hook=object_hook)
            if isinstance(msg, dict):
                # A NULL id is not allowable, so pop if a NULL ID exists
                if msg.get('id') is None:
                    msg.pop('id', None)

                # Since this avoids the ORM, need to map the table names manually
                msg['pointType'] = msg['pointtype']
                msg['adjusted'] = f'SRID={srid};' + msg["adjusted"].wkt  # Geometries go in as EWKT
                
                # Measures are removed and manually inserted later. Always append
                # (even an empty list) so that measures[i] remains aligned with
                # points[i]; appending only non-empty lists would shift every
                # subsequent measure set onto the wrong point id.
                measures.append(msg.pop('measures', []))
                points.append(msg)
            # The message was successfully read, so atomically decrement the counter
            queue.decr(counter_name)
            
        if points:
            # Write the cached objects into the database 
            with engine.connect() as conn:
                resp = conn.execute(
                    insert(Points.__table__).returning(Points.__table__.c.id), points
                )
                pointids = [row[0] for row in resp.all()]

                # Measures are a list of lists parallel with pointids. Attach
                # the newly generated point id to each measure, then flatten
                # the lists for a single bulk insert.
                for i, measure_set in enumerate(measures):
                    for measure in measure_set:
                        measure['pointid'] = pointids[i]
                        measure.pop('id', None)  # As above, remove the NULL id
                        # Remap field names because the ORM is NOT being used
                        measure['serialnumber'] = measure.pop('serial', None)
                        measure['measureType'] = measure.pop('measuretype')
                flattened = [measure for sublist in measures for measure in sublist]
                if flattened:
                    conn.execute(insert(Measures.__table__), flattened)
        time.sleep(5)

def watch_update_queue(queue, queue_name, counter_name, engine, stop_event):
    """
    A worker process to be launched in a thread that will asynchronously update 
    measures in the database using dicts pulled from a redis queue. Using this 
    queuing approach many cluster jobs are able to push to the redis queue 
    rapidly and then a single writer process can push the data back to the 
    database.
    
    This function requires that the function called by the asynchronous cluster job INCR
    (increment) the counter_name key in the redis cache. This counter is INCR (incremented) 
    by cluster jobs to track how many messages have been pushed to the queue (queue_name). 
    This func then reads that many messages and DECR (decrements) the counter by that
    many messages. This way this function only reads when data is present and reads can occur 
    asynchronously. This works because the cluster job pushes to the right side of the redis
    list and this function reads n-messages from the left side.
    
    This method uses the sqlalchemy core interface for performance reasons. Therefore, some
    munging of column names is used to ensure that the model to be processed matches the
    database column names.
    
    Parameters
    ----------
    queue : obj
            A Redis or StrictRedis connection instance
    
    queue_name : str
                 The name of the queue to watch
                 
    counter_name : str
                   The name of the incrementing counter to watch. 
    
    engine : obj
              A sqlalchemy engine. 
              
    stop_event : obj
                 A threading.Event object with set and is_set members. This is the
                 poison pill that can be set to terminate the thread.
    """
    while not stop_event.is_set():
        # Determine the read length of objects to pull from the cache. Guard
        # against the counter key being absent, in which case get returns None.
        raw_count = queue.get(counter_name)
        read_length = int(raw_count) if raw_count is not None else 0

        # Pull the objects from the cache
        measures = []
        
        for _ in range(read_length):
            msg = json.loads(queue.lpop(queue_name), object_hook=object_hook)
            if isinstance(msg, dict):
                # 'id' is reserved by sqlalchemy on insert/update; remapped
                # to the bindparam '_id' used in the WHERE clause below.
                msg['_id'] = msg.pop('id', None)
                measures.append(msg)

            # The message was successfully read, so atomically decrement the counter
            queue.decr(counter_name)
                    
        # Write the updated measures to the db 
        if measures:
            with engine.connect() as conn:
                stmt = Measures.__table__.update().\
                            where(Measures.__table__.c.id == bindparam('_id')).\
                            values({'weight':bindparam('weight'),
                                    'measureIgnore':bindparam('ignore'),
                                    'templateMetric':bindparam('template_metric'),
                                    'templateShift':bindparam('template_shift'),
                                    'line': bindparam('line'),
                                    'sample':bindparam('sample'),
                                    'ChooserName':bindparam('choosername')})
                conn.execute(stmt, measures)

        # Sleep INSIDE the loop so the watcher does not busy-spin against
        # redis. (The original slept only after stop_event was set.)
        time.sleep(5)
+15 −11
Original line number Diff line number Diff line
@@ -55,7 +55,7 @@ def _instantiate_row(msg, ncg):
    obj = objdict[msg['along']]
    with ncg.session_scope() as session:
        res = session.query(obj).filter(getattr(obj, 'id')==msg['id']).one()
        session.expunge(res) # Disconnect the object from the session
        session.expunge_all() # Disconnect the object from the session
    return res

def process(msg):
@@ -72,7 +72,7 @@ def process(msg):
    ncg.config_from_dict(msg['config'])
    if msg['along'] in ['node', 'edge']:
        obj = _instantiate_obj(msg, ncg)
    elif msg['along'] in ['points', 'measures', 'overlaps', 'images']:
    elif msg['along'] in ['candidategroundpoints', 'points', 'measures', 'overlaps', 'images']:
        obj = _instantiate_row(msg, ncg)
    else:
        obj = msg['along']
@@ -176,6 +176,9 @@ def manage_messages(args, queue):
    # in the list where the element is the JSON representation of the message. Maybe swap to a hash?
    remove_key = msg

    #Convert the message from binary into a dict
    msg = json.loads(msg, object_hook=object_hook)

    # Apply the algorithm
    response = process(msg)
    # Should go to a logger someday!
@@ -185,7 +188,8 @@ def manage_messages(args, queue):

def main():  # pragma: no cover
    args = vars(parse_args())
    
    # Get the message
    queue = StrictRedis(host=args['host'], port=args['port'], db=0)
    manage_messages(args, queue)

+82 −12
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@ import json
import math
import os
from shutil import copyfile
import threading
from time import gmtime, strftime, time
import warnings
from itertools import combinations
@@ -38,6 +39,7 @@ from plurmy import Slurm
import autocnet
from autocnet.config_parser import parse_config
from autocnet.cg import cg
from autocnet.graph.asynchronous_funcs import watch_insert_queue, watch_update_queue
from autocnet.graph import markov_cluster
from autocnet.graph.edge import Edge, NetworkEdge
from autocnet.graph.node import Node, NetworkNode
@@ -1356,7 +1358,7 @@ class NetworkCandidateGraph(CandidateGraph):
                6: CandidateGroundPoints
            }

    def config_from_file(self, filepath):
    def config_from_file(self, filepath, async_watchers=False):
        """
        A NetworkCandidateGraph uses a database. This method parses a config
        file to set up the connection. Additionally, this loads planetary
@@ -1367,11 +1369,16 @@ class NetworkCandidateGraph(CandidateGraph):
        ----------
        filepath : str
                   The path to the config file

        async_watchers : bool
                         If True the ncg will also spawn redis queue watching threads
                         that manage asynchronous database inserts. This is primarily
                         used for increased write performance.
        """
        # The YAML library will raise any parse errors
        self.config_from_dict(parse_config(filepath))
        self.config_from_dict(parse_config(filepath), async_watchers=async_watchers)

    def config_from_dict(self, config_dict):
    def config_from_dict(self, config_dict, async_watchers=False):
        """
        A NetworkCandidateGraph uses a database. This method loads a config
        dict to set up the connection. Additionally, this loads planetary
@@ -1382,15 +1389,24 @@ class NetworkCandidateGraph(CandidateGraph):
        ----------
        filepath : str
                   The path to the config file

        async_watchers : bool
                         If True the ncg will also spawn redis queue watching threads
                         that manage asynchronous database inserts. This is primarily
                         used for increased write performance.
        """
        self.config = config_dict

        self.async_watchers = async_watchers
        # Setup REDIS
        self._setup_queues()

        # Setup the database
        self._setup_database()

        # Setup threaded queue watchers
        if self.async_watchers == True:
            self._setup_asynchronous_workers()

        # Setup the DEM
        # I dislike having the DEM on the NCG, but in the short term it
        # is the best solution I think. I don't want to pass the DEM around
@@ -1470,20 +1486,71 @@ class NetworkCandidateGraph(CandidateGraph):
        self.redis_queue = StrictRedis(host=conf['host'],
                                       port=conf['port'],
                                       db=0)
        self.processing_queue = conf['processing_queue']
        self.completed_queue = conf['completed_queue']
        self.working_queue = conf['working_queue']
        self.processing_queue = conf['basename'] + ':processing'
        self.completed_queue = conf['basename'] + ':completed'
        self.working_queue = conf['basename'] + ':working'
        self.point_insert_queue = conf['basename'] + ':point_insert_queue'
        self.point_insert_counter = conf['basename'] + ':point_insert_counter'
        self.measure_update_queue = conf['basename'] + ':measure_update_queue'
        self.measure_update_counter = conf['basename'] + ':measure_update_counter'

        self.queue_names = [self.processing_queue, self.completed_queue, self.working_queue,
                           self.point_insert_queue, self.point_insert_counter, 
                           self.measure_update_queue, self.measure_update_counter]
         
    def _setup_asynchronous_workers(self):
        
        # Default the counters to zero, unless they are already set from a run
        # where the NCG did not exit cleanly
        if self.redis_queue.get(self.point_insert_counter) is None:
            self.redis_queue.set(self.point_insert_counter, 0)

        if self.redis_queue.get(self.measure_update_counter) is None:
            self.redis_queue.set(self.measure_update_counter, 0)


        # Start the insert watching thread
        self.point_inserter_stop_event = threading.Event()
        self.point_inserter = threading.Thread(target=watch_insert_queue, 
                                               args=(self.redis_queue,
                                                     self.point_insert_queue, 
                                                     self.point_insert_counter, 
                                                     self.engine,
                                                     self.point_inserter_stop_event))
        self.point_inserter.setDaemon(True)
        self.point_inserter.start()

        # Start the update watching thread
        self.measure_updater_stop_event = threading.Event()
        self.measure_updater = threading.Thread(target=watch_update_queue, 
                                               args=(self.redis_queue,
                                                     self.measure_update_queue, 
                                                     self.measure_update_counter, 
                                                     self.engine,
                                                     self.measure_updater_stop_event))
        self.measure_updater.setDaemon(True)
        self.measure_updater.start()        

    def clear_queues(self):
        """
        Delete all messages from the redis queue. This a convenience method.
        The `redis_queue` object is a redis-py StrictRedis object with API
        documented at: https://redis-py.readthedocs.io/en/latest/#redis.StrictRedis

        This also needs to restart any threaded watchers of the queues.
        """
        queues = [self.processing_queue, self.completed_queue, self.working_queue]
        for q in queues:
        if self.async_watchers:
            self.point_inserter_stop_event.set()
            self.measure_updater_stop_event.set()
        
        for q in self.queue_names:
            self.redis_queue.delete(q)
        
        self._setup_queues()
        if self.async_watchers:
            self._setup_asynchronous_workers()


    def _execute_sql(self, sql):
        """
        Execute a raw SQL string in the database currently specified
@@ -1826,6 +1893,8 @@ class NetworkCandidateGraph(CandidateGraph):
                                        radsigma,
                                        self.config['spatial']['semimajor_rad'])
        
        print("df shape: ", df.shape)

        if flistpath is None:
            flistpath = os.path.splitext(path)[0] + '.lis'
        target = self.config['spatial'].get('target', None)
@@ -2042,13 +2111,14 @@ class NetworkCandidateGraph(CandidateGraph):
        sourcesession = sourceSession()

        sourceimages = sourcesession.execute(query_string).fetchall()

        # Change for SQLAlchemy >= 1.4, results are now row objects
        sourceimages = [sourceimage._asdict() for sourceimage in sourceimages]
        with self.session_scope() as destinationsession:
            destinationsession.execute(Images.__table__.insert(), sourceimages)

            # Get the camera objects to manually join. Keeps the caller from
            # having to remember to bring cameras as well.
            ids = [i[0] for i in sourceimages]
            #ids = [i[0] for i in sourceimages]
            #cameras = sourcesession.query(Cameras).filter(Cameras.image_id.in_(ids)).all()
            #for c in cameras:
            #    destinationsession.merge(c)
@@ -2240,7 +2310,7 @@ class NetworkCandidateGraph(CandidateGraph):
        jobs are then called on next cluster job launch, causing failures. This
        method provides a check for left over jobs.
        """
        llen = self.redis_queue.llen(self.config['redis']['processing_queue'])
        llen = self.redis_queue.llen(self.processing_queue)
        return llen

    @property
+1 −0
Original line number Diff line number Diff line
from . import keypoints
from . import network
+4 −7
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ import numpy as np
import shapely.wkb as swkb
from plio.io import io_controlnetwork as cnet
from autocnet.io.db.model import Measures

from autocnet.spatial.isis import isis2np_types

def db_to_df(engine, sql = """
SELECT measures."pointid",
@@ -56,6 +56,8 @@ ORDER BY measures."pointid", measures."id";
        sql : str
              The sql query to execute in the database.
        """
        print(sql)
        
        df = pd.read_sql(sql, engine)

        # measures.id DB column was read in to ensure the proper ordering of DF
@@ -190,11 +192,6 @@ def null_measure_ignore(point, size_x, size_y, valid_tol, verbose=False, ncg=Non
    if not ncg.Session:
        raise BrokenPipeError('This func requires a database session from a NetworkCandidateGraph.')
    
    isis2np_types = {
            "UnsignedByte" : "uint8",
            "SignedWord" : "int16",
            "Real" : "float64"}

    resultlog = []
    with ncg.session_scope() as session:
        pid = point.id
Loading