Commit 598dc3e3 authored by Jay's avatar Jay
Browse files

Updates redis read method for improved performance

parent fe32c3d1
Loading
Loading
Loading
Loading
+33 −6
Original line number Diff line number Diff line
import json
import time
import warnings

from sqlalchemy import insert
from sqlalchemy.sql.expression import bindparam
@@ -50,9 +51,15 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee
                 A threading.Event object with set and is_set members. This is the
                 poison pill that can be set to terminate the thread.
    """
    # Use a pipe for efficient reads
    pipe = queue.pipe()
    while not stop_event.is_set():

        # Determine the read length of objects to pull from the cache
        # Cap the read length at 250 messages to improve memory usage and throughput
        read_length = int(queue.get(counter_name))
        if read_length > 250:
            read_length = 250
        # Pull the objects from the cache
        points = []
        measures = []
@@ -61,8 +68,18 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee
        rect_srid = Points.rectangular_srid
        lat_srid = Points.latitudinal_srid
        
        try:
            # Pop the messages off the queue
            for i in range(0, read_length):
            msg = json.loads(queue.lpop(queue_name), object_hook=object_hook)
                pipe.lpop(queue_name)
                pipe.decr(counter_name)
            msgs = pipe.execute()
        except:
            msgs = []
            time.sleep(5)

        for msg in msgs:
            msg = json.loads(msg, object_hook=object_hook)
            if isinstance(msg, dict):
                # A NULL id is not allowable, so pop if a NULL ID exists
                if msg['id'] == None:
@@ -87,10 +104,11 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee
                    measures.append(point_measures)
                
                points.append(msg)
            # The message was successfully read, so atomically deincrement the counter
            queue.decr(counter_name)
            
        if points:
        if not points:
            continue
        
        try:
            # Write the cached objects into the database 
            with engine.connect() as conn:
                resp = conn.execute(
@@ -109,7 +127,16 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee
                measures = [measure for sublist in measures for measure in sublist]
                conn.execute(
                    insert(Measures.__table__), measures)
        time.sleep(sleep_time)
        except:
            try:
                # Pop the messages off the queue
                for i in range(0, read_length):
                    pipe.rpush(queue_name, msgs[i])
                    pipe.incr(counter_name)
                msgs = pipe.execute()
            except:
                warnings.warn('Failed to push to DB and failed to repopulate queue.)
                time.sleep(5)

def watch_update_queue(queue, queue_name, counter_name, engine, stop_event, sleep_time=5):
    """