Unverified Commit 83db6dd3 authored by jlaura's avatar jlaura Committed by GitHub
Browse files

Tested Cluster Submission (#567)

* Initial tests for acn_submit post refactor

* Moves cluster submission into the library and tests

* Updates tests for missed methods

* Full coverage on submission

* Removes bin submit

* Updates for comments

* Comment updates
parent bcc22bf2
Loading
Loading
Loading
Loading

autocnet/.ropeproject/config.py

deleted100644 → 0
+0 −85
Original line number Diff line number Diff line
# The default ``config.py``


def set_prefs(prefs):
    """This function is called before opening the project"""

    # Specify which files and folders to ignore in the project.
    # Changes to ignored resources are not added to the history and
    # VCSs.  Also they are not returned in `Project.get_files()`.
    # Note that ``?`` and ``*`` match all characters but slashes.
    # '*.pyc': matches 'test.pyc' and 'pkg/test.pyc'
    # 'mod*.pyc': matches 'test/mod1.pyc' but not 'mod/1.pyc'
    # '.svn': matches 'pkg/.svn' and all of its children
    # 'build/*.o': matches 'build/lib.o' but not 'build/sub/lib.o'
    # 'build//*.o': matches 'build/lib.o' and 'build/sub/lib.o'
    prefs['ignored_resources'] = ['*.pyc', '*~', '.ropeproject',
                                  '.hg', '.svn', '_svn', '.git']

    # Specifies which files should be considered python files.  It is
    # useful when you have scripts inside your project.  Only files
    # ending with ``.py`` are considered to be python files by
    # default.
    #prefs['python_files'] = ['*.py']

    # Custom source folders:  By default rope searches the project
    # for finding source folders (folders that should be searched
    # for finding modules).  You can add paths to that list.  Note
    # that rope guesses project source folders correctly most of the
    # time; use this if you have any problems.
    # The folders should be relative to project root and use '/' for
    # separating folders regardless of the platform rope is running on.
    # 'src/my_source_folder' for instance.
    #prefs.add('source_folders', 'src')

    # You can extend python path for looking up modules
    #prefs.add('python_path', '~/python/')

    # Should rope save object information or not.
    prefs['save_objectdb'] = True
    prefs['compress_objectdb'] = False

    # If `True`, rope analyzes each module when it is being saved.
    prefs['automatic_soa'] = True
    # The depth of calls to follow in static object analysis
    prefs['soa_followed_calls'] = 0

    # If `False` when running modules or unit tests "dynamic object
    # analysis" is turned off.  This makes them much faster.
    prefs['perform_doa'] = True

    # Rope can check the validity of its object DB when running.
    prefs['validate_objectdb'] = True

    # How many undos to hold?
    prefs['max_history_items'] = 32

    # Shows whether to save history across sessions.
    prefs['save_history'] = True
    prefs['compress_history'] = False

    # Set the number spaces used for indenting.  According to
    # :PEP:`8`, it is best to use 4 spaces.  Since most of rope's
    # unit-tests use 4 spaces it is more reliable, too.
    prefs['indent_size'] = 4

    # Builtin and c-extension modules that are allowed to be imported
    # and inspected by rope.
    prefs['extension_modules'] = []

    # Add all standard c-extensions to extension_modules list.
    prefs['import_dynload_stdmods'] = True

    # If `True` modules with syntax errors are considered to be empty.
    # The default value is `False`; When `False` syntax errors raise
    # `rope.base.exceptions.ModuleSyntaxError` exception.
    prefs['ignore_syntax_errors'] = False

    # If `True`, rope ignores unresolvable imports.  Otherwise, they
    # appear in the importing namespace.
    prefs['ignore_bad_imports'] = False


def project_opened(project):
    """This function is called after opening the project"""
    # Do whatever you like here!

autocnet/.ropeproject/globalnames

deleted100644 → 0
+0 −1
Original line number Diff line number Diff line
}qUutilsq]q(UcheckbandnumbersqUenumqUfind_in_dictqUcheckdeplaidqUloggerqes.
 No newline at end of file

autocnet/.ropeproject/history

deleted100644 → 0
+0 −1
Original line number Diff line number Diff line
]q(]q]qe.
 No newline at end of file

autocnet/.ropeproject/objectdb

deleted100644 → 0
+0 −1
Original line number Diff line number Diff line
}q.
 No newline at end of file
+96 −15
Original line number Diff line number Diff line
@@ -9,7 +9,6 @@ import warnings

from redis import StrictRedis

from autocnet.io.db.redis_queue import pop_computetime_push, finalize
from autocnet.graph.network import NetworkCandidateGraph
from autocnet.graph.node import NetworkNode
from autocnet.graph.edge import NetworkEdge
@@ -18,15 +17,21 @@ from autocnet.utils.utils import import_func
from autocnet.utils.serializers import JsonEncoder, object_hook


def parse_args():
def parse_args():  # pragma: no cover
    parser = argparse.ArgumentParser()
    parser.add_argument('-r', '--host', help='The host URL for the redis queue to to pull messages from.')
    parser.add_argument('-p', '--port', help='The port for used by redis.')
    parser.add_argument('processing_queue', help='The name of the processing queue to draw messages from.')
    parser.add_argument('working_queue', help='The name of the queue to push messages to while they process.')

    return parser.parse_args()

def _instantiate_obj(msg, ncg):
    """
    Instantiate either a NetworkNode or a NetworkEdge that is the 
    target of processing.

    """
    along = msg['along']
    id = msg['id']
    image_path = msg['image_path']
@@ -40,6 +45,10 @@ def _instantiate_obj(msg, ncg):
    return obj

def _instantiate_row(msg, ncg):
    """
    Instantiate some db.io.model row object that is the target
    of processing.
    """
    # Get the dict mapping iterable keyword types to the objects
    objdict = ncg.apply_iterable_options
    rowid = msg['id']
@@ -49,7 +58,16 @@ def _instantiate_row(msg, ncg):
        session.expunge(res) # Disconnect the object from the session
    return res

def main(msg):
def process(msg):
    """
    Given a message, instantiate the necessary processing objects and 
    apply some generic function or method.

    Parameters
    ----------
    msg : dict
          The message that parametrizes the job.
    """
    ncg = NetworkCandidateGraph()
    ncg.config_from_dict(msg['config'])
    if msg['along'] in ['node', 'edge']:
@@ -88,23 +106,86 @@ def main(msg):

    return msg

if __name__ == '__main__':
    args = parse_args()
def transfer_message_to_work_queue(queue, queue_from, queue_to):
    """
    Atomic pop/push from one redis list to another

    # Get the message
    queue = StrictRedis(host=args.host, port=args.port, db=0)
    msg = json.loads(queue.rpop(args.processing_queue), object_hook=object_hook)
    Parameters
    ----------
    queue : object
            PyRedis queue
    
    queue_from : str
                 The name of the queue to pop a message from
    
    queue_to : str
               The name of the queue to push a message to

    Returns
    -------
      : str
        The message from the queue
    """
    return queue.rpoplpush(queue_from, queue_to)

def finalize_message_from_work_queue(queue, queue_name, remove_key):
    """
    Remove a message from a queue

    Parameters
    ----------
    queue : object
            PyRedis queue

    queue_name : str
                 The name of the queue to remove an object from

    remove_key : obj
                 The message to remove from the list
    """
    # The operation completed. Remove this message from the working queue.  
    queue.lrem(queue_name, 0, remove_key)

def manage_messages(args, queue):
    """
    This function manages pulling a message from a redis list, atomically pushing 
    the message to another redis list, launching a generic processing job, 
    and finalizing the message by removing it from the intermediary redis list.

    This function is an easily testable main for the cluster_submit CLI.

    Parameters
    ----------
    args : dict
           A dictionary with queue names that are parsed from the CLI

    queue : obj
            A py-Redis queue object

    """
    # Pop the message from the left queue and push to the right queue; atomic operation
    msg = transfer_message_to_work_queue(queue, 
                                         args['processing_queue'],
                                         args['working_queue'])

    if msg is None:
        warnings.warn('Expected to process a cluster job, but the message queue is empty.')
        sys.exit()
        return

    # The key to remove from the working queue is the message. Essentially, find this element
    # in the list where the element is the JSON representation of the message. Maybe swap to a hash?
    remove_key = msg

    # Apply the algorithm
    response = main(msg)
    response = process(msg)
    # Should go to a logger someday!
    print(response)

    # Alert the caller on failure to relaunch with next parameter set
    #finalize(response, remove_key, queue,
    #         config['redis']['completed_queue'],
    #         config['redis']['working_queue'])
    finalize_message_from_work_queue(queue, args['working_queue'], remove_key)

def main():  # pragma: no cover
    args = vars(parse_args())
    
    # Get the message
    queue = StrictRedis(host=args['host'], port=args['port'], db=0)
    manage_messages(args, queue)
Loading