Loading services/webapp/code/rosetta/core_app/admin.py +2 −2 Original line number Diff line number Diff line from django.contrib import admin from .models import Profile, LoginToken, Task, Container, Computing, ComputingSysConf, ComputingUserConf, Storage, KeyPair, Text from .models import Profile, LoginToken, Task, Container, Computing, ComputingConf, ComputingUserConf, Storage, KeyPair, Text admin.site.register(Profile) admin.site.register(LoginToken) admin.site.register(Task) admin.site.register(Container) admin.site.register(Computing) admin.site.register(ComputingSysConf) admin.site.register(ComputingConf) admin.site.register(ComputingUserConf) admin.site.register(Storage) admin.site.register(KeyPair) Loading services/webapp/code/rosetta/core_app/api.py +14 −33 Original line number Diff line number Diff line Loading @@ -11,7 +11,7 @@ from django.conf import settings from rest_framework.response import Response from rest_framework import status, serializers, viewsets from rest_framework.views import APIView from .utils import format_exception, send_email, os_shell, now_t from .utils import format_exception, send_email, os_shell, now_t, get_ssh_access_mode_credentials from .models import Profile, Task, TaskStatuses, Computing, Storage, KeyPair from .exceptions import ConsistencyException import json Loading Loading @@ -369,50 +369,27 @@ class FileManagerAPI(PrivateGETAPI, PrivatePOSTAPI): else: dest = dest.replace('\ ', '\\\\\\ ') # Get user key user_keys = KeyPair.objects.get(user=user, default=True) # Get computing host computing.attach_user_conf(user) computing_host = computing.conf.get('host') computing_user = computing.conf.get('user') if not computing_host: raise Exception('No computing host?!') if not computing_user: raise Exception('No computing user?!') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(computing, user) # Command if mode=='get': command = 'scp -o LogLevel=ERROR -i {} -4 -o 
StrictHostKeyChecking=no {}@{}:{} {}'.format(user_keys.private_key_file, computing_user, computing_host, source, dest) command = 'scp -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{}:{} {}'.format(computing_keys.private_key_file, computing_user, computing_host, source, dest) elif mode == 'put': command = 'scp -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {} {}@{}:{}'.format(user_keys.private_key_file, source, computing_user, computing_host, dest) command = 'scp -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {} {}@{}:{}'.format(computing_keys.private_key_file, source, computing_user, computing_host, dest) else: raise ValueError('Unknown mode "{}"'.format(mode)) return command def ssh_command(self, command, user, computing): # Get user key user_keys = KeyPair.objects.get(user=user, default=True) # Get computing host computing.attach_user_conf(user) computing_host = computing.conf.get('host') computing_user = computing.conf.get('user') if not computing_host: raise Exception('No computing host?!') if not computing_user: raise Exception('No computing user?!') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(computing, user) # Command command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} "{}"'.format(user_keys.private_key_file, computing_user, computing_host, command) command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} "{}"'.format(computing_keys.private_key_file, computing_user, computing_host, command) return command Loading Loading @@ -441,7 +418,11 @@ class FileManagerAPI(PrivateGETAPI, PrivatePOSTAPI): if storage.access_through_computing: computing = storage.computing computing.attach_user_conf(user) if computing.auth_mode == 'user_keys': base_path_expanded = base_path_expanded.replace('$SSH_USER', computing.user_conf.get('user')) else: base_path_expanded = base_path_expanded.replace('$SSH_USER', computing.conf.get('user')) else: raise 
NotImplementedError('Accessing a storage with ssh+cli without going through its computing resource is not implemented') if '$USER' in base_path_expanded: Loading services/webapp/code/rosetta/core_app/computing_managers.py +20 −242 Original line number Diff line number Diff line from .models import TaskStatuses, KeyPair, Task, Storage from .utils import os_shell from .utils import os_shell, get_ssh_access_mode_credentials from .exceptions import ErrorMessage, ConsistencyException from django.conf import settings Loading Loading @@ -180,15 +180,8 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana def _start_task(self, task, **kwargs): logger.debug('Starting a remote task "{}"'.format(self.computing)) # Get computing user and host computing_user = self.computing.conf.get('user') computing_host = self.computing.conf.get('host') # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user) # Get webapp conn string from.utils import get_webapp_conn_string Loading Loading @@ -238,7 +231,7 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana else: binds += ',{}:{}'.format(expanded_base_path, expanded_bind_path) run_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, computing_user, computing_host) run_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(computing_keys.private_key_file, computing_user, computing_host) run_command += '/bin/bash -c \'"rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp && mkdir -p /tmp/{}_data/home && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid) run_command += 'wget 
{}/api/v1/base/agent/?task_uuid={} -O /tmp/{}_data/agent.py &> /dev/null && export BASE_PORT=\$(python /tmp/{}_data/agent.py 2> /tmp/{}_data/task.log) && '.format(webapp_conn_string, task.uuid, task.uuid, task.uuid, task.uuid) run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\$BASE_PORT && {} '.format(authstring) Loading Loading @@ -271,18 +264,11 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana def _stop_task(self, task, **kwargs): # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get computing host host = self.computing.conf.get('host') user = self.computing.conf.get('user') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user) # Stop the task remotely stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "kill -9 {}"\''.format(user_keys.private_key_file, user, host, task.id) stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "kill -9 {}"\''.format(computing_keys.private_key_file, computing_user, computing_host, task.id) out = os_shell(stop_command, capture=True) if out.exit_code != 0: if not 'No such process' in out.stderr: Loading @@ -295,18 +281,11 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana def _get_task_log(self, task, **kwargs): # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get computing host host = self.computing.conf.get('host') user = self.computing.conf.get('user') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, 
task.user) # View log remotely view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat /tmp/{}_data/task.log"\''.format(user_keys.private_key_file, user, host, task.uuid) view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat /tmp/{}_data/task.log"\''.format(computing_keys.private_key_file, computing_user, computing_host, task.uuid) out = os_shell(view_log_command, capture=True) if out.exit_code != 0: Loading @@ -321,15 +300,8 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag def _start_task(self, task, **kwargs): logger.debug('Starting a remote task "{}"'.format(self.computing)) # Get computing host computing_host = self.computing.conf.get('host') computing_user = self.computing.conf.get('user') # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user) # Get webapp conn string from.utils import get_webapp_conn_string Loading Loading @@ -399,7 +371,7 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag else: binds += ',{}:{}'.format(expanded_base_path, expanded_bind_path) run_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, computing_user, computing_host) run_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(computing_keys.private_key_file, computing_user, computing_host) run_command += '\'bash -c "echo \\"#!/bin/bash\nwget {}/api/v1/base/agent/?task_uuid={} -O \$HOME/agent_{}.py &> \$HOME/{}.log && export BASE_PORT=\\\\\\$(python \$HOME/agent_{}.py 2> \$HOME/{}.log) && '.format(webapp_conn_string, task.uuid, task.uuid, task.uuid, 
task.uuid, task.uuid) run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\\\\\\$BASE_PORT && {} '.format(authstring) run_command += 'rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp &>> \$HOME/{}.log && mkdir -p /tmp/{}_data/home &>> \$HOME/{}.log && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid) Loading Loading @@ -442,18 +414,11 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag def _stop_task(self, task, **kwargs): # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get computing host host = self.computing.conf.get('host') user = self.computing.conf.get('user') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user) # Stop the task remotely stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "scancel {}"\''.format(user_keys.private_key_file, user, host, task.id) stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "scancel {}"\''.format(computing_keys.private_key_file, computing_user, computing_host, task.id) out = os_shell(stop_command, capture=True) if out.exit_code != 0: raise Exception(out.stderr) Loading @@ -465,18 +430,11 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag def _get_task_log(self, task, **kwargs): # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get computing host host = self.computing.conf.get('host') user = self.computing.conf.get('user') # Get credentials computing_user, computing_host, computing_keys = 
get_ssh_access_mode_credentials(self.computing, task.user) # View log remotely view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat \$HOME/{}.log"\''.format(user_keys.private_key_file, user, host, task.uuid) view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat \$HOME/{}.log"\''.format(computing_keys.private_key_file, computing_user, computing_host, task.uuid) out = os_shell(view_log_command, capture=True) if out.exit_code != 0: Loading @@ -485,183 +443,3 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag return out.stdout # TODO: rename the following as "ssh+ssh" access mode? Or something similar? # class RemotehopComputingManager(ComputingManager): # # def _start_task(self, task, **kwargs): # logger.debug('Starting a remote task "{}"'.format(self.computing)) # # # Get computing params # first_host = self.computing.conf.get('first_host') # first_user = self.computing.conf.get('first_user') # second_host = self.computing.conf.get('second_host') # second_user = self.computing.conf.get('second_user') # setup_command = self.computing.conf.get('setup_command') # # # TODO: De hard-code # use_agent = False # # # Get user keys # if self.computing.requires_user_keys: # user_keys = KeyPair.objects.get(user=task.user, default=True) # else: # raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # # # Get webapp conn string # from.utils import get_webapp_conn_string # webapp_conn_string = get_webapp_conn_string() # # # Run the container on the host (non blocking) # if task.container.type == 'singularity': # # task.tid = task.uuid # task.save() # # # Set pass if any # if task.auth_pass: # authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass) # else: # authstring = '' # # # Set binds, only from sys config if the resource is not owned by the user # if self.computing.user != task.user: # binds 
= self.computing.sys_conf.get('binds') # else: # binds = self.computing.conf.get('binds') # if not binds: # binds = '' # else: # binds = '-B {}'.format(binds) # # # Manage task extra binds # if task.extra_binds: # if not binds: # binds = '-B {}'.format(task.extra_binds) # else: # binds += ',{}'.format(task.extra_binds) # # run_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host) # run_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} /bin/bash -c \''.format(second_user, second_host) # # if use_agent: # run_command += '\'wget {}/api/v1/base/agent/?task_uuid={} -O \$HOME/agent_{}.py &> /dev/null && export BASE_PORT=\$(python \$HOME/agent_{}.py 2> \$HOME/{}.log) && '.format(webapp_conn_string, task.uuid, task.uuid, task.uuid, task.uuid) # if setup_command: # run_command += setup_command + ' && ' # run_command += '\'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\$BASE_PORT && {} '.format(authstring) # run_command += 'rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp &>> \$HOME/{}.log && mkdir -p /tmp/{}_data/home &>> \$HOME/{}.log && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid) # run_command += 'exec nohup singularity run {} --pid --writable-tmpfs --no-home --home=/home/metauser --workdir /tmp/{}_data/tmp -B/tmp/{}_data/home:/home --containall --cleanenv '.format(binds, task.uuid, task.uuid) # else: # run_command += ' : && ' # Trick to prevent some issues in exporting variables # if setup_command: # run_command += setup_command + ' && ' # run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT={} && {} '.format(task.port, authstring) # run_command += 'rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp &>> \$HOME/{}.log && mkdir -p /tmp/{}_data/home &>> \$HOME/{}.log && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid) # run_command 
+= 'exec nohup singularity run {} --pid --writable-tmpfs --no-home --home=/home/metauser --workdir /tmp/{}_data/tmp -B/tmp/{}_data/home:/home --containall --cleanenv '.format(binds, task.uuid, task.uuid) # # # Set registry # if task.container.registry == 'docker_local': # raise Exception('This computing resource does not support local Docker registries yet') # # Get local Docker registry conn string # from.utils import get_local_docker_registry_conn_string # local_docker_registry_conn_string = get_local_docker_registry_conn_string() # registry = 'docker://{}/'.format(local_docker_registry_conn_string) # elif task.container.registry == 'docker_hub': # registry = 'docker://' # else: # raise NotImplementedError('Registry {} not supported'.format(task.container.registry)) # # run_command+='{}{} &>> \$HOME/{}.log & echo \$!\'"'.format(registry, task.container.image, task.uuid) # # else: # raise NotImplementedError('Container {} not supported'.format(task.container.type)) # # out = os_shell(run_command, capture=True) # if out.exit_code != 0: # raise Exception(out.stderr) # # # Log # logger.debug('Shell exec output: "{}"'.format(out)) # # # # Load back the task to avoid concurrency problems in the agent call # task_uuid = task.uuid # task = Task.objects.get(uuid=task_uuid) # # # Save pid echoed by the command above # task_pid = out.stdout # # # Set fields # task.status = TaskStatuses.running # task.pid = task_pid # task.ip = second_host # # # Save # task.save() # # # def _stop_task(self, task, **kwargs): # # # Get user keys # if self.computing.requires_user_keys: # user_keys = KeyPair.objects.get(user=task.user, default=True) # else: # raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # # # Get computing params # first_host = self.computing.conf.get('first_host') # first_user = self.computing.conf.get('first_user') # second_host = self.computing.conf.get('second_host') # second_user = self.computing.conf.get('second_user') # # # Stop the 
task remotely # stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host) # stop_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} '.format(second_user, second_host) # stop_command += 'kill -9 {}"'.format(task.pid) # # out = os_shell(stop_command, capture=True) # if out.exit_code != 0: # if not 'No such process' in out.stderr: # raise Exception(out.stderr) # # # Set task as stopped # task.status = TaskStatuses.stopped # task.save() # # # def _get_task_log(self, task, **kwargs): # # # Get user keys # if self.computing.requires_user_keys: # user_keys = KeyPair.objects.get(user=task.user, default=True) # else: # raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # # # Get computing params # first_host = self.computing.conf.get('first_host') # first_user = self.computing.conf.get('first_user') # second_host = self.computing.conf.get('second_host') # second_user = self.computing.conf.get('second_user') # # # View log remotely # view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host) # view_log_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} '.format(second_user, second_host) # view_log_command += 'cat \\\\\\$HOME/{}.log"'.format(task.uuid) # # out = os_shell(view_log_command, capture=True) # if out.exit_code != 0: # raise Exception(out.stderr) # else: # return out.stdout services/webapp/code/rosetta/core_app/management/commands/core_app_populate.py +29 −21 File changed.Preview size limit exceeded, changes collapsed. 
Show changes services/webapp/code/rosetta/core_app/migrations/0012_remove_task_extra_binds.py 0 → 100644 +17 −0 Original line number Diff line number Diff line # Generated by Django 2.2.1 on 2021-11-08 14:52 from django.db import migrations class Migration(migrations.Migration): dependencies = [ ('core_app', '0011_storage'), ] operations = [ migrations.RemoveField( model_name='task', name='extra_binds', ), ] Loading
services/webapp/code/rosetta/core_app/admin.py +2 −2 Original line number Diff line number Diff line from django.contrib import admin from .models import Profile, LoginToken, Task, Container, Computing, ComputingSysConf, ComputingUserConf, Storage, KeyPair, Text from .models import Profile, LoginToken, Task, Container, Computing, ComputingConf, ComputingUserConf, Storage, KeyPair, Text admin.site.register(Profile) admin.site.register(LoginToken) admin.site.register(Task) admin.site.register(Container) admin.site.register(Computing) admin.site.register(ComputingSysConf) admin.site.register(ComputingConf) admin.site.register(ComputingUserConf) admin.site.register(Storage) admin.site.register(KeyPair) Loading
services/webapp/code/rosetta/core_app/api.py +14 −33 Original line number Diff line number Diff line Loading @@ -11,7 +11,7 @@ from django.conf import settings from rest_framework.response import Response from rest_framework import status, serializers, viewsets from rest_framework.views import APIView from .utils import format_exception, send_email, os_shell, now_t from .utils import format_exception, send_email, os_shell, now_t, get_ssh_access_mode_credentials from .models import Profile, Task, TaskStatuses, Computing, Storage, KeyPair from .exceptions import ConsistencyException import json Loading Loading @@ -369,50 +369,27 @@ class FileManagerAPI(PrivateGETAPI, PrivatePOSTAPI): else: dest = dest.replace('\ ', '\\\\\\ ') # Get user key user_keys = KeyPair.objects.get(user=user, default=True) # Get computing host computing.attach_user_conf(user) computing_host = computing.conf.get('host') computing_user = computing.conf.get('user') if not computing_host: raise Exception('No computing host?!') if not computing_user: raise Exception('No computing user?!') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(computing, user) # Command if mode=='get': command = 'scp -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{}:{} {}'.format(user_keys.private_key_file, computing_user, computing_host, source, dest) command = 'scp -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{}:{} {}'.format(computing_keys.private_key_file, computing_user, computing_host, source, dest) elif mode == 'put': command = 'scp -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {} {}@{}:{}'.format(user_keys.private_key_file, source, computing_user, computing_host, dest) command = 'scp -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {} {}@{}:{}'.format(computing_keys.private_key_file, source, computing_user, computing_host, dest) else: raise ValueError('Unknown mode "{}"'.format(mode)) return command def ssh_command(self, 
command, user, computing): # Get user key user_keys = KeyPair.objects.get(user=user, default=True) # Get computing host computing.attach_user_conf(user) computing_host = computing.conf.get('host') computing_user = computing.conf.get('user') if not computing_host: raise Exception('No computing host?!') if not computing_user: raise Exception('No computing user?!') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(computing, user) # Command command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} "{}"'.format(user_keys.private_key_file, computing_user, computing_host, command) command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} "{}"'.format(computing_keys.private_key_file, computing_user, computing_host, command) return command Loading Loading @@ -441,7 +418,11 @@ class FileManagerAPI(PrivateGETAPI, PrivatePOSTAPI): if storage.access_through_computing: computing = storage.computing computing.attach_user_conf(user) if computing.auth_mode == 'user_keys': base_path_expanded = base_path_expanded.replace('$SSH_USER', computing.user_conf.get('user')) else: base_path_expanded = base_path_expanded.replace('$SSH_USER', computing.conf.get('user')) else: raise NotImplementedError('Accessing a storage with ssh+cli without going through its computing resource is not implemented') if '$USER' in base_path_expanded: Loading
services/webapp/code/rosetta/core_app/computing_managers.py +20 −242 Original line number Diff line number Diff line from .models import TaskStatuses, KeyPair, Task, Storage from .utils import os_shell from .utils import os_shell, get_ssh_access_mode_credentials from .exceptions import ErrorMessage, ConsistencyException from django.conf import settings Loading Loading @@ -180,15 +180,8 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana def _start_task(self, task, **kwargs): logger.debug('Starting a remote task "{}"'.format(self.computing)) # Get computing user and host computing_user = self.computing.conf.get('user') computing_host = self.computing.conf.get('host') # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user) # Get webapp conn string from.utils import get_webapp_conn_string Loading Loading @@ -238,7 +231,7 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana else: binds += ',{}:{}'.format(expanded_base_path, expanded_bind_path) run_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, computing_user, computing_host) run_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(computing_keys.private_key_file, computing_user, computing_host) run_command += '/bin/bash -c \'"rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp && mkdir -p /tmp/{}_data/home && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid) run_command += 'wget {}/api/v1/base/agent/?task_uuid={} -O /tmp/{}_data/agent.py &> /dev/null && export BASE_PORT=\$(python /tmp/{}_data/agent.py 2> /tmp/{}_data/task.log) && '.format(webapp_conn_string, 
task.uuid, task.uuid, task.uuid, task.uuid) run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\$BASE_PORT && {} '.format(authstring) Loading Loading @@ -271,18 +264,11 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana def _stop_task(self, task, **kwargs): # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get computing host host = self.computing.conf.get('host') user = self.computing.conf.get('user') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user) # Stop the task remotely stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "kill -9 {}"\''.format(user_keys.private_key_file, user, host, task.id) stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "kill -9 {}"\''.format(computing_keys.private_key_file, computing_user, computing_host, task.id) out = os_shell(stop_command, capture=True) if out.exit_code != 0: if not 'No such process' in out.stderr: Loading @@ -295,18 +281,11 @@ class SSHSingleNodeComputingManager(SingleNodeComputingManager, SSHComputingMana def _get_task_log(self, task, **kwargs): # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get computing host host = self.computing.conf.get('host') user = self.computing.conf.get('user') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user) # View log remotely view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat 
/tmp/{}_data/task.log"\''.format(user_keys.private_key_file, user, host, task.uuid) view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat /tmp/{}_data/task.log"\''.format(computing_keys.private_key_file, computing_user, computing_host, task.uuid) out = os_shell(view_log_command, capture=True) if out.exit_code != 0: Loading @@ -321,15 +300,8 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag def _start_task(self, task, **kwargs): logger.debug('Starting a remote task "{}"'.format(self.computing)) # Get computing host computing_host = self.computing.conf.get('host') computing_user = self.computing.conf.get('user') # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user) # Get webapp conn string from.utils import get_webapp_conn_string Loading Loading @@ -399,7 +371,7 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag else: binds += ',{}:{}'.format(expanded_base_path, expanded_bind_path) run_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, computing_user, computing_host) run_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(computing_keys.private_key_file, computing_user, computing_host) run_command += '\'bash -c "echo \\"#!/bin/bash\nwget {}/api/v1/base/agent/?task_uuid={} -O \$HOME/agent_{}.py &> \$HOME/{}.log && export BASE_PORT=\\\\\\$(python \$HOME/agent_{}.py 2> \$HOME/{}.log) && '.format(webapp_conn_string, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid) run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\\\\\\$BASE_PORT && {} 
'.format(authstring) run_command += 'rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp &>> \$HOME/{}.log && mkdir -p /tmp/{}_data/home &>> \$HOME/{}.log && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid) Loading Loading @@ -442,18 +414,11 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag def _stop_task(self, task, **kwargs): # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get computing host host = self.computing.conf.get('host') user = self.computing.conf.get('user') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user) # Stop the task remotely stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "scancel {}"\''.format(user_keys.private_key_file, user, host, task.id) stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "scancel {}"\''.format(computing_keys.private_key_file, computing_user, computing_host, task.id) out = os_shell(stop_command, capture=True) if out.exit_code != 0: raise Exception(out.stderr) Loading @@ -465,18 +430,11 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag def _get_task_log(self, task, **kwargs): # Get user keys if self.computing.requires_user_keys: user_keys = KeyPair.objects.get(user=task.user, default=True) else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # Get computing host host = self.computing.conf.get('host') user = self.computing.conf.get('user') # Get credentials computing_user, computing_host, computing_keys = get_ssh_access_mode_credentials(self.computing, task.user) # View log remotely view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o 
StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat \$HOME/{}.log"\''.format(user_keys.private_key_file, user, host, task.uuid) view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat \$HOME/{}.log"\''.format(computing_keys.private_key_file, computing_user, computing_host, task.uuid) out = os_shell(view_log_command, capture=True) if out.exit_code != 0: Loading @@ -485,183 +443,3 @@ class SlurmSSHClusterComputingManager(ClusterComputingManager, SSHComputingManag return out.stdout # TODO: rename the following as "ssh+ssh" access mode? Ore somethign similar? # class RemotehopComputingManager(ComputingManager): # # def _start_task(self, task, **kwargs): # logger.debug('Starting a remote task "{}"'.format(self.computing)) # # # Get computing params # first_host = self.computing.conf.get('first_host') # first_user = self.computing.conf.get('first_user') # second_host = self.computing.conf.get('second_host') # second_user = self.computing.conf.get('second_user') # setup_command = self.computing.conf.get('setup_command') # # # TODO: De hard-code # use_agent = False # # # Get user keys # if self.computing.requires_user_keys: # user_keys = KeyPair.objects.get(user=task.user, default=True) # else: # raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # # # Get webapp conn string # from.utils import get_webapp_conn_string # webapp_conn_string = get_webapp_conn_string() # # # Run the container on the host (non blocking) # if task.container.type == 'singularity': # # task.tid = task.uuid # task.save() # # # Set pass if any # if task.auth_pass: # authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass) # else: # authstring = '' # # # Set binds, only from sys config if the resource is not owned by the user # if self.computing.user != task.user: # binds = self.computing.sys_conf.get('binds') # else: # binds = self.computing.conf.get('binds') # if not binds: # binds = '' # else: # 
binds = '-B {}'.format(binds) # # # Manage task extra binds # if task.extra_binds: # if not binds: # binds = '-B {}'.format(task.extra_binds) # else: # binds += ',{}'.format(task.extra_binds) # # run_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host) # run_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} /bin/bash -c \''.format(second_user, second_host) # # if use_agent: # run_command += '\'wget {}/api/v1/base/agent/?task_uuid={} -O \$HOME/agent_{}.py &> /dev/null && export BASE_PORT=\$(python \$HOME/agent_{}.py 2> \$HOME/{}.log) && '.format(webapp_conn_string, task.uuid, task.uuid, task.uuid, task.uuid) # if setup_command: # run_command += setup_command + ' && ' # run_command += '\'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\$BASE_PORT && {} '.format(authstring) # run_command += 'rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp &>> \$HOME/{}.log && mkdir -p /tmp/{}_data/home &>> \$HOME/{}.log && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid) # run_command += 'exec nohup singularity run {} --pid --writable-tmpfs --no-home --home=/home/metauser --workdir /tmp/{}_data/tmp -B/tmp/{}_data/home:/home --containall --cleanenv '.format(binds, task.uuid, task.uuid) # else: # run_command += ' : && ' # Trick to prevent some issues in exporting variables # if setup_command: # run_command += setup_command + ' && ' # run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT={} && {} '.format(task.port, authstring) # run_command += 'rm -rf /tmp/{}_data && mkdir -p /tmp/{}_data/tmp &>> \$HOME/{}.log && mkdir -p /tmp/{}_data/home &>> \$HOME/{}.log && chmod 700 /tmp/{}_data && '.format(task.uuid, task.uuid, task.uuid, task.uuid, task.uuid, task.uuid) # run_command += 'exec nohup singularity run {} --pid --writable-tmpfs --no-home --home=/home/metauser --workdir /tmp/{}_data/tmp 
-B/tmp/{}_data/home:/home --containall --cleanenv '.format(binds, task.uuid, task.uuid) # # # Set registry # if task.container.registry == 'docker_local': # raise Exception('This computing resource does not support local Docker registries yet') # # Get local Docker registry conn string # from.utils import get_local_docker_registry_conn_string # local_docker_registry_conn_string = get_local_docker_registry_conn_string() # registry = 'docker://{}/'.format(local_docker_registry_conn_string) # elif task.container.registry == 'docker_hub': # registry = 'docker://' # else: # raise NotImplementedError('Registry {} not supported'.format(task.container.registry)) # # run_command+='{}{} &>> \$HOME/{}.log & echo \$!\'"'.format(registry, task.container.image, task.uuid) # # else: # raise NotImplementedError('Container {} not supported'.format(task.container.type)) # # out = os_shell(run_command, capture=True) # if out.exit_code != 0: # raise Exception(out.stderr) # # # Log # logger.debug('Shell exec output: "{}"'.format(out)) # # # # Load back the task to avoid concurrency problems in the agent call # task_uuid = task.uuid # task = Task.objects.get(uuid=task_uuid) # # # Save pid echoed by the command above # task_pid = out.stdout # # # Set fields # task.status = TaskStatuses.running # task.pid = task_pid # task.ip = second_host # # # Save # task.save() # # # def _stop_task(self, task, **kwargs): # # # Get user keys # if self.computing.requires_user_keys: # user_keys = KeyPair.objects.get(user=task.user, default=True) # else: # raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # # # Get computing params # first_host = self.computing.conf.get('first_host') # first_user = self.computing.conf.get('first_user') # second_host = self.computing.conf.get('second_host') # second_user = self.computing.conf.get('second_user') # # # Stop the task remotely # stop_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} 
'.format(user_keys.private_key_file, first_user, first_host) # stop_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} '.format(second_user, second_host) # stop_command += 'kill -9 {}"'.format(task.pid) # # out = os_shell(stop_command, capture=True) # if out.exit_code != 0: # if not 'No such process' in out.stderr: # raise Exception(out.stderr) # # # Set task as stopped # task.status = TaskStatuses.stopped # task.save() # # # def _get_task_log(self, task, **kwargs): # # # Get user keys # if self.computing.requires_user_keys: # user_keys = KeyPair.objects.get(user=task.user, default=True) # else: # raise NotImplementedError('Remote tasks not requiring keys are not yet supported') # # # Get computing params # first_host = self.computing.conf.get('first_host') # first_user = self.computing.conf.get('first_user') # second_host = self.computing.conf.get('second_host') # second_user = self.computing.conf.get('second_user') # # # View log remotely # view_log_command = 'ssh -o LogLevel=ERROR -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host) # view_log_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} '.format(second_user, second_host) # view_log_command += 'cat \\\\\\$HOME/{}.log"'.format(task.uuid) # # out = os_shell(view_log_command, capture=True) # if out.exit_code != 0: # raise Exception(out.stderr) # else: # return out.stdout
services/webapp/code/rosetta/core_app/management/commands/core_app_populate.py +29 −21 File changed.Preview size limit exceeded, changes collapsed. Show changes
services/webapp/code/rosetta/core_app/migrations/0012_remove_task_extra_binds.py 0 → 100644 +17 −0 Original line number Diff line number Diff line # Generated by Django 2.2.1 on 2021-11-08 14:52 from django.db import migrations class Migration(migrations.Migration): dependencies = [ ('core_app', '0011_storage'), ] operations = [ migrations.RemoveField( model_name='task', name='extra_binds', ), ]