Commit 0a0a09c0 authored by Jay's avatar Jay
Browse files

Centralizes retry logic into db.io.connection.py and adds retry logic to the NCG setup.

parent 0279ec96
Loading
Loading
Loading
Loading
+3 −33
Original line number Diff line number Diff line
import json
import socket
import sys
from time import sleep

from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
@@ -11,6 +10,7 @@ from autocnet.graph.edge import NetworkEdge
from autocnet.utils.utils import import_func
from autocnet.utils.serializers import object_hook
from autocnet.io.db.model import Measures, Points, Overlay, Images
from autocnet.io.db.connection import retry, new_connection

apply_iterable_options = {
                'measures' : Measures,
@@ -31,21 +31,7 @@ apply_iterable_options = {
                5: Images
            }

def retry(max_retries=3, wait_time=300):
    def decorator(func):
        def wrapper(*args, **kwargs):
            retries = 0
            if retries < max_retries:
                try:
                    result = func(*args, **kwargs)
                    return result
                except:
                    retries += 1
                    sleep(wait_time)
            else:
                raise Exception(f"Maximum retires of {func} exceeded")
        return wrapper
    return decorator


@retry(max_retries=5)
def _instantiate_obj(msg):
@@ -77,22 +63,6 @@ def _instantiate_row(msg, session):
    session.expunge_all() # Disconnect the object from the session
    return res

@retry()
def get_db_connection(dbconfig):
    db_uri = 'postgresql://{}:{}@{}:{}/{}'.format(dbconfig['username'],
                                                  dbconfig['password'],
                                                  dbconfig['host'],
                                                  dbconfig['pgbouncer_port'],
                                                  dbconfig['name'])
    hostname = socket.gethostname()

    engine = create_engine(db_uri,
        poolclass=NullPool,
        connect_args={"application_name":f"AutoCNet_{hostname}"},
        isolation_level="AUTOCOMMIT",
        pool_pre_ping=True)
    return engine

@retry()
def execute_func(func, *args, **kwargs):
    return func(*args, **kwargs)
@@ -112,7 +82,7 @@ def process(msg):
    # Deserialize the message
    msg = json.loads(msg, object_hook=object_hook)

    engine = get_db_connection(msg['config']['database'])
    _, engine = new_connection(msg['config']['database'])
    
    if msg['along'] in ['node', 'edge']:
        obj = _instantiate_obj(msg)
+0 −15
Original line number Diff line number Diff line
@@ -1620,23 +1620,8 @@ class NetworkCandidateGraph(CandidateGraph):
        self._Session = Session

    def _setup_database(self):
        # A non-linear timeout if the DB is spinning up or loaded with many connections.
        sleeptime = 2
        retries = 0
        self.Session, self.engine = new_connection(self.config['database'])
        try_db_creation(self.engine, self.config)
        return
        while retries < 5:
            log.debug(f'Database connection attempt {retries}')
            try:
                self.Session, self.engine = new_connection(self.config['database'])

                # Attempt to create the database (if it does not exist)
                try_db_creation(self.engine, self.config)
                break
            except:
                retries += 1
                sleep(retries ** sleeptime)

    # def _setup_nodes(self):
    #     with self.session_scope() as session:
+22 −11
Original line number Diff line number Diff line
import socket

import sqlalchemy
from sqlalchemy import create_engine, pool, orm
from sqlalchemy.orm import create_session, scoped_session, sessionmaker

import logging
import os
import socket
import warnings
import yaml
from time import sleep

from sqlalchemy import orm, create_engine, pool

# set up the logging file
log = logging.getLogger(__name__)
@@ -19,6 +13,23 @@ class Parent:
        self.session = Session()
        self.session.begin()

def retry(max_retries=3, wait_time=300):
    def decorator(func):
        def wrapper(*args, **kwargs):
            retries = 0
            if retries < max_retries:
                try:
                    result = func(*args, **kwargs)
                    return result
                except:
                    retries += 1
                    sleep(wait_time)
            else:
                raise Exception(f"Maximum retries of {func} exceeded! Is the database accessible?")
        return wrapper
    return decorator

@retry(max_retries=5)
def new_connection(dbconfig):
    """
    Using the user supplied config create a NullPool database connection.
@@ -43,8 +54,8 @@ def new_connection(dbconfig):
                                                  dbconfig['pgbouncer_port'],
                                                  dbconfig['name'])
    hostname = socket.gethostname()
    engine = sqlalchemy.create_engine(db_uri,
                poolclass=sqlalchemy.pool.NullPool,
    engine = create_engine(db_uri,
                poolclass=pool.NullPool,
                connect_args={"application_name":f"AutoCNet_{hostname}"},
                isolation_level="AUTOCOMMIT",
                pool_pre_ping=True)
+2 −2
Original line number Diff line number Diff line
@@ -232,8 +232,8 @@ def place_points_in_overlap(overlap,
        for id in overlap.intersections:
            try:
                res = session.query(Images).filter(Images.id == id).one()
            except:
                warnings.warn(f'Unable to instantiate image with id: {id}')
            except Exception as e:
                warnings.warn(f'Unable to instantiate image with id: {id} with error: {e}')
                continue
            nn = NetworkNode(node_id=id, 
                             image_path=res.path,