Commit 89086332 authored by Stefano Alberto Russo's avatar Stefano Alberto Russo
Browse files

Refactored the computing type by adding the access mode. Refactored the...

Refactored the computing type by adding the access mode. Refactored the computing manager classes. Added the "access_mode" field and related sys and user conf in the Computing model.
parent b044fa68
Loading
Loading
Loading
Loading
+5 −1
Original line number Diff line number Diff line
@@ -711,6 +711,10 @@ class FileManagerAPI(PrivateGETAPI, PrivatePOSTAPI):
                
                for computing in computings:
                    
                    # For now, we only support SSH-based computing resources
                    if not 'ssh' in computing.access_method:
                        continue
                        
                    # Attach user conf in any
                    computing.attach_user_conf(request.user)
                    
+187 −170
Original line number Diff line number Diff line
@@ -73,8 +73,21 @@ class ComputingManager(object):
        return self._get_task_log(task, **kwargs)


class SingleNodeComputingManager(ComputingManager):
    pass


class ClusterComputingManager(ComputingManager):
    pass


class SSHComputingManager(ComputingManager):
    # SSH-f + keys utils here
    pass



class LocalComputingManager(ComputingManager):
class InternalSingleNodeComputingManager(SingleNodeComputingManager):
    
    def _start_task(self, task):

@@ -161,7 +174,11 @@ class LocalComputingManager(ComputingManager):



class RemoteComputingManager(ComputingManager):




class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingManager):
    
    def _start_task(self, task, **kwargs):
        logger.debug('Starting a remote task "{}"'.format(self.computing))
@@ -300,7 +317,7 @@ class RemoteComputingManager(ComputingManager):



class SlurmComputingManager(ComputingManager):
class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManager):
    
    def _start_task(self, task, **kwargs):
        logger.debug('Starting a remote task "{}"'.format(self.computing))
@@ -467,173 +484,173 @@ class SlurmComputingManager(ComputingManager):
            return out.stdout



class RemotehopComputingManager(ComputingManager):
    
    def _start_task(self, task, **kwargs):
        logger.debug('Starting a remote task "{}"'.format(self.computing))

        # Get computing params
        first_host = self.computing.conf.get('first_host')
        first_user = self.computing.conf.get('first_user')
        second_host = self.computing.conf.get('second_host')
        second_user = self.computing.conf.get('second_user')
        setup_command = self.computing.conf.get('setup_command')

        # TODO: De hard-code
        use_agent = False

        # Get user keys
        if self.computing.requires_user_keys:
            user_keys = KeyPair.objects.get(user=task.user, default=True)
        else:
            raise NotImplementedError('Remote tasks not requiring keys are not yet supported')

        # Get webapp conn string
        from.utils import get_webapp_conn_string
        webapp_conn_string = get_webapp_conn_string()
            
        # Run the container on the host (non blocking)
        if task.container.type == 'singularity':

            task.tid    = task.uuid
            task.save()

            # Set pass if any
            if task.auth_pass:
                authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass)
            else:
                authstring = ''

            # Set binds, only from sys config if the resource is not owned by the user
            if self.computing.user != task.user:
                binds = self.computing.sys_conf.get('binds')
            else:
                binds = self.computing.conf.get('binds')
            if not binds:
                binds = ''
            else:
                binds = '-B {}'.format(binds)

            # Manage task extra binds
            if task.extra_binds:
                if not binds:
                    binds = '-B {}'.format(task.extra_binds)
                else:
                    binds += ',{}'.format(task.extra_binds)

            run_command  = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host)
            run_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} /bin/bash -c \''.format(second_user, second_host)
            
            if use_agent:
                run_command += '\'wget {}/api/v1/base/agent/?task_uuid={} -O \$HOME/agent_{}.py &> /dev/null && export BASE_PORT=\$(python \$HOME/agent_{}.py 2> \$HOME/{}.log) && '.format(webapp_conn_string, task.uuid, task.uuid, task.uuid, task.uuid)
                if setup_command:
                    run_command += setup_command + ' && '
                run_command += '\'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\$BASE_PORT && {} '.format(authstring)
                run_command += 'rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp &>> \$HOME/{}.log && mkdir -p /tmp/{}_data/home &>> \$HOME/{}.log && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid)
                run_command += 'exec nohup singularity run {} --pid --writable-tmpfs --no-home --home=/home/metauser --workdir /tmp/{}_data/tmp -B/tmp/{}_data/home:/home --containall --cleanenv '.format(binds, task.uuid, task.uuid)
            else:
                run_command += ' : && ' # Trick to prevent some issues in exporting variables                
                if setup_command:
                    run_command += setup_command + ' && '
                run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT={} && {} '.format(task.port, authstring)
                run_command += 'rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp &>> \$HOME/{}.log && mkdir -p /tmp/{}_data/home &>> \$HOME/{}.log && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid)
                run_command += 'exec nohup singularity run {} --pid --writable-tmpfs --no-home --home=/home/metauser --workdir /tmp/{}_data/tmp -B/tmp/{}_data/home:/home --containall --cleanenv '.format(binds, task.uuid, task.uuid)
             
            # Set registry
            if task.container.registry == 'docker_local':
                raise Exception('This computing resource does not support local Docker registries yet')
                # Get local Docker registry conn string
                from.utils import get_local_docker_registry_conn_string
                local_docker_registry_conn_string = get_local_docker_registry_conn_string()
                registry = 'docker://{}/'.format(local_docker_registry_conn_string)
            elif task.container.registry == 'docker_hub':
                registry = 'docker://'
            else:
                raise NotImplementedError('Registry {} not supported'.format(task.container.registry))
     
            run_command+='{}{} &>> \$HOME/{}.log & echo \$!\'"'.format(registry, task.container.image, task.uuid)

        else:
            raise NotImplementedError('Container {} not supported'.format(task.container.type))

        out = os_shell(run_command, capture=True)
        if out.exit_code != 0:
            raise Exception(out.stderr)
        
        # Log        
        logger.debug('Shell exec output: "{}"'.format(out))


        # Load back the task to avoid  concurrency problems in the agent call
        task_uuid = task.uuid
        task = Task.objects.get(uuid=task_uuid)

        # Save pid echoed by the command above
        task_pid = out.stdout

        # Set fields
        task.status = TaskStatuses.running
        task.pid = task_pid
        task.ip  = second_host
 
        # Save
        task.save()


    def _stop_task(self, task, **kwargs):

        # Get user keys
        if self.computing.requires_user_keys:
            user_keys = KeyPair.objects.get(user=task.user, default=True)
        else:
            raise NotImplementedError('Remote tasks not requiring keys are not yet supported')

        # Get computing params
        first_host = self.computing.conf.get('first_host')
        first_user = self.computing.conf.get('first_user')
        second_host = self.computing.conf.get('second_host')
        second_user = self.computing.conf.get('second_user')

        # Stop the task remotely
        stop_command  = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host)
        stop_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} '.format(second_user, second_host)
        stop_command += 'kill -9 {}"'.format(task.pid)

        out = os_shell(stop_command, capture=True)
        if out.exit_code != 0:
            if not 'No such process' in out.stderr:
                raise Exception(out.stderr)

        # Set task as stopped
        task.status = TaskStatuses.stopped
        task.save()


    def _get_task_log(self, task, **kwargs):
        
        # Get user keys
        if self.computing.requires_user_keys:
            user_keys = KeyPair.objects.get(user=task.user, default=True)
        else:
            raise NotImplementedError('Remote tasks not requiring keys are not yet supported')

        # Get computing params
        first_host = self.computing.conf.get('first_host')
        first_user = self.computing.conf.get('first_user')
        second_host = self.computing.conf.get('second_host')
        second_user = self.computing.conf.get('second_user')

        # View log remotely
        view_log_command  = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host)
        view_log_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} '.format(second_user, second_host)
        view_log_command += 'cat \\\\\\$HOME/{}.log"'.format(task.uuid)

        out = os_shell(view_log_command, capture=True)
        if out.exit_code != 0:
            raise Exception(out.stderr)
        else:
            return out.stdout
# TODO: rename the following as "ssh+ssh" access mode? Ore somethign similar?
# class RemotehopComputingManager(ComputingManager):
#     
#     def _start_task(self, task, **kwargs):
#         logger.debug('Starting a remote task "{}"'.format(self.computing))
# 
#         # Get computing params
#         first_host = self.computing.conf.get('first_host')
#         first_user = self.computing.conf.get('first_user')
#         second_host = self.computing.conf.get('second_host')
#         second_user = self.computing.conf.get('second_user')
#         setup_command = self.computing.conf.get('setup_command')
# 
#         # TODO: De hard-code
#         use_agent = False
# 
#         # Get user keys
#         if self.computing.requires_user_keys:
#             user_keys = KeyPair.objects.get(user=task.user, default=True)
#         else:
#             raise NotImplementedError('Remote tasks not requiring keys are not yet supported')
# 
#         # Get webapp conn string
#         from.utils import get_webapp_conn_string
#         webapp_conn_string = get_webapp_conn_string()
#             
#         # Run the container on the host (non blocking)
#         if task.container.type == 'singularity':
# 
#             task.tid    = task.uuid
#             task.save()
# 
#             # Set pass if any
#             if task.auth_pass:
#                 authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass)
#             else:
#                 authstring = ''
# 
#             # Set binds, only from sys config if the resource is not owned by the user
#             if self.computing.user != task.user:
#                 binds = self.computing.sys_conf.get('binds')
#             else:
#                 binds = self.computing.conf.get('binds')
#             if not binds:
#                 binds = ''
#             else:
#                 binds = '-B {}'.format(binds)
# 
#             # Manage task extra binds
#             if task.extra_binds:
#                 if not binds:
#                     binds = '-B {}'.format(task.extra_binds)
#                 else:
#                     binds += ',{}'.format(task.extra_binds)
# 
#             run_command  = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host)
#             run_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} /bin/bash -c \''.format(second_user, second_host)
#             
#             if use_agent:
#                 run_command += '\'wget {}/api/v1/base/agent/?task_uuid={} -O \$HOME/agent_{}.py &> /dev/null && export BASE_PORT=\$(python \$HOME/agent_{}.py 2> \$HOME/{}.log) && '.format(webapp_conn_string, task.uuid, task.uuid, task.uuid, task.uuid)
#                 if setup_command:
#                     run_command += setup_command + ' && '
#                 run_command += '\'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\$BASE_PORT && {} '.format(authstring)
#                 run_command += 'rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp &>> \$HOME/{}.log && mkdir -p /tmp/{}_data/home &>> \$HOME/{}.log && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid)
#                 run_command += 'exec nohup singularity run {} --pid --writable-tmpfs --no-home --home=/home/metauser --workdir /tmp/{}_data/tmp -B/tmp/{}_data/home:/home --containall --cleanenv '.format(binds, task.uuid, task.uuid)
#             else:
#                 run_command += ' : && ' # Trick to prevent some issues in exporting variables                
#                 if setup_command:
#                     run_command += setup_command + ' && '
#                 run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT={} && {} '.format(task.port, authstring)
#                 run_command += 'rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp &>> \$HOME/{}.log && mkdir -p /tmp/{}_data/home &>> \$HOME/{}.log && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid)
#                 run_command += 'exec nohup singularity run {} --pid --writable-tmpfs --no-home --home=/home/metauser --workdir /tmp/{}_data/tmp -B/tmp/{}_data/home:/home --containall --cleanenv '.format(binds, task.uuid, task.uuid)
#              
#             # Set registry
#             if task.container.registry == 'docker_local':
#                 raise Exception('This computing resource does not support local Docker registries yet')
#                 # Get local Docker registry conn string
#                 from.utils import get_local_docker_registry_conn_string
#                 local_docker_registry_conn_string = get_local_docker_registry_conn_string()
#                 registry = 'docker://{}/'.format(local_docker_registry_conn_string)
#             elif task.container.registry == 'docker_hub':
#                 registry = 'docker://'
#             else:
#                 raise NotImplementedError('Registry {} not supported'.format(task.container.registry))
#      
#             run_command+='{}{} &>> \$HOME/{}.log & echo \$!\'"'.format(registry, task.container.image, task.uuid)
# 
#         else:
#             raise NotImplementedError('Container {} not supported'.format(task.container.type))
# 
#         out = os_shell(run_command, capture=True)
#         if out.exit_code != 0:
#             raise Exception(out.stderr)
#         
#         # Log        
#         logger.debug('Shell exec output: "{}"'.format(out))
# 
# 
#         # Load back the task to avoid  concurrency problems in the agent call
#         task_uuid = task.uuid
#         task = Task.objects.get(uuid=task_uuid)
# 
#         # Save pid echoed by the command above
#         task_pid = out.stdout
# 
#         # Set fields
#         task.status = TaskStatuses.running
#         task.pid = task_pid
#         task.ip  = second_host
#  
#         # Save
#         task.save()
# 
# 
#     def _stop_task(self, task, **kwargs):
# 
#         # Get user keys
#         if self.computing.requires_user_keys:
#             user_keys = KeyPair.objects.get(user=task.user, default=True)
#         else:
#             raise NotImplementedError('Remote tasks not requiring keys are not yet supported')
# 
#         # Get computing params
#         first_host = self.computing.conf.get('first_host')
#         first_user = self.computing.conf.get('first_user')
#         second_host = self.computing.conf.get('second_host')
#         second_user = self.computing.conf.get('second_user')
# 
#         # Stop the task remotely
#         stop_command  = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host)
#         stop_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} '.format(second_user, second_host)
#         stop_command += 'kill -9 {}"'.format(task.pid)
# 
#         out = os_shell(stop_command, capture=True)
#         if out.exit_code != 0:
#             if not 'No such process' in out.stderr:
#                 raise Exception(out.stderr)
# 
#         # Set task as stopped
#         task.status = TaskStatuses.stopped
#         task.save()
# 
# 
#     def _get_task_log(self, task, **kwargs):
#         
#         # Get user keys
#         if self.computing.requires_user_keys:
#             user_keys = KeyPair.objects.get(user=task.user, default=True)
#         else:
#             raise NotImplementedError('Remote tasks not requiring keys are not yet supported')
# 
#         # Get computing params
#         first_host = self.computing.conf.get('first_host')
#         first_user = self.computing.conf.get('first_user')
#         second_host = self.computing.conf.get('second_host')
#         second_user = self.computing.conf.get('second_user')
# 
#         # View log remotely
#         view_log_command  = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host)
#         view_log_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} '.format(second_user, second_host)
#         view_log_command += 'cat \\\\\\$HOME/{}.log"'.format(task.uuid)
# 
#         out = os_shell(view_log_command, capture=True)
#         if out.exit_code != 0:
#             raise Exception(out.stderr)
#         else:
#             return out.stdout



+20 −17
Original line number Diff line number Diff line
@@ -209,11 +209,12 @@ class Command(BaseCommand):
            print('Creating demo computing resources containers...')

            #==============================
            #  Local remote computing
            #  Demo Internal computing
            #==============================
            Computing.objects.create(user = None,
                                     name = 'Local',
                                     type = 'local',
                                     name = 'Demo Internal',
                                     type = 'singlenode',
                                     access_method = 'internal',
                                     requires_sys_conf  = False,
                                     requires_user_conf = False,
                                     requires_user_keys = False,
@@ -222,32 +223,34 @@ class Command(BaseCommand):


            #==============================
            # Demo remote computing 
            # Demo Single Node computing 
            #==============================    
            demo_remote_auth_computing = Computing.objects.create(user = None,
                                                             name = 'Demo remote',
                                                             type = 'remote',
            demo_singlenode_computing = Computing.objects.create(user = None,
                                                                 name = 'Demo Single Node',
                                                                 type = 'singlenode',
                                                                 access_method = 'ssh',
                                                                 requires_sys_conf  = True,
                                                                 requires_user_conf = True,
                                                                 requires_user_keys = True,
                                                                 supports_docker = True,
                                                                 supports_singularity = True)
    
            ComputingSysConf.objects.create(computing = demo_remote_auth_computing,
            ComputingSysConf.objects.create(computing = demo_singlenode_computing,
                                            data      = {'host': 'slurmclusterworker-one',
                                                         'binds': '/shared/data/users:/shared/data/users,/shared/scratch:/shared/scratch'})

            ComputingUserConf.objects.create(user      = testuser,
                                             computing = demo_remote_auth_computing,
                                             computing = demo_singlenode_computing,
                                             data      = {'user': 'slurmtestuser'})
         

            #==============================
            #  Demo Slurm computing
            #  Demo Cluster computing
            #==============================
            demo_slurm_computing = Computing.objects.create(user = None,
                                                            name = 'Demo Slurm',
                                                            type = 'slurm',
                                                            name = 'Demo Cluster',
                                                            type = 'cluster',
                                                            access_method = 'slurm+ssh',
                                                            requires_sys_conf  = True,
                                                            requires_user_conf = True,
                                                            requires_user_keys = True,
+30 −0
Original line number Diff line number Diff line
# Generated by Django 2.2.1 on 2021-04-08 10:41

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

    dependencies = [
        ('core_app', '0003_text'),
    ]

    operations = [
        migrations.AddField(
            model_name='computing',
            name='access_method',
            field=models.CharField(default='NA', max_length=255, verbose_name='Computing Access method'),
            preserve_default=False,
        ),
        migrations.AlterField(
            model_name='computingsysconf',
            name='computing',
            field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='related_sys_conf', to='core_app.Computing'),
        ),
        migrations.AlterField(
            model_name='computinguserconf',
            name='computing',
            field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='related_user_conf', to='core_app.Computing'),
        ),
    ]
+26 −7
Original line number Diff line number Diff line
@@ -139,6 +139,7 @@ class Computing(models.Model):
    
    name = models.CharField('Computing Name', max_length=255, blank=False, null=False)
    type = models.CharField('Computing Type', max_length=255, blank=False, null=False)
    access_method = models.CharField('Computing Access method', max_length=255, blank=False, null=False)

    requires_sys_conf  = models.BooleanField(default=False)
    requires_user_conf = models.BooleanField(default=False)
@@ -147,6 +148,24 @@ class Computing(models.Model):
    supports_docker  = models.BooleanField(default=False)
    supports_singularity  = models.BooleanField(default=False)

    @property
    def type_str(self):
        # TODO: improve me?
        if self.type == 'cluster':
            return 'Cluster'
        elif self.type == 'singlenode':
            return 'Single Node'
        else:
            raise ConsistencyException('Unknown computing resource type "{}"'.format(self.type))

    @property
    def access_method_str(self):
        # TODO: improve me?
        access_method = self.access_method
        access_method = access_method.replace('ssh', 'SSH')
        access_method = access_method.replace('slurm', 'Slurm')
        return access_method

    class Meta:
        ordering = ['name']

@@ -179,14 +198,14 @@ class Computing(models.Model):
        try:
            return self._manager
        except AttributeError:
            if self.type == 'local':
                self._manager = computing_managers.LocalComputingManager(self)
            elif self.type == 'remote':
                self._manager = computing_managers.RemoteComputingManager(self)            
            elif self.type == 'slurm':
                self._manager = computing_managers.SlurmComputingManager(self)
            if self.type == 'cluster' and self.access_method == 'slurm+ssh':
                self._manager = computing_managers.SlurmSSHClusterComputingManager(self)
            elif self.type == 'singlenode' and self.access_method == 'ssh':
                self._manager = computing_managers.SSHSingleNodeComputingManager(self)            
            elif self.type == 'singlenode' and self.access_method == 'internal':
                self._manager = computing_managers.InternalSingleNodeComputingManager(self)
            else:
                raise ConsistencyException('Don\'t know how to instantiate a computing manager for computing resource of type "{}"'.format(self.type))
                raise ConsistencyException('Don\'t know how to instantiate a computing manager for computing resource of type "{}" and access mode "{}"'.format(self.type, self.access_method))
            return self._manager
    
    
Loading