Commit feb8ef2a authored by Stefano Alberto Russo
Browse files

Improved support for running tasks on local compute.

parent e6d0206f
Loading
Loading
Loading
Loading
+52 −11
Original line number Diff line number Diff line
import uuid
import enum

from django.db import models
from django.contrib.auth.models import User
@@ -11,6 +12,14 @@ import logging
logger = logging.getLogger(__name__)


# Task statuses
class TaskStatuses(object):
    """Namespace of the string values a task status can take.

    The values are plain strings, so they can be stored directly in a
    character field and compared with ``==`` against text.
    """

    created, running, stopped, exited = 'created', 'running', 'stopped', 'exited'


#=========================
#  Profile 
#=========================
@@ -46,14 +55,22 @@ class Task(models.Model):
    # Identifier passed to `docker inspect` by update_status()
    tid       = models.CharField('Task ID', max_length=64, blank=False, null=False)
    # Full task UUID; short_uuid exposes its first dash-separated segment
    uuid      = models.CharField('Task UUID', max_length=36, blank=False, null=False)
    # Task name, shown by __str__
    name      = models.CharField('Task name', max_length=36, blank=False, null=False)
    # NOTE(review): this text comes from a rendered diff; `type` looks like the
    # pre-change line that `container` below replaces -- confirm against the real file
    type     = models.CharField('Task type', max_length=36, blank=False, null=False)
    container = models.CharField('Task container', max_length=36, blank=False, null=False)
    # Nullable at the field level, but save() rejects any value not declared on TaskStatuses
    status    = models.CharField('Task status', max_length=36, blank=True, null=True)
    created   = models.DateTimeField('Created on', default=timezone.now)
    # update_status() only acts when this equals 'local'
    compute   = models.CharField('Task compute', max_length=36, blank=True, null=True)

    # Tunnel bookkeeping (presumably for reaching the task through a forwarded port -- TODO confirm)
    tunneled    = models.BooleanField('Task tunneled', default=False)
    tunnel_port = models.IntegerField('Task tunnel port', blank=True, null=True)

    def save(self, *args, **kwargs):
        """Validate ``status`` against ``TaskStatuses``, then save the task.

        All positional and keyword arguments are forwarded unchanged to the
        parent ``save()``.

        Raises:
            Exception: if ``self.status`` is not one of the values declared
                on ``TaskStatuses`` (this includes ``None``).
        """
        # Compare against the declared status values only. A plain getattr()
        # lookup on the class would also accept any other class attribute
        # name (e.g. "__doc__" or "__module__") as a valid status.
        valid_statuses = [
            value for name, value in vars(TaskStatuses).items()
            if not name.startswith('_')
        ]
        if self.status not in valid_statuses:
            raise Exception('Invalid status "{}"'.format(self.status))

        # Call parent save
        super(Task, self).save(*args, **kwargs)


    def __str__(self):
        """Return a one-line description: name, owner email, status and TID."""
        template = 'Task "{}" of user "{}" in status "{}" (TID "{}")'
        description = template.format(self.name, self.user.email, self.status, self.tid)
        return str(description)
@@ -66,7 +83,31 @@ class Task(models.Model):
            raise Exception('Error: ' + out.stderr)
        return out.stdout
    
    def update_status(self):
        """Refresh ``self.status`` from ``docker inspect`` and save the task.

        Only tasks whose ``compute`` is ``'local'`` are handled; for any other
        value this method returns without touching or saving the task.

        Raises:
            Exception: if the inspect command succeeds but reports a state
                other than ``running`` or ``exited``.
        """
        # Guard clause: nothing to do for non-local compute
        if self.compute != 'local':
            return

        # Ask Docker for the container state (.State.Running is an alternative)
        inspect_command = 'sudo docker inspect --format \'{{.State.Status}}\' ' + self.tid
        result = os_shell(inspect_command, capture=True)
        logger.debug('Status: "{}"'.format(result.stdout))

        if result.exit_code == 0:
            # Translate the state reported by Docker into a task status
            status_by_docker_state = {
                'running': TaskStatuses.running,
                'exited': TaskStatuses.exited,
            }
            reported_state = result.stdout
            if reported_state not in status_by_docker_state:
                raise Exception('Unknown task status: "{}"'.format(result.stdout))
            self.status = status_by_docker_state[reported_state]
        else:
            # Any inspect failure marks the task as exited; the case where the
            # error output names this task as missing is additionally logged
            error_output = result.stderr
            if 'No such' in error_output and self.tid in error_output:
                logger.debug('Task "{}" is not running in reality'.format(self.tid))
            self.status = TaskStatuses.exited

        self.save()

    @property
    def short_uuid(self):
        """Return the part of the task UUID that precedes the first dash."""
        head, _, _ = self.uuid.partition('-')
        return head



+11 −5
Original line number Diff line number Diff line
@@ -29,17 +29,24 @@
            </td>
           </tr>

           <!--
           <tr>
            <td><b>Task user</b></td>
            <td>
             <input type="text" name="name" value="" placeholder="metauser" size="23" disabled />
            </td>
           </tr>
           </tr>-->
           
           <!--<tr>
            <td><b>Task password</b></td>
            <td>
             <input type="password" name="password" value="" placeholder="" size="23" disabled />
            </td>
           </tr> -->

           <tr>
            <td><b>Task Type</b></td><td>
              <!-- Dropdown with versions -->
              <select name="type" >
            <td><b>Task container</b></td><td>
              <select name="container" >
              <option value="metadesktop" selected>Meta Desktop</option>
              <option value="astroccok">Astrocook</option>
              <option value="gadgetviewer">Gadget Viewer</option>
@@ -49,7 +56,6 @@
           
           <tr>
            <td><b>Computing resource</b></td><td>
              <!-- Dropdown with versions -->
              <select name="computing" >
              <option value="builtin" selected>Local</option>
              <option value="demoslurm">Demo Slurm cluster</option>
+7 −18
Original line number Diff line number Diff line
@@ -21,23 +21,18 @@
          <table class="dashboard">

           <tr>
            <td><b>Task name</b></td>
            <td>{{ task.name }}</td>
           </tr>

           <tr>
            <td><b>Task type</b></td>
            <td>{{ task.type }}</td>
            <td><b>Task id</b></td>
            <td>{{ task.short_uuid }}</td>
           </tr>

           <tr>
            <td><b>Task URL</b></td>
            <td>https://metabox.online/task/{{ task.shortuuid }}</td>
            <td><b>Task name</b></td>
            <td>{{ task.name }}</td>
           </tr>

           <tr>
            <td><b>Task user</b></td>
            <td>metauser</td>
            <td><b>Task container</b></td>
            <td>{{ task.container }}</td>
           </tr>

           <tr>
@@ -50,12 +45,6 @@
            <td>{{ task.created }}</td>
           </tr>
           
           <tr>
            <td><b>Task tunneled</b></td>
            <td>{{ task.tunneled }}</td>
           
           </tr>
           
           <tr>
            <td><b>Task tunnel port</b></td>
            <td>{{ task.tunnel_port }}</td>
@@ -73,7 +62,7 @@
            
            {% endif%}
            <a href=?uuid={{task.uuid}}&action=delete>Delete</a>
            {% if task.status == "Running" %}
            {% if task.status == "running" %}
             | <a href=?uuid={{task.uuid}}&action=connect>Connect</a>
            {% else %}
             | <font color="#c0c0c0">Connect</font>
+61 −85
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ from django.contrib.auth.models import User
from django.contrib.auth import update_session_auth_hash

# Project imports
from .models import Profile, LoginToken, Task
from .models import Profile, LoginToken, Task, TaskStatuses
from .utils import send_email, format_exception, random_username, log_user_activity, timezonize, os_shell

# Setup logging
@@ -410,7 +410,7 @@ def tasks(request):
                logger.error('Error in deleting task with uuid="{}": "{}"'.format(uuid, e))
                return render(request, 'error.html', {'data': data})  
        
        elif action=='stop': # or delete,a nd if delete also remove object
        elif action=='stop': # or delete,a and if delete also remove object
            try:
                # Get the task (raises if none available including no permission)
                task = Task.objects.get(user=request.user, uuid=uuid)
@@ -445,20 +445,15 @@ def tasks(request):
            task = Task.objects.get(user=request.user, uuid=uuid)
            
            # Create task tunnel
            if task.tunneled:
                # If the task is already tunneled, do nothing.
                pass
            if task.compute=='local':
                
            elif not task.tunneled and task.compute=='local':
                # 1) Get task IP
                task_ip_addr = task.ip_addr
                logger.debug('task_ip_addr="{}"'.format(task_ip_addr))
                # If there is no tunnel port allocated yet, find one                
                if not task.tunnel_port:

            
                # 2) Get a free port fot the tunnel:
                    # Get a free port for the tunnel:
                    allocated_tunnel_ports = []
                    for other_task in Task.objects.all():
                    if other_task.tunneled and other_task.tunnel_port:
                        if other_task.tunnel_port and not other_task.status in [TaskStatuses.exited, TaskStatuses.stopped]:
                            allocated_tunnel_ports.append(other_task.tunnel_port)
                    
                    for port in range(7000, 7006):
@@ -469,34 +464,35 @@ def tasks(request):
                        logger.error('Cannot find a free port for the tunnel for task "{}"'.format(task.tid))                      
                        raise ErrorMessage('Cannot find a free port for the tunnel to the task')

                tunnel_command= 'nohup ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:8590 localhost  &> /dev/null & '.format(tunnel_port, task_ip_addr)
                
                logger.debug(tunnel_command)
                
                subprocess.Popen(['nohup', 'tunnel_command'],
                                 stdout=open('/dev/null', 'w'),
                                 stderr=open('/dev/null', 'w'),
                                 preexec_fn=os.setpgrp
                                 )

                task.tunneled=True
                    task.tunnel_port = tunnel_port
                    task.save()


                # Check if the tunnel is active and if not create it
                logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid))
                
                out = os_shell('ps -ef | grep ":{}:{}:8590" | grep -v grep'.format(task.tunnel_port, task.ip_addr), capture=True)

                #out = os_shell(tunnel_command, capture=True)
                #if out.exit_code != 0:
                #    logger.error('Error when creating the tunnel for task "{}": "{}"'.format(task.tid, out.stderr))                      
                #    raise ErrorMessage('Error when creating the tunnel for task')
                if out.exit_code == 0:
                    logger.debug('Task "{}" has a running tunnel, using it'.format(task.tid))
                else:
                    logger.debug('Task "{}" has no running tunnel, creating it'.format(task.tid))
                    
                    # Tunnel command
                    tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:8590 localhost & '.format(task.tunnel_port, task.ip_addr)
                    background_tunnel_command = 'nohup {} >/dev/null 2>&1 &'.format(tunnel_command)
                    
                    # Log
                    logger.debug('Opening tunnel with command: {}'.format(background_tunnel_command))

                    # Execute
                    subprocess.Popen(background_tunnel_command, shell=True)
                   
            else:
                raise ErrorMessage('Connecting to tasks on compute "{}" is not supported yet'.format(task.compute))


            # Ok, now redirect
            # Ok, now redirect to the task through the tunnel
            from django.shortcuts import redirect
            return redirect('http://localhost:{}'.format(task.tunnel_port))

@@ -516,6 +512,10 @@ def tasks(request):
            logger.error('Error in getting Virtual Devices: "{}"'.format(e))
            return render(request, 'error.html', {'data': data})   

    # Update task statuses
    for task in tasks:
        task.update_status()

    data['tasks'] = tasks

    return render(request, 'tasks.html', {'data': data})
@@ -538,13 +538,13 @@ def create_task(request):
    if data['name']:
        
        # Type
        data['type'] = request.POST.get('type', None)
        if not data['type']:
            data['error'] = 'No type given'
        data['container'] = request.POST.get('container', None)
        if not data['container']:
            data['error'] = 'No container given'
            return render(request, 'error.html', {'data': data})

        if not data['type'] in SUPPORTED_TASK_TYPES:
            data['error'] = 'No valid task type'
        if not data['container'] in SUPPORTED_TASK_TYPES:
            data['error'] = 'No valid task container'
            return render(request, 'error.html', {'data': data})
        

@@ -559,35 +559,20 @@ def create_task(request):
            #netifaces.ifaddresses('eth0')
            #backend_ip = netifaces.ifaddresses('eth0')[netifaces.AF_INET][0]['addr']       


            # Get the IP address of the DNS service               
            #inspect_json = json.loads(os_shell('sudo docker inspect metaboxonline-dns-one', capture=True).stdout)
            #DNS_SERVICE_IP = inspect_json[0]['NetworkSettings']['IPAddress']    
        
            # The following does not work on WIndows
            # Do not use .format as there are too many graph brackets    
            #DNS_SERVICE_IP = os_shell('docker inspect --format \'{{ .NetworkSettings.IPAddress }}\' ' + PROJECT_NAME + '-' + service + '-' +instance, capture=True).stdout
        
            #if DNS_SERVICE_IP:
            #    try:
            #        socket.inet_aton(DNS_SERVICE_IP)
            #    except socket.error:
            #        raise Exception('Error, I could not find a valid IP address for the DNS service')

            # Init run command #--cap-add=NET_ADMIN --cap-add=NET_RAW 
            run_command  = 'sudo docker run  --network=rosetta_default --name rosetta-task-{}'.format( str_shortuuid)

            # Data volume
            run_command += ' -v {}/task-{}:/data'.format(TASK_DATA_DIR, str_shortuuid)

            # Ports TODO: remove or generate randomly
            #run_command += ' -p 8590:8590 -p 5900:5900 -p 50381:22'
            
            # Host name, image entry command
            task_type = 'task-{}'.format(data['type'])
            run_command += ' -h task-{} -d -t localhost:5000/rosetta/metadesktop'.format(str_shortuuid, task_type)
            task_container = 'task-{}'.format(data['container'])
            run_command += ' -h task-{} -d -t localhost:5000/rosetta/metadesktop'.format(str_shortuuid, task_container)

            # Create the model
            task = Task.objects.create(user=request.user, name=data['name'], status=TaskStatuses.created, container=data['container'])
                
            # Debug
            # Run the task Debug
            logger.debug('Running new task with command="{}"'.format(run_command))
            out = os_shell(run_command, capture=True)
            if out.exit_code != 0:                        
@@ -595,24 +580,15 @@ def create_task(request):
            else:
                logger.debug('Created task with id: "{}"'.format(out.stdout))
                
                # Create the model
                task = Task.objects.create(user=request.user, name=data['name'], status='Created', type=data['type'])
                
                # Set fields
                task.uuid   = str_uuid
                task.tid    = out.stdout
                task.status = 'Running'
                task.compute = 'local'
                task.status = TaskStatuses.running
                
                # Save
                task.save()

            # Create passwd file on Proxy
            #out = os_shell('ssh -o StrictHostKeyChecking=no proxy "cd /shared/reyns/etc_apache2_sites_enabled/ && htpasswd -bc {}.htpasswd metauser {}"'.format(str_shortuuid, password), capture=True)
            #if out.exit_code != 0:
            #    logger.error(out.stderr) 


        except Exception as e:
            data['error'] = 'Error in creating new Task.'
            logger.error(e)