Commit c8aa3694 authored by Stefano Alberto Russo
Browse files

Added first agent stub.

parent b6f23b5e
Loading
Loading
Loading
Loading
+120 −1
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@ from rest_framework.response import Response
from rest_framework import status, serializers, viewsets
from rest_framework.views import APIView
from .utils import format_exception
from .models import Profile
from .models import Profile, Task, TaskStatuses
 
# Setup logging
logger = logging.getLogger(__name__)
@@ -205,6 +205,125 @@ class UserViewSet(viewsets.ModelViewSet):
    serializer_class = UserSerializer


class agent_api(PublicGETAPI):
    '''Public GET endpoint used by the agent that runs next to a remote task.

    The behaviour is selected through the query string:

    * ``task_uuid`` only: the response body is the Python source of the
      agent script, with the task uuid and the callback URL baked in.
    * ``task_uuid`` plus ``action=set_ip_port``, ``ip`` and ``port``: the
      given ip/port are stored on the task, the task is marked as running
      and the response body is ``OK``.

    Validation problems are reported as a plain-text message in the response
    body (the agent script treats any body other than ``OK`` as a failure).
    '''

    # Base URL the agent script calls back to. Kept as a class attribute so
    # that it can be overridden without touching the request handling code.
    host_conn_string = 'http://172.21.0.1:8080'

    def _get(self, request):
        '''Handle the GET request; see the class docstring for the protocol.'''
        try:

            task_uuid = request.GET.get('task_uuid', None)
            if not task_uuid:
                return HttpResponse('MISSING task_uuid')

            from django.core.exceptions import ValidationError

            try:
                task = Task.objects.get(uuid=task_uuid)
            except (Task.DoesNotExist, ValidationError):
                return HttpResponse('Unknown task uuid "{}"'.format(task_uuid))

            action = request.GET.get('action', None)

            if not action:
                # Return the agent code. The uuid stored on the task is
                # embedded rather than the raw request value, so that no
                # request-controlled text ends up in the generated script.
                #
                # Fixes in the script with respect to the first stub:
                #  - "sys" is imported (it is needed by sys.exit);
                #  - the response body is decoded before comparing it with
                #    'OK' (on Python 3 read() returns bytes);
                #  - the "port not available" message goes through the
                #    logger, as stdout must carry only the chosen port;
                #  - the probing socket is always closed.
                agent_code='''
import logging
import socket
import sys
import time
from random import randint
try:
    from urllib.request import urlopen
except ImportError:
    from urllib import urlopen

# Setup logging
logger = logging.getLogger('Agent')
logging.basicConfig(level=logging.INFO)

hostname = socket.gethostname()

# Task id set by the API
task_uuid = "'''+ str(task.uuid) +'''"

# Log
logger.info('Reporting for task uuid: "{}"'.format(task_uuid))

# Get IP
ip = socket.gethostbyname(hostname)
logger.info(' - ip: "{}"'.format(ip))

# Get port
while True:

    # Get a random ephemeral port
    port = randint(49152, 65535)

    # Check port is available
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        result = sock.connect_ex(('127.0.0.1', port))
    finally:
        sock.close()
    if result == 0:
        logger.info('Found not available ephimeral port ({}) , choosing another one...'.format(port))
        time.sleep(1)
    else:
        break
logger.info(' - port: "{}"'.format(port))

response = urlopen("'''+ self.host_conn_string +'''/api/v1/base/agent/?task_uuid={}&action=set_ip_port&ip={}&port={}".format(task_uuid, ip, port))
response_content = response.read().decode('utf-8')
if response_content != 'OK':
    logger.error(response_content)
    logger.info('Not everything OK, exiting with status code =1')
    sys.exit(1)
else:
    logger.info('Everything OK')
print(port)
'''

                return HttpResponse(agent_code)

            elif action=='set_ip_port':

                task_ip = request.GET.get('ip', None)
                if not task_ip:
                    return HttpResponse('IP not valid (got "{}")'.format(task_ip))

                task_port = request.GET.get('port', None)
                if not task_port:
                    return HttpResponse('Port not valid (got "{}")'.format(task_port))

                # Parse the port only once and reuse the parsed value below
                try:
                    task_port = int(task_port)
                except (TypeError, ValueError):
                    return HttpResponse('Port not valid (got "{}")'.format(task_port))

                # Set fields
                task.status = TaskStatuses.running
                task.ip     = task_ip
                task.port   = task_port
                task.save()
                return HttpResponse('OK')

            else:
                return HttpResponse('Unknown action "{}"'.format(action))

        except Exception:
            # Log with the traceback and still hand back a response object:
            # falling through would return None, which is not a valid view
            # result. No exception details are sent to the client.
            logger.exception('Error while processing the agent API request')
            return HttpResponse('Internal error', status=500)














+1 −0
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ logger = logging.getLogger(__name__)
# Task statuses
class TaskStatuses(object):
    created = 'created'
    sumbitted = 'sumbitted'
    running = 'running'
    stopped = 'stopped'
    exited = 'exited'
+68 −4
Original line number Diff line number Diff line
@@ -64,16 +64,80 @@ def start_task(task):
            # Save
            task.save()




    elif task.computing.type == 'remote':
        logger.debug('Starting a remote task "{}"'.format(task.computing))

        # Get computing host
        host = task.computing.get_conf_param('host')

        # Get id_rsa
        #id_rsa_file = task.computing.get_conf_param('id_rsa')
        #if not id_rsa_file: 
        #    raise Exception('This computing requires an id_rsa file but cannot find any')
        # Get user keys
        if task.computing.require_user_keys:
            user_keys = Keys.objects.get(user=task.user, default=True)
        else:
            raise NotImplementedError('Remote tasks not requiring keys are not yet supported')

        # 1) Run the container on the host (non blocking)
 
        if task.container.type == 'singularity':

            

            # Set pass if any
            if task.auth_pass:
                authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass)
            else:
                authstring = ''

            import socket
            hostname = socket.gethostname()
            my_ip = socket.gethostbyname(hostname)

            run_command  = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} '.format(user_keys.private_key_file, host)
            run_command+= '"wget {}:8080/api/v1/base/agent/?task_uuid={} -O /tmp/agent_{}.py && TASK_PORT=$(python /tmp/agent_{}.py) && '.format(my_ip, task.uuid, task.uuid, task.uuid)
            run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_TASK_PORT=$TASK_PORT && {} '.format(authstring)
            run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv '
            
            # Set registry
            if task.container.registry == 'docker_local':
                registry = 'docker://dregistry:5000/'
            elif task.container.registry == 'docker_hub':
                registry = 'docker://'
            else:
                raise NotImplementedError('Registry {} not supported'.format(task.container.registry))
    
            run_command+='{}{} &> /tmp/{}.log & echo \$!"'.format(registry, task.container.image, task.uuid)
            logger.critical(run_command)
            
        else:
            raise NotImplementedError('Container {} not supported'.format(task.container.type))

        out = os_shell(run_command, capture=True)
        if out.exit_code != 0:
            raise Exception(out.stderr)
        
        logger.critical(out.stdout)
        logger.critical(out.stderr)

 
        # Save pid echoed by the command above
        task_pid = out.stdout

        # Set fields
        task.tid    = task.uuid
        #task.status = TaskStatuses.sumbitted
        task.pid    = task_pid
 
        # Save
        task.save()

    elif task.computing.type == 'remoteOLD':
        logger.debug('Starting a remote task "{}"'.format(task.computing))

        # Get computing host
        host = task.computing.get_conf_param('host')

        # Get user keys
        if task.computing.require_user_keys:
+4 −1
Original line number Diff line number Diff line
@@ -56,6 +56,9 @@ urlpatterns = [
    path('api/v1/base/login/', base_app_api.login_api.as_view(), name='login_api'),
    path('api/v1/base/logout/', base_app_api.logout_api.as_view(), name='logout_api'),

    # Custom APIs
    path('api/v1/base/agent/', base_app_api.agent_api.as_view(), name='agent_api'),
 
]

# This message here is quite useful when developing in autoreload mode