Commit 02a46188 authored by Jay's avatar Jay
Browse files

Adds more efficient pushes to redis queue.

parent 8b974986
Loading
Loading
Loading
Loading
+24 −16
Original line number Diff line number Diff line
@@ -1596,7 +1596,7 @@ class NetworkCandidateGraph(CandidateGraph):
        """
        Push messages to the redis queue for objects e.g., Nodes and Edges
        """

        pipeline = self.redis_queue.pipeline()
        for job_counter, elem in enumerate(onobj.data('data')):
            if getattr(elem[-1], 'ignore', False):
                continue
@@ -1622,7 +1622,8 @@ class NetworkCandidateGraph(CandidateGraph):
                    'param_step':1,
                    'config':self.config}

            self.redis_queue.rpush(self.processing_queue, json.dumps(msg, cls=JsonEncoder))
            pipeline.rpush(self.processing_queue, json.dumps(msg, cls=JsonEncoder))
        pipeline.execute()
        return job_counter + 1

    def _push_row_messages(self, query_obj, on, function, walltime, filters, query_string, args, kwargs):
@@ -1645,9 +1646,12 @@ class NetworkCandidateGraph(CandidateGraph):

                # Execute the query to get the rows to be processed
                res = query.order_by(query_obj.id).all()
            # Expunge so that the connection can be rapidly returned to the pool
            session.expunge_all()

        if len(res) == 0:
            raise ValueError('Query returned zero results.')
        pipeline = self.redis_queue.pipeline()
        for row in res:
            msg = {'along':on,
                    'id':row.id,
@@ -1656,14 +1660,17 @@ class NetworkCandidateGraph(CandidateGraph):
                    'kwargs':kwargs,
                    'walltime':walltime}
            msg['config'] = self.config  # Hacky for now, just passs the whole config dict
                self.redis_queue.rpush(self.processing_queue,
            pipeline.rpush(self.processing_queue,
                                json.dumps(msg, cls=JsonEncoder))
        assert len(res) == self.queue_length
        pipeline.execute()
        return len(res)

    def _push_iterable_message(self, iterable, function, walltime, args, kwargs):
        if not iterable:  # the list is empty...
            raise ValueError('iterable is not an iterable object, e.g., a list or set')

        pipeline = self.redis_queue.pipeline()
        for job_counter, item in enumerate(iterable):
            msg = {'along':item,
                    'func':function,
@@ -1671,8 +1678,9 @@ class NetworkCandidateGraph(CandidateGraph):
                    'kwargs':kwargs,
                    'walltime':walltime}
            msg['config'] = self.config
            self.redis_queue.rpush(self.processing_queue,
            pipeline.rpush(self.processing_queue,
                                   json.dumps(msg, cls=JsonEncoder))
        pipeline.execute()
        return job_counter + 1

    def apply(self,
+1 −0
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@ dependencies:
  - fakeredis
  - geoalchemy2
  - geopandas
  - hiredis
  - hoggorm
  - imageio
  - ipykernel