Commit ee9076f7 authored by Stefano Alberto Russo's avatar Stefano Alberto Russo
Browse files

Added support for computing parameters and for stopping Slurm Jobs, improved...

Added support for computing parameters and for stopping Slurm Jobs, improved agent and output file handling. Minor fixes
parent 6c563463
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -308,7 +308,6 @@ print(port)
                logger.info('Setting task "{}" to ip "{}" and port "{}"'.format(task.uuid, task_ip, task_port))
                task.status = TaskStatuses.running
                task.ip     = task_ip
                #task.pid    = task_pid
                task.port   = int(task_port)
                task.save()
                return HttpResponse('OK')
+85 −19
Original line number Diff line number Diff line
@@ -72,6 +72,7 @@ class ComputingManager(object):
        return self._get_task_log(task, **kwargs)



class LocalComputingManager(ComputingManager):
    
    def _start_task(self, task):
@@ -135,6 +136,11 @@ class LocalComputingManager(ComputingManager):
        if out.exit_code != 0:
            raise Exception(out.stderr)
 
        # Set task as stopped
        task.status = TaskStatuses.stopped
        task.save()

    
    def _get_task_log(self, task, **kwargs):

        # View the Docker container log (attach)
@@ -147,6 +153,7 @@ class LocalComputingManager(ComputingManager):
            return out.stdout



class RemoteComputingManager(ComputingManager):
    
    def _start_task(self, task, **kwargs):
@@ -157,7 +164,7 @@ class RemoteComputingManager(ComputingManager):
        user = task.computing.get_conf_param('user')

        # Get user keys
        if task.computing.require_user_keys:
        if task.computing.require_user_auth_keys:
            user_keys = Keys.objects.get(user=task.user, default=True)
        else:
            raise NotImplementedError('Remote tasks not requiring keys are not yet supported')
@@ -219,8 +226,7 @@ class RemoteComputingManager(ComputingManager):
        task_pid = out.stdout

        # Set fields

        #task.status = TaskStatuses.sumbitted
        #task.status = TaskStatuses.running
        task.pid = task_pid
 
        # Save
@@ -230,7 +236,7 @@ class RemoteComputingManager(ComputingManager):
    def _stop_task(self, task, **kwargs):

        # Get user keys
        if task.computing.require_user_keys:
        if task.computing.require_user_auth_keys:
            user_keys = Keys.objects.get(user=task.user, default=True)
        else:
            raise NotImplementedError('Remote tasks not requiring keys are not yet supported')
@@ -247,13 +253,17 @@ class RemoteComputingManager(ComputingManager):
            if not 'No such process' in out.stderr:
                raise Exception(out.stderr)

        # Set task as stopped
        task.status = TaskStatuses.stopped
        task.save()


    def _get_task_log(self, task, **kwargs):
        # Get computing host
        host = task.computing.get_conf_param('host')

        # Get id_rsa
        if task.computing.require_user_keys:
        if task.computing.require_user_auth_keys:
            user_keys = Keys.objects.get(user=task.user, default=True)
            id_rsa_file = user_keys.private_key_file
        else:
@@ -280,16 +290,34 @@ class SlurmComputingManager(ComputingManager):
        user = task.computing.get_conf_param('user')
        
        # Get user keys
        if task.computing.require_user_keys:
        if task.computing.require_user_auth_keys:
            user_keys = Keys.objects.get(user=task.user, default=True)
        else:
            raise NotImplementedError('Remote tasks not requiring keys are not yet supported')

        # 1) Run the container on the host (non blocking)
        # Get task computing parameters and set sbatch args
        sbatch_args = ''
        if task.computing_options:
            task_partition = task.computing_options.get('partition', None)
            task_cpus = task.computing_options.get('cpus', None)
            task_memory = task.computing_options.get('memory', None)

            # Set sbatch args
            sbatch_args = ''
            if task_partition:
                sbatch_args += '-p {} '.format(task_partition)
            #if task_cpus:
            #    sbatch_args += '-c {} '.format()
            #if task_memory:
            #    sbatch_args += '-m {} '.format()
        
        # Set output and error files
        sbatch_args += ' --output=\$HOME/{}.log --error=\$HOME/{}.log '.format(task.uuid, task.uuid)

        # 1) Run the container on the host (non blocking)
        if task.container.type == 'singularity':

            if not task.container.dynamic_ports:
            if not task.container.supports_dynamic_ports:
                raise Exception('This task does not support dynamic port allocation and is therefore not supported using singularity on Slurm')

            # Set pass if any
@@ -304,18 +332,12 @@ class SlurmComputingManager(ComputingManager):

            run_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, user, host)

            run_command += '\'bash -c "echo \\"#!/bin/bash\nwget {}:8080/api/v1/base/agent/?task_uuid={} -O /tmp/agent_{}.py &> /dev/null && export BASE_PORT=\\\\\\$(python /tmp/agent_{}.py 2> /tmp/{}.log) && '.format(webapp_ip, task.uuid, task.uuid, task.uuid, task.uuid)
            run_command += '\'bash -c "echo \\"#!/bin/bash\nwget {}:8080/api/v1/base/agent/?task_uuid={} -O \$HOME/agent_{}.py &> /dev/null && export BASE_PORT=\\\\\\$(python \$HOME/agent_{}.py 2> \$HOME/{}.log) && '.format(webapp_ip, task.uuid, task.uuid, task.uuid, task.uuid)
            run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\\\\\\$BASE_PORT && {} '.format(authstring)
            run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv '
            

            # Double to escape for python six for shell (double times three as \\\ escapes a single slash in shell)

            # ssh -i /rosetta/.ssh/id_rsa -4 -o StrictHostKeyChecking=no slurmclustermaster-main "echo \"wget 172.18.0.5:8080/api/v1/base/agent/?task_uuid=558c65c3-8b72-4d6b-8119-e1dcf6f81177 -O /tmp/agent_558c65c3-8b72-4d6b-8119-e1dcf6f81177.py &> /dev/null
            #  && export BASE_PORT=\\\$(python /tmp/agent_558c65c3-8b72-4d6b-8119-e1dcf6f81177.py 2> /tmp/558c65c3-8b72-4d6b-8119-e1dcf6f81177.log) && export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\\\$BASE_PORT &&  export SINGULARITYENV_AUTH_PASS=testpass 
            #  && exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv docker://dregistry:5000/rosetta/metadesktop &> /tmp/558c65c3-8b72-4d6b-8119-e1dcf6f81177.log\" > /tmp/558c65c3-8b72-4d6b-8119-e1dcf6f81177.sh"

            
            # Set registry
            if task.container.registry == 'docker_local':
                registry = 'docker://dregistry:5000/'
@@ -324,7 +346,7 @@ class SlurmComputingManager(ComputingManager):
            else:
                raise NotImplementedError('Registry {} not supported'.format(task.container.registry))
    
            run_command+='{}{} &> /tmp/{}.log\\" > /tmp/{}.sh && sbatch -p partition1 /tmp/{}.sh"\''.format(registry, task.container.image, task.uuid, task.uuid, task.uuid)
            run_command+='{}{} &> \$HOME/{}.log\\" > \$HOME/{}.sh && sbatch {} \$HOME/{}.sh"\''.format(registry, task.container.image, task.uuid, task.uuid, sbatch_args, task.uuid)

            
        else:
@@ -334,9 +356,53 @@ class SlurmComputingManager(ComputingManager):
        if out.exit_code != 0:
            raise Exception(out.stderr)

        # Log        
        logger.debug('Shell exec output: "{}"'.format(out))

        # Parse sbatch output. Example: Output(stdout='Submitted batch job 3', stderr='', exit_code=0)
        job_id = out.stdout.split(' ')[-1]
        try:
            int(job_id)
        except:
            raise Exception('Cannot find int job id from output string "{}"'.format(out.stdout))
        
        # Load back the task to avoid concurrency problems in the agent call
        task_uuid = task.uuid
        task = Task.objects.get(uuid=task_uuid)

        # Save job id as task pid
        task.pid = job_id
        
        # Set status (only fi we get here before the agent which sets the status as running via the API)
        if task.status != TaskStatuses.running:
            task.status = TaskStatuses.sumbitted
        
        # Save
        task.save()


    def _stop_task(self, task, **kwargs):
        raise NotImplementedError('Not implemented')
        
        # Get user keys
        if task.computing.require_user_auth_keys:
            user_keys = Keys.objects.get(user=task.user, default=True)
        else:
            raise NotImplementedError('Remote tasks not requiring keys are not yet supported')

        # Get computing host
        host = task.computing.get_conf_param('master')
        user = task.computing.get_conf_param('user')

        # Stop the task remotely
        stop_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "scancel {}"\''.format(user_keys.private_key_file, user, host, task.pid)
        logger.debug(stop_command)
        out = os_shell(stop_command, capture=True)
        if out.exit_code != 0:
            raise Exception(out.stderr)
        
        # Set task as topped
        task.status = TaskStatuses.stopped
        task.save()


    def _get_task_log(self, task, **kwargs):
+43 −37
Original line number Diff line number Diff line
@@ -53,9 +53,10 @@ class Command(BaseCommand):
                                     image    = 'rosetta/metadesktop',
                                     type     = 'docker',
                                     registry = 'docker_local',
                                     default_ports = '8590',
                                     dynamic_ports = True,
                                     require_pass  = True)
                                     ports    = '8590',
                                     supports_dynamic_ports = True,
                                     supports_user_auth     = False,
                                     supports_pass_auth     = True)

            # MetaDesktop Singularity
            Container.objects.create(user     = None,
@@ -63,9 +64,10 @@ class Command(BaseCommand):
                                     image    = 'rosetta/metadesktop',
                                     type     = 'singularity',
                                     registry = 'docker_local',
                                     default_ports = '8590',
                                     dynamic_ports = True,
                                     require_pass  = True)
                                     ports    = '8590',
                                     supports_dynamic_ports = True,
                                     supports_user_auth     = False,
                                     supports_pass_auth     = True)

            # Astrocook
            Container.objects.create(user     = None,
@@ -73,8 +75,10 @@ class Command(BaseCommand):
                                     image    = 'sarusso/astrocook:b2b819e',
                                     type     = 'docker',
                                     registry = 'docker_local',
                                     dynamic_ports = False,
                                     default_ports = '8590')
                                     ports    = '8590',
                                     supports_dynamic_ports = False,
                                     supports_user_auth     = False,
                                     supports_pass_auth     = False)


        # Private containers
@@ -90,8 +94,10 @@ class Command(BaseCommand):
                                     image    = 'jupyter/base-notebook',
                                     type     = 'docker',
                                     registry = 'docker_hub',
                                     dynamic_ports = False,
                                     default_ports = '8888')
                                     ports    = '8888', 
                                     supports_dynamic_ports = False,
                                     supports_user_auth     = False,
                                     supports_pass_auth     = False)

        # Computing resources
        computing_resources = Computing.objects.all()
@@ -107,8 +113,8 @@ class Command(BaseCommand):
                                     name = 'Local',
                                     type = 'local',
                                     require_sys_conf  = False,
                                     require_user_conf = False,
                                     require_user_keys = False)
                                     require_user_auth_conf = False,
                                     require_user_auth_keys = False)


            #==============================
@@ -118,8 +124,8 @@ class Command(BaseCommand):
                                                             name = 'Demo remote',
                                                             type = 'remote',
                                                             require_sys_conf  = True,
                                                             require_user_conf = True,
                                                             require_user_keys = True)
                                                             require_user_auth_conf = True,
                                                             require_user_auth_keys = True)
    
            ComputingSysConf.objects.create(computing = demo_remote_auth_computing,
                                            data      = {'host': 'slurmclusterworker-one'})
@@ -136,12 +142,12 @@ class Command(BaseCommand):
                                                            name = 'Demo Slurm',
                                                            type = 'slurm',
                                                            require_sys_conf  = True,
                                                            require_user_conf = True,
                                                            require_user_keys = True)
                                                            require_user_auth_conf = True,
                                                            require_user_auth_keys = True)
    
            # Create demo slurm sys computing conf
            ComputingSysConf.objects.create(computing = demo_slurm_computing,
                                            data      = {'master': 'slurmclusterworker-master'})
                                            data      = {'master': 'slurmclustermaster-main'})

            # Create demo slurm user computing conf
            ComputingUserConf.objects.create(user      = testuser,
+19 −16
Original line number Diff line number Diff line
@@ -80,14 +80,16 @@ class Container(models.Model):
    image    = models.CharField('Container image', max_length=255, blank=False, null=False)
    type     = models.CharField('Container type', max_length=36, blank=False, null=False)
    registry = models.CharField('Container registry', max_length=255, blank=False, null=False)
    default_ports = models.CharField('Container service ports', max_length=36, blank=True, null=True)
    dynamic_ports = models.BooleanField(default=False)
    require_user  = models.BooleanField(default=False)
    require_pass  = models.BooleanField(default=False)
    ports    = models.CharField('Container service ports', max_length=36, blank=True, null=True)

    # Capabilities
    supports_dynamic_ports = models.BooleanField(default=False)
    supports_user_auth = models.BooleanField(default=False)
    supports_pass_auth = models.BooleanField(default=False)


    def __str__(self):
        return str('Container of type "{}" with image "{}" with service ports "{}" from registry "{}" of user "{}"'.format(self.type, self.image, self.default_ports, self.registry, self.user))
        return str('Container of type "{}" with image "{}" and  ports "{}" from registry "{}" of user "{}"'.format(self.type, self.image, self.ports, self.registry, self.user))


    @property
@@ -110,8 +112,8 @@ class Computing(models.Model):
    type = models.CharField('Computing Type', max_length=255, blank=False, null=False)

    require_sys_conf  = models.BooleanField(default=False)
    require_user_conf = models.BooleanField(default=False)
    require_user_keys = models.BooleanField(default=False)
    require_user_auth_conf = models.BooleanField(default=False)
    require_user_auth_keys = models.BooleanField(default=False)


    def __str__(self):
@@ -223,6 +225,8 @@ class Task(models.Model):
    auth_pass     = models.CharField('Task auth pass', max_length=36, blank=True, null=True)
    access_method = models.CharField('Task access method', max_length=36, blank=True, null=True)

    # Computing options
    computing_options = JSONField(blank=True, null=True)

    def save(self, *args, **kwargs):
        
@@ -234,11 +238,6 @@ class Task(models.Model):
        # Call parent save
        super(Task, self).save(*args, **kwargs)


    def __str__(self):
        return str('Task "{}" of user "{}" in status "{}" (TID "{}")'.format(self.name, self.user.email, self.status, self.tid))


    def update_status(self):
        if self.computing == 'local':
            
@@ -266,6 +265,10 @@ class Task(models.Model):
        return str(self.uuid).split('-')[0]


    def __str__(self):
        return str('Task "{}" of user "{}" running on "{}" in status "{}" created at "{}"'.format(self.name, self.user, self.computing, self.status, self.created))



#=========================
#  Keys 
+26 −4
Original line number Diff line number Diff line
@@ -23,7 +23,7 @@
          <table class="dashboard" style="max-width:430px">

           <tr>
            <td><b>Container name</b></td>
            <td><b>Name</b></td>
            <td>
             <input type="text" name="container_name" value="" placeholder="" size="23" required />
            </td>
@@ -49,19 +49,41 @@
           </tr>

           <tr>
            <td><b>Container image</b></td>
            <td><b>Image</b></td>
            <td>
             <input type="text" name="container_image" value="" placeholder="" size="23" required />
            </td>
           </tr>

           <tr>
            <td><b>Service port</b></td>
            <td><b>Ports</b></td>
            <td>
             <input type="text" name="container_service_port" value="" placeholder="" size="5" />
             <input type="text" name="container_ports" value="" placeholder="" size="5" />
            </td>
           </tr>

           <tr>
            <td colspan=2>
             <b>Supports dynamic ports</b> &nbsp; 
             <input type="checkbox" name="container_supports_dynamic_ports" value="True" />
            </td>
           </tr>

           <tr>
            <td colspan=2>
             <b>Supports user auth</b> &nbsp; 
             <input type="checkbox" name="container_supports_user_auth" value="True" />
            </td>
           </tr>

           <tr>
            <td colspan=2>
             <b>Supports pass auth</b> &nbsp; 
             <input type="checkbox" name="container_supports_pass_auth" value="True" />
            </td>
           </tr>


           <tr>
           <td colspan=2 align=center style="padding:20px">
           <input type="submit" value="Add">
Loading