Commit 2fba1be6 authored by Laura, Jason R's avatar Laura, Jason R
Browse files

Merge branch 'main' into 'main'

Main

See merge request astrogeology/autocnet!667
parents fd436109 8d82a695
Loading
Loading
Loading
Loading
+10 −1
Original line number Diff line number Diff line
@@ -32,7 +32,16 @@ When preparing for a bug fix release create a new 2nd heading above the Fixed
heading to indicate that only the bug fixes and security fixes are in the bug fix
release.
-->
## [Unreleased]
## [1.0.0-rc2]

### Changed
- Redis queue population pushed to a background thread for non-blocking data processing.

### Fixed
- The clip_center property in the Roi object checked for existence using getattr, which fails on ndarrays because their truth value is ambiguous (they require all() or any() for boolean checks). Fixed by storing clip_center only as a tuple.


## [1.0.0-rc1]

### Added
- [`pool_pre_ping`](https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic) to the sqlalchemy engine connection to handle instances where hundreds of connections are simultaneously connecting to the database.
+32 −12
Original line number Diff line number Diff line
@@ -1795,6 +1795,11 @@ class NetworkCandidateGraph(CandidateGraph):
                    'config':self.config}

            pipeline.rpush(self.processing_queue, json.dumps(msg, cls=JsonEncoder))
            if job_counter % 1000 == 0:
                # Redis can only accept 512MB of messages at a time. This ensures that the pipe
                # stays under the size limit.
                pipeline.execute()
                pipeline = self.redis_queue.pipeline()
        pipeline.execute()
        return job_counter + 1

@@ -1806,10 +1811,13 @@ class NetworkCandidateGraph(CandidateGraph):
            log.warning('Use of filters and query_string are mutually exclusive.')

        with self.session_scope() as session:
            t1 = time()
            # Support either an SQL query string, or a simple dict based query
            if query_string:
                log.info(f'Executing query {query_string} to generate objects for queuing.')
                res = session.execute(query_string).fetchall()
            else:
                log.info(f'Executing query to select {query_obj} with filters {filters}')
                query = session.query(query_obj)

                # Now apply any filters that might be passed in.
@@ -1820,6 +1828,8 @@ class NetworkCandidateGraph(CandidateGraph):
                res = query.order_by(query_obj.id).all()
            # Expunge so that the connection can be rapidly returned to the pool
            session.expunge_all()
            t2 = time()
        log.debug(f'Query time: {t2-t1}')

        if len(res) == 0:
            raise ValueError('Query returned zero results.')
@@ -1856,6 +1866,11 @@ class NetworkCandidateGraph(CandidateGraph):
            msg['config'] = self.config
            pipeline.rpush(self.processing_queue,
                                   json.dumps(msg, cls=JsonEncoder))
            if job_counter % 1000 == 0:
                # Redis can only accept 512MB of messages at a time. This ensures that the pipe
                # stays under the size limit.
                pipeline.execute()
                pipeline = self.redis_queue.pipeline()
        pipeline.execute()
        return job_counter + 1

@@ -1994,6 +2009,15 @@ class NetworkCandidateGraph(CandidateGraph):

        # TODO: reapply uses the queue name and reapplies on that queue.
        
        # Submit the jobs
        rconf = self.config['redis']
        rhost = rconf['host']
        rport = rconf['port']
        try:
            processing_queue = getattr(self, redis_queue)
        except AttributeError:
            log.exception(f'Unable to find attribute {redis_queue} on this object. Valid queue names are: "processing_queue" and "working_queue".')

        if not reapply:
            # Determine which obj will be called
            if isinstance(on, str):
@@ -2009,6 +2033,12 @@ class NetworkCandidateGraph(CandidateGraph):

            # Dispatch to either the database object message generator or the autocnet object message generator
            if isinstance(onobj, DeclarativeMeta):
                if just_stage:
                    # Populate the queue in a background thread.
                    th = threading.Thread(target=self._push_row_messages,
                                          args=(onobj, on, function, walltime, filters, query_string, args, kwargs))
                    th.start()
                else:
                    job_counter = self._push_row_messages(onobj, on, function, walltime, filters, query_string, args, kwargs)
            elif isinstance(onobj, (list, np.ndarray)):
                job_counter = self._push_iterable_message(onobj, function, walltime, args, kwargs)
@@ -2016,16 +2046,6 @@ class NetworkCandidateGraph(CandidateGraph):
                job_counter = self._push_obj_messages(onobj, function, walltime, args, kwargs)
            else:
                raise TypeError('The type of the `on` argument is not understood. Must be a database model, iterable, Node or Edge.')

        # Submit the jobs
        rconf = self.config['redis']
        rhost = rconf['host']
        rport = rconf['port']
        try:
            processing_queue = getattr(self, redis_queue)
        except AttributeError:
            log.exception(f'Unable to find attribute {redis_queue} on this object. Valid queue names are: "processing_queue" and "working_queue".')

        env = self.config['env']
        condaenv = env['conda']
        isisroot = env['ISISROOT']
+1 −1
Original line number Diff line number Diff line
@@ -321,7 +321,7 @@ class Roi():
                                        mode=mode,
                                        order=3)

            self._clip_center = (np.array(pixel_locked.shape)[::-1]) / 2.0
            self._clip_center = tuple((np.array(pixel_locked.shape)[::-1]) / 2.0)

            self._clipped_array = img_as_float32(pixel_locked)
        else:
+1 −1
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ from setuptools import setup, find_packages
with open('README.md', 'r') as f:
    long_description = f.read()

__version__ = '1.0.0-rc1'
__version__ = '1.0.0-rc2'

def setup_package():
    setup(