Commit e6d0206f authored by Stefano Alberto Russo's avatar Stefano Alberto Russo
Browse files

Added preliminary support for running tasks on local compute.

parent 65f63363
Loading
Loading
Loading
Loading
+20 −4
Original line number Diff line number Diff line
import uuid

from django.db import models
from django.contrib.auth.models import User
from django.utils import timezone

from .utils import os_shell

# Setup logging
import logging
logger = logging.getLogger(__name__)


#=========================
#  Profile 
@@ -42,13 +49,22 @@ class Task(models.Model):
    type     = models.CharField('Task type', max_length=36, blank=False, null=False)
    status   = models.CharField('Task status', max_length=36, blank=True, null=True)
    created  = models.DateTimeField('Created on', default=timezone.now)
    compute  = models.CharField('Task compute', max_length=36, blank=True, null=True)

    def __str__(self):
        return str('Task "{}" of user "{}" in status "{}" (TID "{}")'.format(self.name, self.user.email, self.status, self.tid))

    tunneled    = models.BooleanField('Task tunneled', default=False)
    tunnel_port = models.IntegerField('Task tunnel port', blank=True, null=True)


    def __str__(self):
        return str('Task "{}" of user "{}" in status "{}" (TID "{}")'.format(self.name, self.user.email, self.status, self.tid))

    @property
    def ip_addr(self):
        # TODO: if self.computing (or self.type) == "local":
        out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' ' + self.tid + ' | tail -n1', capture=True)
        if out.exit_code != 0:
            raise Exception('Error: ' + out.stderr)
        return out.stdout
        


+1 −1
Original line number Diff line number Diff line
@@ -51,7 +51,7 @@
            <td><b>Computing resource</b></td><td>
              <!-- Dropdown with versions -->
              <select name="computing" >
              <option value="builtin" selected>Rosetta built-in</option>
              <option value="builtin" selected>Local</option>
              <option value="demoslurm">Demo Slurm cluster</option>
              </select>          
            </td>
+13 −2
Original line number Diff line number Diff line
@@ -50,6 +50,17 @@
            <td>{{ task.created }}</td>
           </tr>
           
           <tr>
            <td><b>Task tunneled</b></td>
            <td>{{ task.tunneled }}</td>
           
           </tr>
           
           <tr>
            <td><b>Task tunnel port</b></td>
            <td>{{ task.tunnel_port }}</td>
           </tr>
           
           <tr>
            <td><b>Operations</b></td>
            
@@ -63,7 +74,7 @@
            {% endif%}
            <a href=?uuid={{task.uuid}}&action=delete>Delete</a>
            {% if task.status == "Running" %}
             | <a href="/task/{{task.shortuuid}}/">Connect</a>
             | <a href=?uuid={{task.uuid}}&action=connect>Connect</a>
            {% else %}
             | <font color="#c0c0c0">Connect</font>
            {% endif%}
+70 −27
Original line number Diff line number Diff line
@@ -5,6 +5,8 @@ import uuid
import inspect
import json
import socket
import os
import subprocess
                
# Django imports
from django.conf import settings
@@ -371,9 +373,6 @@ def account(request):
@private_view
def tasks(request):

    # Mock task management commands?
    mock_task_commands=True

    # Init data
    data={}
    data['user']  = request.user
@@ -387,7 +386,6 @@ def tasks(request):
    # Setting var
    standby_supported = False


    # Perform actions if required:
    if action and uuid:
        if action=='delete':
@@ -397,11 +395,9 @@ def tasks(request):
                
                # Delete the Docker container
                delete_command = 'sudo docker stop {} && sudo docker rm {}'.format(task.tid,task.tid)
                if mock_task_commands:
                    delete_command = 'echo "Hello World!"'
                out = os_shell(delete_command, capture=True)
                if out.exit_code != 0:
                    logger.error('Error when removing Docker container for task with DID="{}": "{}"'.format(task.tid, out.stderr))                      
                    logger.error('Error when removing Docker container for task "{}": "{}"'.format(task.tid, out.stderr))                      
                
                # Ok, delete
                task.delete()
@@ -427,9 +423,6 @@ def tasks(request):
                else:
                    stop_command = 'sudo docker stop {} && sudo docker rm {}'.format(task.tid,task.tid)

                if mock_task_commands:
                    stop_command = 'echo "Hello World!"'
 
                out = os_shell(stop_command, capture=True)
                if out.exit_code != 0:                        
                    raise Exception(out.stderr)
@@ -446,8 +439,67 @@ def tasks(request):
            # Unset uuid to load the list again
            uuid = None
            
        elif action=='connect':
            
            # Get the task (raises if none available including no permission)
            task = Task.objects.get(user=request.user, uuid=uuid)
            
            # Create task tunnel
            if task.tunneled:
                # If the task is already tunneled, do nothing.
                pass
            
            elif not task.tunneled and task.compute=='local':
                # 1) Get task IP
                task_ip_addr = task.ip_addr
                logger.debug('task_ip_addr="{}"'.format(task_ip_addr))

            
                # 2) Get a free port fot the tunnel:
                allocated_tunnel_ports = []
                for other_task in Task.objects.all():
                    if other_task.tunneled and other_task.tunnel_port:
                        allocated_tunnel_ports.append(other_task.tunnel_port)
                
                for port in range(7000, 7006):
                    if not port in allocated_tunnel_ports:
                        tunnel_port = port
                        break
                if not tunnel_port:
                    logger.error('Cannot find a free port for the tunnel for task "{}"'.format(task.tid))                      
                    raise ErrorMessage('Cannot find a free port for the tunnel to the task')
                
                tunnel_command= 'nohup ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:8590 localhost  &> /dev/null & '.format(tunnel_port, task_ip_addr)
                
                logger.debug(tunnel_command)
                
                subprocess.Popen(['nohup', 'tunnel_command'],
                                 stdout=open('/dev/null', 'w'),
                                 stderr=open('/dev/null', 'w'),
                                 preexec_fn=os.setpgrp
                                 )

                task.tunneled=True
                task.tunnel_port = tunnel_port
                task.save()
                
                
                
                #out = os_shell(tunnel_command, capture=True)
                #if out.exit_code != 0:
                #    logger.error('Error when creating the tunnel for task "{}": "{}"'.format(task.tid, out.stderr))                      
                #    raise ErrorMessage('Error when creating the tunnel for task')
                
                
                
            else:
                raise ErrorMessage('Connecting to tasks on compute "{}" is not supported yet'.format(task.compute))


            # Ok, now redirect
            from django.shortcuts import redirect
            return redirect('http://localhost:{}'.format(task.tunnel_port))

    # Get all task(s)
    if uuid:
        try:
@@ -476,19 +528,12 @@ def tasks(request):
@private_view
def create_task(request):

    # Mock task management commands?
    mock_task_commands=True

    # Get post data
    name     = request.POST.get('name',None)
    password = request.POST.get('password',None)        

    # Init data
    data={}
    data['user']    = request.user
    data['profile'] = Profile.objects.get(user=request.user)
    data['title']   = 'New Task'
    data['name']    = name
    data['name']    = request.POST.get('name',None)
    
    if data['name']:
        
@@ -529,8 +574,8 @@ def create_task(request):
            #    except socket.error:
            #        raise Exception('Error, I could not find a valid IP address for the DNS service')

            # Init run command 
            run_command  = 'sudo docker run --cap-add=NET_ADMIN --cap-add=NET_RAW --name metabox-task-{}'.format( str_shortuuid)
            # Init run command #--cap-add=NET_ADMIN --cap-add=NET_RAW 
            run_command  = 'sudo docker run  --network=rosetta_default --name rosetta-task-{}'.format( str_shortuuid)

            # Data volume
            run_command += ' -v {}/task-{}:/data'.format(TASK_DATA_DIR, str_shortuuid)
@@ -544,9 +589,6 @@ def create_task(request):

            # Debug
            logger.debug('Running new task with command="{}"'.format(run_command))

            if mock_task_commands:
                run_command = 'echo "Hello World!"'
            out = os_shell(run_command, capture=True)
            if out.exit_code != 0:                        
                raise Exception(out.stderr)
@@ -558,8 +600,9 @@ def create_task(request):
                
                # Set fields
                task.uuid   = str_uuid
                task.did    = out.stdout
                task.tid    = out.stdout
                task.status = 'Running'
                task.compute = 'local'
                
                # Save
                task.save()