Commit a66b6f23 authored by Jay Laura's avatar Jay Laura Committed by jay
Browse files

updates redis push to work with large messages.

parent 1cba661d
Loading
Loading
Loading
Loading
+6 −1
Original line number Diff line number Diff line
@@ -1654,7 +1654,7 @@ class NetworkCandidateGraph(CandidateGraph):
        if len(res) == 0:
            raise ValueError('Query returned zero results.')
        pipeline = self.redis_queue.pipeline()
        for row in res:
        for i, row in enumerate(res):
            msg = {'along':on,
                    'id':row.id,
                    'func':function,
@@ -1664,6 +1664,11 @@ class NetworkCandidateGraph(CandidateGraph):
            msg['config'] = self.config  # Hacky for now, just passs the whole config dict
            pipeline.rpush(self.processing_queue,
                                json.dumps(msg, cls=JsonEncoder))
            if i % 1000 == 0:
                # Redis can only accept 512MB of messages at a time. This ensures that the pipe
                # stays under the size limit.
                pipeline.execute()
                pipeline = self.redis_queue.pipeline()
        pipeline.execute()
        return len(res)