Commit 849ab494 authored by Stefano Alberto Russo's avatar Stefano Alberto Russo
Browse files

Added support for running tasks on remote compute elements over ssh. Fixes in...

Added support for running tasks on remote compute elements over ssh. Fixes in the stop/delete task clycle management.
parent feb8ef2a
Loading
Loading
Loading
Loading
+4 −8
Original line number Diff line number Diff line
@@ -59,6 +59,9 @@ class Task(models.Model):
    status    = models.CharField('Task status', max_length=36, blank=True, null=True)
    created   = models.DateTimeField('Created on', default=timezone.now)
    compute   = models.CharField('Task compute', max_length=36, blank=True, null=True)
    pid       = models.IntegerField('Task pid', blank=True, null=True)
    port      = models.IntegerField('Task port', blank=True, null=True)
    ip        = models.CharField('Task ip address', max_length=36, blank=True, null=True)
    tunnel_port = models.IntegerField('Task tunnel port', blank=True, null=True)

    def save(self, *args, **kwargs):
@@ -75,13 +78,6 @@ class Task(models.Model):
    def __str__(self):
        return str('Task "{}" of user "{}" in status "{}" (TID "{}")'.format(self.name, self.user.email, self.status, self.tid))

    @property
    def ip_addr(self):
        # TODO: if self.computing (or self.type) == "local":
        out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' ' + self.tid + ' | tail -n1', capture=True)
        if out.exit_code != 0:
            raise Exception('Error: ' + out.stderr)
        return out.stdout

    def update_status(self):
        if self.compute == 'local':
+4 −3
Original line number Diff line number Diff line
@@ -56,8 +56,9 @@
           
           <tr>
            <td><b>Computing resource</b></td><td>
              <select name="computing" >
              <option value="builtin" selected>Local</option>
              <select name="compute" >
              <option value="local" selected>Local</option>
              <option value="demoremote">Demo remote</option>
              <option value="demoslurm">Demo Slurm cluster</option>
              </select>          
            </td>
@@ -65,7 +66,7 @@
                      
           <tr><td colspan=2>
           <table><tr><td  style="border: 1px solid lightgray;" >
           I understand that files saved or modified and not explicity saved to a persistent share, including system packages, will be LOST when the task ends.
           I understand that files saved or modified and not explicitly saved to a persistent share, including system packages, will be LOST when the task ends.
           </td><td  style="border: 1px solid lightgray;" >
           <input class="form-check-input" type="checkbox" value="" id="invalidCheck" required>
           </td></table>
+21 −2
Original line number Diff line number Diff line
@@ -45,6 +45,21 @@
            <td>{{ task.created }}</td>
           </tr>

           <tr>
            <td><b>Task pid</b></td>
            <td>{{ task.pid}}</td>
           </tr>

           <tr>
            <td><b>Task ip</b></td>
            <td>{{ task.ip}}</td>
           </tr>

           <tr>
            <td><b>Task port</b></td>
            <td>{{ task.port }}</td>
           </tr>
           
           <tr>
            <td><b>Task tunnel port</b></td>
            <td>{{ task.tunnel_port }}</td>
@@ -54,14 +69,18 @@
            <td><b>Operations</b></td>
            
            <td>
            {% if task.status == "Running" %}
            {% if task.status == "running" %}
            <a href=?uuid={{task.uuid}}&action=stop>Stop</a> | 
            {% else %}
            <!-- <a href=?uuid={{task.uuid}}&action=start>Start</a> |  -->
            <font color="#c0c0c0">Stop</font> | 
            
            {% endif %}
            {% if task.status == "exited" or task.status == "stopped" %}
            <a href=?uuid={{task.uuid}}&action=delete>Delete</a>
            {% else %}
            <font color="#c0c0c0">Delete</font>
            {% endif %}
            {% if task.status == "running" %}
             | <a href=?uuid={{task.uuid}}&action=connect>Connect</a>
            {% else %}
+145 −52
Original line number Diff line number Diff line
@@ -388,18 +388,19 @@ def tasks(request):

    # Perform actions if required:
    if action and uuid:

        # Get the task (raises if none available including no permission)
        task = Task.objects.get(user=request.user, uuid=uuid)

        if action=='delete':
            if task.status not in [TaskStatuses.stopped, TaskStatuses.exited]:
                data['error'] = 'Can delete only tasks in the stopped state'
                return render(request, 'error.html', {'data': data})  
            try:
                # Get the task (raises if none available including no permission)
                task = Task.objects.get(user=request.user, uuid=uuid)
                
                # Delete the Docker container
                delete_command = 'sudo docker stop {} && sudo docker rm {}'.format(task.tid,task.tid)
                out = os_shell(delete_command, capture=True)
                if out.exit_code != 0:
                    logger.error('Error when removing Docker container for task "{}": "{}"'.format(task.tid, out.stderr))                      
                
                # Ok, delete
                # Delete
                task.delete()

                # Unset uuid to load the list again
@@ -412,9 +413,8 @@ def tasks(request):
        
        elif action=='stop': # or delete,a and if delete also remove object
            try:
                # Get the task (raises if none available including no permission)
                task = Task.objects.get(user=request.user, uuid=uuid)

                if task.compute == 'local':
                    str_shortuuid = task.uuid.split('-')[0]
    
                    # Delete the Docker container
@@ -427,10 +427,43 @@ def tasks(request):
                    if out.exit_code != 0:                        
                        raise Exception(out.stderr)
                    
                # Ok, delete
                task.status = 'Stopped' 
                elif task.compute == 'demoremote':
                    
                    # Stop the task remotely
                    stop_command = 'ssh -4 -o StrictHostKeyChecking=no slurmclusterworker-one  "kill -9 {}"'.format(task.pid)
                    logger.debug(stop_command)
                    out = os_shell(stop_command, capture=True)
                    if out.exit_code != 0:                        
                        raise Exception(out.stderr)
   
                else:
                    data['error']= 'Don\'t know how to stop tasks on "{}" compute resource.'.format(task.compute)
                    return render(request, 'error.html', {'data': data})   
           
                # Ok, save status as deleted
                task.status = 'stopped' 
                task.save()

                # Check if the tunnel is active and if so kill it
                logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid))
                check_command = 'ps -ef | grep ":'+str(task.tunnel_port)+':'+str(task.ip)+':'+str(task.port)+'" | grep -v grep | awk \'{print $2}\''
                logger.debug(check_command)
                out = os_shell(check_command, capture=True)
                logger.debug(out)
                if out.exit_code == 0:
                    logger.debug('Task "{}" has a running tunnel, killing it'.format(task.tid))
                    tunnel_pid = out.stdout
                    # Kill Tunnel command
                    kill_tunnel_command= 'kill -9 {}'.format(tunnel_pid)
                    
                    # Log
                    logger.debug('Killing tunnel with command: {}'.format(kill_tunnel_command))
    
                    # Execute
                    os_shell(kill_tunnel_command, capture=True)
                    if out.exit_code != 0:                    
                        raise Exception(out.stderr)

            except Exception as e:
                data['error'] = 'Error in stopping the task'
                logger.error('Error in stopping task with uuid="{}": "{}"'.format(uuid, e))
@@ -445,7 +478,7 @@ def tasks(request):
            task = Task.objects.get(user=request.user, uuid=uuid)
            
            # Create task tunnel
            if task.compute=='local':
            if task.compute in ['local', 'demoremote']:
                
                # If there is no tunnel port allocated yet, find one                
                if not task.tunnel_port:
@@ -471,7 +504,7 @@ def tasks(request):
                # Check if the tunnel is active and if not create it
                logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid))
                
                out = os_shell('ps -ef | grep ":{}:{}:8590" | grep -v grep'.format(task.tunnel_port, task.ip_addr), capture=True)
                out = os_shell('ps -ef | grep ":{}:{}:{}" | grep -v grep'.format(task.tunnel_port, task.ip, task.port), capture=True)

                if out.exit_code == 0:
                    logger.debug('Task "{}" has a running tunnel, using it'.format(task.tid))
@@ -479,7 +512,7 @@ def tasks(request):
                    logger.debug('Task "{}" has no running tunnel, creating it'.format(task.tid))
                    
                    # Tunnel command
                    tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:8590 localhost & '.format(task.tunnel_port, task.ip_addr)
                    tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:{} localhost & '.format(task.tunnel_port, task.ip, task.port)
                    background_tunnel_command = 'nohup {} >/dev/null 2>&1 &'.format(tunnel_command)
                    
                    # Log
@@ -547,13 +580,30 @@ def create_task(request):
            data['error'] = 'No valid task container'
            return render(request, 'error.html', {'data': data})
        
        compute = request.POST.get('compute', None)

        try:
        logger.debug(compute)

        if compute not in ['local', 'demoremote']:
            data['error'] = 'Unknown compute resource "{}'.format(compute)
            return render(request, 'error.html', {'data': data})
        
            #Generate uuid
        # Generate the task uuid
        str_uuid = str(uuid.uuid4())
        str_shortuuid = str_uuid.split('-')[0]
        
        # Create the task object
        task = Task.objects.create(uuid      = str_uuid,
                                   user      = request.user,
                                   name      = data['name'],
                                   status    = TaskStatuses.created,
                                   container = data['container'],
                                   compute   = compute)
        
        # Actually start tasks
        try:
            if compute == 'local':            

                # Get our ip address             
                #import netifaces
                #netifaces.ifaddresses('eth0')
@@ -578,17 +628,60 @@ def create_task(request):
                if out.exit_code != 0:                        
                    raise Exception(out.stderr)
                else:
                logger.debug('Created task with id: "{}"'.format(out.stdout))
                    task_tid = out.stdout
                    logger.debug('Created task with id: "{}"'.format(task_tid))
                    

                    # Get task IP address
                    out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' ' + task_tid + ' | tail -n1', capture=True)
                    if out.exit_code != 0:
                        raise Exception('Error: ' + out.stderr)
                    task_ip = out.stdout
                    
                    # Set fields
                    task.tid    = task_tid
                    task.status = TaskStatuses.running
                    task.ip     = task_ip
                    task.port   = 8590
                
                    # Save
                    task.save()

            elif compute == 'demoremote':
                logger.debug('Using Demo Remote as compute resource')


                # 1) Run the singularity container on slurmclusterworker-one (non blocking)
                run_command = 'ssh -4 -o StrictHostKeyChecking=no slurmclusterworker-one  "export SINGULARITY_NOHTTPS=true && exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv docker://dregistry:5000/rosetta/metadesktop &> /dev/null & echo \$!"'
                out = os_shell(run_command, capture=True)
                if out.exit_code != 0:                        
                    raise Exception(out.stderr)
                
                # Save pid echoed by the command above
                task_pid = out.stdout

                # 2) Simulate the agent (i.e. report container IP and port port)

                # Get task IP address
                out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' slurmclusterworker-one | tail -n1', capture=True)
                if out.exit_code != 0:
                    raise Exception('Error: ' + out.stderr)
                task_ip = out.stdout
                
                # Set fields
                task.uuid   = str_uuid
                task.tid    = out.stdout
                task.compute = 'local'
                task.tid    = task.uuid
                task.status = TaskStatuses.running
                task.ip     = task_ip
                task.pid    = task_pid
                task.port   = 8590
                
                # Save
                task.save()
                

            else:
                raise Exception('Consistency exception: invalid compute resource "{}'.format(compute))

        except Exception as e:
            data['error'] = 'Error in creating new Task.'
            logger.error(e)