Commit d32f2a78 authored by Jay Laura
Browse files

Adds logging and threading to message pushes

parent fd436109
Loading
Loading
Loading
Loading
+31 −11
Original line number Diff line number Diff line
@@ -1795,6 +1795,11 @@ class NetworkCandidateGraph(CandidateGraph):
                    'config':self.config}

            pipeline.rpush(self.processing_queue, json.dumps(msg, cls=JsonEncoder))
            if job_counter % 1000 == 0:
                # Redis can only accept 512MB of messages at a time. Flush the pipeline every
                # 1000 jobs and start a fresh one so that each batch stays under the size limit.
                pipeline.execute()
                pipeline = self.redis_queue.pipeline()
        pipeline.execute()
        return job_counter + 1

@@ -1806,10 +1811,13 @@ class NetworkCandidateGraph(CandidateGraph):
            log.warning('Use of filters and query_string are mutually exclusive.')

        with self.session_scope() as session:
            t1 = time()
            # Support either an SQL query string, or a simple dict based query
            if query_string:
                log.info(f'Executing query {query_string} to generate objects for queuing.')
                res = session.execute(query_string).fetchall()
            else:
                log.info(f'Executing query to select {query_obj} with filters {filters}')
                query = session.query(query_obj)

                # Now apply any filters that might be passed in.
@@ -1820,6 +1828,8 @@ class NetworkCandidateGraph(CandidateGraph):
                res = query.order_by(query_obj.id).all()
            # Expunge so that the connection can be rapidly returned to the pool
            session.expunge_all()
            t2 = time()
        log.debug(f'Query time: {t2-t1}')

        if len(res) == 0:
            raise ValueError('Query returned zero results.')
@@ -1856,6 +1866,11 @@ class NetworkCandidateGraph(CandidateGraph):
            msg['config'] = self.config
            pipeline.rpush(self.processing_queue,
                                   json.dumps(msg, cls=JsonEncoder))
            if job_counter % 1000 == 0:
                # Redis can only accept 512MB of messages at a time. Flush the pipeline every
                # 1000 jobs and start a fresh one so that each batch stays under the size limit.
                pipeline.execute()
                pipeline = self.redis_queue.pipeline()
        pipeline.execute()
        return job_counter + 1

@@ -1994,6 +2009,15 @@ class NetworkCandidateGraph(CandidateGraph):

        # TODO: reapply uses the queue name and reapplies on that queue.
        
        # Submit the jobs
        rconf = self.config['redis']
        rhost = rconf['host']
        rport = rconf['port']
        try:
            processing_queue = getattr(self, redis_queue)
        except AttributeError:
            log.exception(f'Unable to find attribute {redis_queue} on this object. Valid queue names are: "processing_queue" and "working_queue".')

        if not reapply:
            # Determine which obj will be called
            if isinstance(on, str):
@@ -2009,6 +2033,12 @@ class NetworkCandidateGraph(CandidateGraph):

            # Dispatch to either the database object message generator or the autocnet object message generator
            if isinstance(onobj, DeclarativeMeta):
                if just_stage:
                    # Populate the queue in a background thread.
                    th = threading.Thread(target=self._push_row_messages,
                                          args=(self, on, function, walltime, filters, query_string, args, kwargs))
                    th.start()
                else:
                    job_counter = self._push_row_messages(onobj, on, function, walltime, filters, query_string, args, kwargs)
            elif isinstance(onobj, (list, np.ndarray)):
                job_counter = self._push_iterable_message(onobj, function, walltime, args, kwargs)
@@ -2016,16 +2046,6 @@ class NetworkCandidateGraph(CandidateGraph):
                job_counter = self._push_obj_messages(onobj, function, walltime, args, kwargs)
            else:
                raise TypeError('The type of the `on` argument is not understood. Must be a database model, iterable, Node or Edge.')

        # Submit the jobs
        rconf = self.config['redis']
        rhost = rconf['host']
        rport = rconf['port']
        try:
            processing_queue = getattr(self, redis_queue)
        except AttributeError:
            log.exception(f'Unable to find attribute {redis_queue} on this object. Valid queue names are: "processing_queue" and "working_queue".')

        env = self.config['env']
        condaenv = env['conda']
        isisroot = env['ISISROOT']