Commit 50a6a2bd authored by Stefano Alberto Russo's avatar Stefano Alberto Russo
Browse files

Merge branch 'feature/computing_managers_refactoring' into develop

parents aab0d1dd 3fbee378
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ services:
    privileged: true
    volumes:
      - ./data/shared:/shared
      - /var/run/docker.sock:/var/run/docker.sock

  dregistry:
    container_name: dregistry
+6 −0
Original line number Diff line number Diff line
FROM rosetta/slurmcluster
MAINTAINER Stefano Alberto Russo <stefano.russo@gmail.com>

# Docker. Refresh the package index in the same layer as the install, so that
# the install does not depend on stale (or missing) package lists in the base image.
RUN apt-get update && apt-get install docker.io -y

# Add the slurmtestuser user to the sudo group
RUN adduser slurmtestuser sudo
+24 −8
Original line number Diff line number Diff line
@@ -323,11 +323,27 @@ print(port)
            logger.info('Setting task "{}" to ip "{}" and port "{}"'.format(task.uuid, task_interface_ip, task_interface_port))
            task.status = TaskStatuses.running
            task.interface_ip = task_interface_ip
            
            # Get container runtime
            container_runtime = None
            if task.computing_options:
                container_runtime = task.computing_options.get('container_runtime', None)
            if not container_runtime:
                container_runtime = task.computing.default_container_runtime
            
            if container_runtime=='singularity':
                # For Singularity, set this only if the container supports custom interface ports
                if task.container.supports_custom_interface_port:
                    task.interface_port = int(task_interface_port)
            else:
                # For all other container runtimes, set it in any case
                task.interface_port = int(task_interface_port)
            
            # Save the task
            task.save()
                    
            # Notify the user that the task called back home
            if settings.DJANGO_EMAIL_APIKEY:
                logger.info('Sending task ready mail notification to "{}"'.format(task.user.email))
                mail_subject = 'Your Task "{}" is now starting up'.format(task.container.name)
                mail_text = 'Hello,\n\nyour Task "{}" on {} is now starting up. Check logs or connect here: https://{}/tasks/?uuid={}\n\nThe Rosetta notifications bot.'.format(task.container.name, task.computing, settings.ROSETTA_HOST, task.uuid)
+113 −17
Original line number Diff line number Diff line
@@ -73,7 +73,7 @@ class ComputingManager(object):
        return self._get_task_log(task, **kwargs)


class SingleNodeComputingManager(ComputingManager):
class StandaloneComputingManager(ComputingManager):
    pass


@@ -87,7 +87,7 @@ class SSHComputingManager(ComputingManager):



class InternalSingleNodeComputingManager(SingleNodeComputingManager):
class InternalStandaloneComputingManager(StandaloneComputingManager):
    
    def _start_task(self, task):

@@ -111,7 +111,7 @@ class InternalSingleNodeComputingManager(SingleNodeComputingManager):
        #run_command += ' -v {}/user-{}:/data'.format(settings.LOCAL_USER_DATA_DIR, task.user.id)

        # Host name, image entry command
        run_command += ' -h task-{} -d -t {}/{}:{}'.format(task.uuid, task.container.registry, task.container.image_name, task.container.image_tag)
        run_command += ' -h task-{} -d -t {}/{}:{}'.format(task.short_uuid, task.container.registry, task.container.image_name, task.container.image_tag)

        # Debug
        logger.debug('Running new task with command="{}"'.format(run_command))
@@ -176,7 +176,7 @@ class InternalSingleNodeComputingManager(SingleNodeComputingManager):



class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingManager):
class SSHStandaloneComputingManager(StandaloneComputingManager, SSHComputingManager):
    
    def _start_task(self, task, **kwargs):
        logger.debug('Starting a remote task "{}"'.format(self.computing))
@@ -189,7 +189,14 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana
        webapp_conn_string = get_webapp_conn_string()
            
        # Handle container runtime
        if self.computing.default_container_runtime == 'singularity':
        container_runtime = None
        if task.computing_options:
            container_runtime = task.computing_options.get('container_runtime', None)
        if not container_runtime:
            container_runtime = task.computing.default_container_runtime

        # Runtime-specific part 
        if container_runtime == 'singularity':

            #if not task.container.supports_custom_interface_port:
            #     raise Exception('This task does not support dynamic port allocation and is therefore not supported using singularity on Slurm')
@@ -241,8 +248,57 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana
            run_command+='docker://{}/{}:{} &>> /tmp/{}_data/task.log & echo \$!"\''.format(task.container.registry, task.container.image_name, task.container.image_tag, task.uuid)


        elif container_runtime == 'docker':

            # Set pass if any
            authstring = ''
            if not task.requires_proxy_auth and task.password:
                authstring = ' -e AUTH_PASS={} '.format(task.password)
                
            # Handle storages (binds)
            binds = ''
            storages = Storage.objects.filter(computing=self.computing)
            for storage in storages:
                if storage.type == 'generic_posix' and storage.bind_path:
                    
                    # Expand the base path
                    expanded_base_path = storage.base_path        
                    if '$SSH_USER' in expanded_base_path:
                        if storage.access_through_computing:
                            expanded_base_path = expanded_base_path.replace('$SSH_USER', computing_user)
                        else:
            raise NotImplementedError('Container runtime {} not supported'.format(self.computing.default_container_runtime))
                            raise NotImplementedError('Accessing a storage with ssh+cli without going through its computing resource is not implemented')
                    if '$USER' in expanded_base_path:
                        expanded_base_path = expanded_base_path.replace('$USER', self.task.user.name)
                        
                    # Expand the bind_path
                    expanded_bind_path = storage.bind_path        
                    if '$SSH_USER' in expanded_bind_path:
                        if storage.access_through_computing:
                            expanded_bind_path = expanded_bind_path.replace('$SSH_USER', computing_user)
                        else:
                            raise NotImplementedError('Accessing a storage with ssh+cli without going through its computing resource is not implemented')
                    if '$USER' in expanded_bind_path:
                        expanded_bind_path = expanded_bind_path.replace('$USER', self.task.user.name)
                        
                    # Add the bind
                    if not binds:
                        binds = '-v{}:{}'.format(expanded_base_path, expanded_bind_path)
                    else:
                        binds += ' -v{}:{}'.format(expanded_base_path, expanded_bind_path)
            
            # TODO: remove this hardcoding
            prefix = 'sudo' if computing_host == 'slurmclusterworker-one' else ''
            
            run_command  = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(computing_keys.private_key_file, computing_user, computing_host)
            run_command += '/bin/bash -c \'"rm -rf /tmp/{}_data && mkdir /tmp/{}_data && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid) 
            run_command += 'wget {}/api/v1/base/agent/?task_uuid={} -O /tmp/{}_data/agent.py &> /dev/null && export TASK_PORT=\$(python /tmp/{}_data/agent.py 2> /tmp/{}_data/task.log) && '.format(webapp_conn_string, task.uuid, task.uuid, task.uuid, task.uuid)
            run_command += '{} docker run -p \$TASK_PORT:{} {} {} '.format(prefix, task.container.interface_port, authstring, binds)        
            run_command += '-h task-{} -d -t {}/{}:{}'.format(task.short_uuid, task.container.registry, task.container.image_name, task.container.image_tag)
            run_command += '"\''
            
        else:
            raise NotImplementedError('Container runtime {} not supported'.format(container_runtime))

        out = os_shell(run_command, capture=True)
        if out.exit_code != 0:
@@ -267,11 +323,28 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana
        # Get credentials
        computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user)

        # Stop the task remotely
        stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "kill -9 {}"\''.format(computing_keys.private_key_file, computing_user, computing_host, task.id)
        # Handle container runtime
        container_runtime = None
        if task.computing_options:
            container_runtime = task.computing_options.get('container_runtime', None)
        if not container_runtime:
            container_runtime = task.computing.default_container_runtime

        if container_runtime=='singularity':
            internal_stop_command = 'kill -9 {}'.format(task.id)            
        elif container_runtime=='docker':
            # TODO: remove this hardcoding
            prefix = 'sudo' if computing_host == 'slurmclusterworker-one' else ''
            internal_stop_command = '{} docker stop {} && {} docker rm {}'.format(prefix,task.id,prefix,task.id)
        else:
            raise NotImplementedError('Container runtime {} not supported'.format(container_runtime))

        stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "{}"\''.format(computing_keys.private_key_file, computing_user, computing_host, internal_stop_command)
        out = os_shell(stop_command, capture=True)
        if out.exit_code != 0:
            if not 'No such process' in out.stderr:
            if ('No such process' in out.stderr) or ('No such container' in out.stderr):
                pass
            else:
                raise Exception(out.stderr)

        # Set task as stopped
@@ -284,9 +357,26 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana
        # Get credentials
        computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user)
        
        # View log remotely
        view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat /tmp/{}_data/task.log"\''.format(computing_keys.private_key_file, computing_user, computing_host, task.uuid)
        # Handle container runtime
        container_runtime = None
        if task.computing_options:
            container_runtime = task.computing_options.get('container_runtime', None)
        if not container_runtime:
            container_runtime = task.computing.default_container_runtime

        if container_runtime=='singularity':
            internal_view_log_command = 'cat /tmp/{}_data/task.log'.format(task.uuid)            
        elif container_runtime=='docker':
            # TODO: remove this hardcoding
            prefix = 'sudo' if computing_host == 'slurmclusterworker-one' else ''
            internal_view_log_command = '{} docker logs {}'.format(prefix,task.id)
        else:
            raise NotImplementedError('Container runtime {} not supported'.format(container_runtime))
            
        # Prepare full command
        view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "{}"\''.format(computing_keys.private_key_file, computing_user, computing_host, internal_view_log_command)

        # Execute
        out = os_shell(view_log_command, capture=True)
        if out.exit_code != 0:
            raise Exception(out.stderr)
@@ -294,7 +384,6 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana
            return out.stdout



class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManager):
    
    def _start_task(self, task, **kwargs):
@@ -327,8 +416,15 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag
        # Set output and error files
        sbatch_args += ' --output=\$HOME/{}.log --error=\$HOME/{}.log '.format(task.uuid, task.uuid)

        # Submit the job
        if task.computing.default_container_runtime == 'singularity':
        # Handle container runtime
        container_runtime = None
        if task.computing_options:
            container_runtime = task.computing_options.get('container_runtime', None)
        if not container_runtime:
            container_runtime = task.computing.default_container_runtime

        # Runtime-specific part 
        if container_runtime == 'singularity':

            #if not task.container.supports_custom_interface_port:
            #     raise Exception('This task does not support dynamic port allocation and is therefore not supported using singularity on Slurm')
@@ -380,7 +476,7 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag
            run_command+='docker://{}/{}:{} &> \$HOME/{}.log\\" > \$HOME/{}.sh && sbatch {} \$HOME/{}.sh"\''.format(task.container.registry, task.container.image_name, task.container.image_tag, task.uuid, task.uuid, sbatch_args, task.uuid)

        else:
            raise NotImplementedError('Container runtime {} not supported'.format(task.computing.default_container_runtime))
            raise NotImplementedError('Container runtime {} not supported'.format(container_runtime))

        out = os_shell(run_command, capture=True)
        if out.exit_code != 0:
+14 −10
Original line number Diff line number Diff line
@@ -239,21 +239,21 @@ class Computing(models.Model):
    def manager(self):
        """Return the computing manager for this computing resource.

        The manager class is selected from the resource's type, access mode,
        auth mode and WMS. The instance is created on first access and then
        cached on ``self._manager``.

        Raises:
            ConsistencyException: if no computing manager is known for this
                combination of type, access mode, auth mode and WMS.
        """
        from . import computing_managers

        # Return the manager right away if it has already been instantiated
        try:
            return self._manager
        except AttributeError:
            pass

        # Mapping from (type, access mode, auth mode, WMS) to the manager class.
        # Tuple keys avoid the ambiguity of concatenated strings and allow a
        # missing WMS to be matched as None rather than as the string 'None'.
        managers_mapping = {
            ('cluster', 'ssh+cli', 'user_keys', 'slurm'): computing_managers.SlurmSSHClusterComputingManager,
            ('standalone', 'ssh+cli', 'user_keys', None): computing_managers.SSHStandaloneComputingManager,
            ('standalone', 'internal', 'internal', None): computing_managers.InternalStandaloneComputingManager,
        }

        manager_class = managers_mapping.get((self.type, self.access_mode, self.auth_mode, self.wms))
        if manager_class is None:
            raise ConsistencyException('Don\'t know how to instantiate a computing manager for computing resource of type "{}", access mode "{}" and WMS "{}"'.format(self.type, self.access_mode, self.wms))

        # Instantiate once and cache for subsequent accesses
        self._manager = manager_class(self)
        return self._manager

    

#=========================
#  Tasks 
#=========================
@@ -331,6 +331,10 @@ class Task(models.Model):
    def __str__(self):
        """Return a one-line description of the task: name, user email, computing, status and creation time."""
        description = 'Task "{}" of user "{}" running on "{}" in status "{}" created at "{}"'
        return description.format(self.name, self.user.email, self.computing, self.status, self.created)

    @property
    def short_uuid(self):
        """Return the first eight characters of the task UUID's string form."""
        uuid_text = str(self.uuid)
        return uuid_text[:8]

    @property
    def color(self):
        string_int_hash = hash_string_to_int(self.name)
@@ -339,7 +343,7 @@ class Task(models.Model):
    
    @property
    def sharable_link(self):
        """Return the short HTTPS link to this task on the Rosetta host.

        The link has the form ``https://<ROSETTA_HOST>/t/<short uuid>``, where
        the short uuid is the first eight characters of the task UUID.
        """
        # Reuse the short_uuid property instead of repeating the slicing here
        return 'https://{}/t/{}'.format(settings.ROSETTA_HOST, self.short_uuid)
    
    @property
    def tcp_tunnel_host(self):
Loading