Commit cf70a85e authored by Laura, Jason R's avatar Laura, Jason R
Browse files

Merge branch 'main' into 'dev'

Main

See merge request astrogeology/autocnet!639
parents b0df84bc 8b736ed5
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -42,6 +42,11 @@ release.
- `geom_match_simple` defaults to a 3rd order warp for interpolation
- Speed improvements for place_points_from_cnet dependent on COPY method instead of ORM update
- License from custom to CC0. Fixes [#607](https://github.com/USGS-Astrogeology/autocnet/issues/607)
- `place_points_in_overlap` now properly handles ISIS sensor exceptions
- Complex geometries for which no valid, in-geometry points are found now fall
  back to a random point distribution method to ensure points are added.
- Point and Image deletion from the DB now CASCADE to the measures table making
  modifications via measures easier to manage. 

### Fixed
- `update_from_jigsaw` failures due to stale code. Now uses a context on the engine to ensure closure
+16 −0
Original line number Diff line number Diff line
from math import isclose, ceil
import random
import warnings

import pandas as pd
@@ -313,6 +314,18 @@ def xy_in_polygon(x,y, geom):
    """
    return geom.contains(Point(x, y))

def generate_random(number, polygon):
    """
    Generate up to `number` random points that fall inside `polygon`.

    Candidate points are drawn uniformly from the polygon's bounding box
    and rejected if they fall outside the polygon itself (rejection
    sampling). Sampling stops after 1000 attempts, so fewer than
    `number` points may be returned for geometries that occupy a small
    fraction of their bounding box.

    Parameters
    ----------
    number : int
             The desired number of points to generate.

    polygon : shapely.geometry.Polygon
              The geometry that returned points must fall within.

    Returns
    -------
     : np.ndarray
       An (n, 2) array of [x, y] coordinates with n <= number.
    """
    points = []
    minx, miny, maxx, maxy = polygon.bounds
    attempts = 0
    while len(points) < number and attempts < 1000:
        pnt = Point(random.uniform(minx, maxx), random.uniform(miny, maxy))
        if polygon.contains(pnt):
            points.append([pnt.x, pnt.y])
        attempts += 1
    return np.asarray(points)

def distribute_points_classic(geom, nspts, ewpts, use_mrr=True, **kwargs):
    """
    This is a decision tree that attempts to perform a
@@ -378,6 +391,9 @@ def distribute_points_classic(geom, nspts, ewpts, use_mrr=True, **kwargs):
    points = np.vstack(points)
    # Perform a spatial intersection check to eject points that are not valid
    valid = [p for p in points if xy_in_polygon(p[0], p[1], original_geom)]
    # The standard method failed. Attempt random placement within the geometry
    if not valid:
        valid = generate_random(ewpts * nspts, original_geom)
    return valid

def distribute_points_new(geom, nspts, ewpts, Session):
+33 −6
Original line number Diff line number Diff line
import json
import time
import warnings

from sqlalchemy import insert
from sqlalchemy.sql.expression import bindparam
@@ -50,9 +51,15 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee
                 A threading.Event object with set and is_set members. This is the
                 poison pill that can be set to terminate the thread.
    """
    # Use a pipe for efficient reads
    pipe = queue.pipe()
    while not stop_event.is_set():

        # Determine the read length of objects to pull from the cache
        # Cap the read length at 250 messages to improve memory usage and throughput
        read_length = int(queue.get(counter_name))
        if read_length > 250:
            read_length = 250
        # Pull the objects from the cache
        points = []
        measures = []
@@ -61,8 +68,18 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee
        rect_srid = Points.rectangular_srid
        lat_srid = Points.latitudinal_srid
        
        try:
            # Pop the messages off the queue
            for i in range(0, read_length):
            msg = json.loads(queue.lpop(queue_name), object_hook=object_hook)
                pipe.lpop(queue_name)
                pipe.decr(counter_name)
            msgs = pipe.execute()
        except:
            msgs = []
            time.sleep(5)

        for msg in msgs:
            msg = json.loads(msg, object_hook=object_hook)
            if isinstance(msg, dict):
                # A NULL id is not allowable, so pop if a NULL ID exists
                if msg['id'] == None:
@@ -87,10 +104,11 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee
                    measures.append(point_measures)
                
                points.append(msg)
            # The message was successfully read, so atomically deincrement the counter
            queue.decr(counter_name)
            
        if points:
        if not points:
            continue
        
        try:
            # Write the cached objects into the database 
            with engine.connect() as conn:
                resp = conn.execute(
@@ -109,7 +127,16 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee
                measures = [measure for sublist in measures for measure in sublist]
                conn.execute(
                    insert(Measures.__table__), measures)
        time.sleep(sleep_time)
        except:
            try:
                # Pop the messages off the queue
                for i in range(0, read_length):
                    pipe.rpush(queue_name, msgs[i])
                    pipe.incr(counter_name)
                msgs = pipe.execute()
            except:
                warnings.warn('Failed to push to DB and failed to repopulate queue.')
                time.sleep(5)

def watch_update_queue(queue, queue_name, counter_name, engine, stop_event, sleep_time=5):
    """
+98 −0
Original line number Diff line number Diff line
import sys
import json

from autocnet.graph.network import NetworkCandidateGraph
from autocnet.graph.node import NetworkNode
from autocnet.graph.edge import NetworkEdge
from autocnet.utils.utils import import_func
from autocnet.utils.serializers import JsonEncoder, object_hook

def _instantiate_obj(msg, ncg):
    """
    Build the NetworkNode or NetworkEdge that is the target of processing.

    Parameters
    ----------
    msg : dict
          Job message containing 'along' ('node' or 'edge'), 'id', and
          'image_path' keys. For an edge, 'id' and 'image_path' are
          two-element sequences of (source, destination) values.

    ncg : NetworkCandidateGraph
          Graph attached to the instantiated object as its parent.

    Returns
    -------
    obj : NetworkNode or NetworkEdge
          The instantiated object with its parent set to `ncg`.
    """
    kind = msg['along']
    obj_id = msg['id']
    path = msg['image_path']
    if kind == 'node':
        obj = NetworkNode(node_id=obj_id, image_path=path)
    elif kind == 'edge':
        obj = NetworkEdge()
        obj.source = NetworkNode(node_id=obj_id[0], image_path=path[0])
        obj.destination = NetworkNode(node_id=obj_id[1], image_path=path[1])
    obj.parent = ncg
    return obj

def _instantiate_row(msg, ncg):
    """
    Fetch the db.io.model row object that is the target of processing.

    Parameters
    ----------
    msg : dict
          Job message whose 'along' key selects the row class and whose
          'id' key identifies the row to load.

    ncg : NetworkCandidateGraph
          Graph providing the session scope and the mapping from
          iterable keyword types to row classes.

    Returns
    -------
    row : object
          The matched row, detached from the session.
    """
    # Map the iterable keyword type onto its row class.
    row_cls = ncg.apply_iterable_options[msg['along']]
    with ncg.session_scope() as session:
        row = session.query(row_cls).filter(getattr(row_cls, 'id')==msg['id']).one()
        # Detach the row so the connection returns to the pool promptly.
        session.expunge_all()
    return row

def process(msg):
    """
    Deserialize a job message, instantiate the processing target, and
    apply the requested function or method to it.

    Parameters
    ----------
    msg : str
          JSON-encoded message that parametrizes the job.

    Returns
    -------
    msg : dict
          The decoded message updated with a 'results' key holding the
          return value of the applied function.
    """
    # Deserialize the incoming message.
    msg = json.loads(msg, object_hook=object_hook)

    ncg = NetworkCandidateGraph()
    ncg.config_from_dict(msg['config'])

    # Instantiate the target of processing based on the 'along' keyword.
    along = msg['along']
    if along in ('node', 'edge'):
        obj = _instantiate_obj(msg, ncg)
    elif along in ('candidategroundpoints', 'points', 'measures', 'overlaps', 'images'):
        obj = _instantiate_row(msg, ncg)
    else:
        obj = along

    # Resolve the callable. All real processing happens inside the
    # resolved func; this function only dispatches and records results.
    # All args/kwargs pass through the RedisQueue straight to the func.
    func = msg['func']
    if callable(func):
        # Already a de-serialized function; prepend the target object.
        msg['args'] = (obj, *msg['args'])
        msg['kwargs']['ncg'] = ncg
    elif hasattr(obj, msg['func']):
        # The function is a method on the instantiated object.
        func = getattr(obj, msg['func'])
    else:
        # The func is a function imported from a library by name.
        func = import_func(msg['func'])
        # Prepend the object being processed into the args.
        msg['args'] = (obj, *msg['args'])
        # Pass config items through; most funcs discard unneeded ones.
        msg['kwargs']['ncg'] = ncg
        msg['kwargs']['Session'] = ncg.Session

    # Run the function. The func is assumed to return a True/False value.
    res = func(*msg['args'], **msg['kwargs'])

    # Update the message with the result for the callback machinery.
    msg['results'] = res

    return msg

def main():
    """Entry point: treat the joined CLI arguments as the job message."""
    message = ''.join(sys.argv[1:])
    print(process(message))

if __name__ == '__main__':
    main()
+42 −16
Original line number Diff line number Diff line
@@ -1598,7 +1598,7 @@ class NetworkCandidateGraph(CandidateGraph):
        """
        Push messages to the redis queue for objects e.g., Nodes and Edges
        """

        pipeline = self.redis_queue.pipeline()
        for job_counter, elem in enumerate(onobj.data('data')):
            if getattr(elem[-1], 'ignore', False):
                continue
@@ -1624,7 +1624,8 @@ class NetworkCandidateGraph(CandidateGraph):
                    'param_step':1,
                    'config':self.config}

            self.redis_queue.rpush(self.processing_queue, json.dumps(msg, cls=JsonEncoder))
            pipeline.rpush(self.processing_queue, json.dumps(msg, cls=JsonEncoder))
        pipeline.execute()
        return job_counter + 1

    def _push_row_messages(self, query_obj, on, function, walltime, filters, query_string, args, kwargs):
@@ -1647,10 +1648,13 @@ class NetworkCandidateGraph(CandidateGraph):

                # Execute the query to get the rows to be processed
                res = query.order_by(query_obj.id).all()
            # Expunge so that the connection can be rapidly returned to the pool
            session.expunge_all()

        if len(res) == 0:
            raise ValueError('Query returned zero results.')
            for row in res:
        pipeline = self.redis_queue.pipeline()
        for i, row in enumerate(res):
            msg = {'along':on,
                    'id':row.id,
                    'func':function,
@@ -1658,14 +1662,21 @@ class NetworkCandidateGraph(CandidateGraph):
                    'kwargs':kwargs,
                    'walltime':walltime}
            msg['config'] = self.config  # Hacky for now, just passs the whole config dict
                self.redis_queue.rpush(self.processing_queue,
            pipeline.rpush(self.processing_queue,
                                json.dumps(msg, cls=JsonEncoder))
            assert len(res) == self.queue_length
            if i % 1000 == 0:
                # Redis can only accept 512MB of messages at a time. This ensures that the pipe
                # stays under the size limit.
                pipeline.execute()
                pipeline = self.redis_queue.pipeline()
        pipeline.execute()
        return len(res)

    def _push_iterable_message(self, iterable, function, walltime, args, kwargs):
        if not iterable:  # the list is empty...
            raise ValueError('iterable is not an iterable object, e.g., a list or set')

        pipeline = self.redis_queue.pipeline()
        for job_counter, item in enumerate(iterable):
            msg = {'along':item,
                    'func':function,
@@ -1673,8 +1684,9 @@ class NetworkCandidateGraph(CandidateGraph):
                    'kwargs':kwargs,
                    'walltime':walltime}
            msg['config'] = self.config
            self.redis_queue.rpush(self.processing_queue,
            pipeline.rpush(self.processing_queue,
                                   json.dumps(msg, cls=JsonEncoder))
        pipeline.execute()
        return job_counter + 1

    def apply(self,
@@ -1693,6 +1705,7 @@ class NetworkCandidateGraph(CandidateGraph):
            queue=None,
            redis_queue='processing_queue',
            exclude=None,
            just_stage=False,
            **kwargs):
        """
        A mirror of the apply function from the standard CandidateGraph object. This implementation
@@ -1770,6 +1783,13 @@ class NetworkCandidateGraph(CandidateGraph):
                      The redis queue to push messages to that are then pulled by the
                      cluster job this call launches. Options are: 'processing_queue' (default)
                      or 'working_queue'

        just_stage : bool
                     If True, push messages to the redis queue for processing, but do not
                     submit a slurm/sbatch job to the cluster. This is useful when one process
                     is being used to orchestrate queue population and another process is being
                     used to process the messages. Default: False.

        Returns
        -------
        job_str : str
@@ -1848,6 +1868,10 @@ class NetworkCandidateGraph(CandidateGraph):
            job += ' --queue'  # Use queue mode where jobs run until the queue is empty
        command = f'{condasetup} && {isissetup} && srun {job}'

        # The user does not want to submit the job. Only stage the messages.
        if just_stage:
            return command

        if queue == None:
            queue = self.config['cluster']['queue']

@@ -1858,6 +1882,8 @@ class NetworkCandidateGraph(CandidateGraph):
                     partition=queue,
                     ntasks=ntasks,
                     output=log_dir+f'/autocnet.{function}-%j')

        # Submit the jobs to the cluster   
        if ntasks > 1:
            job_str = submitter.submit(exclude=exclude)
        else:
Loading