diff --git a/cli/integration/tests/waiter/cli.py b/cli/integration/tests/waiter/cli.py index efda4de49..8eadea158 100644 --- a/cli/integration/tests/waiter/cli.py +++ b/cli/integration/tests/waiter/cli.py @@ -372,3 +372,11 @@ def maintenance(subcommand, token_name, waiter_url=None, flags=None, maintenance args = f"maintenance {subcommand} {token_name} {maintenance_flags or ''}" cp = cli(args, waiter_url, flags, stdin, env=env) return cp + + +def ssh(waiter_url=None, token_or_service_id_or_instance_id=None, ssh_command=None, ssh_flags=None, flags=None, + stdin=None, env=None): + """Attempts to ssh to token, service_id, or instance_id""" + args = f"ssh {ssh_flags or ''} {token_or_service_id_or_instance_id or ''} {ssh_command or ''}" + cp = cli(args, waiter_url, flags, stdin=stdin, env=env) + return cp diff --git a/cli/integration/tests/waiter/test_cli.py b/cli/integration/tests/waiter/test_cli.py index 82d16bd98..20b06da87 100644 --- a/cli/integration/tests/waiter/test_cli.py +++ b/cli/integration/tests/waiter/test_cli.py @@ -1577,3 +1577,180 @@ def test_maintenance_no_sub_command(self): cp_help = cli.maintenance('', '', maintenance_flags='-h') self.assertEqual(0, cp.returncode, cp.stderr) self.assertEqual(cli.stdout(cp_help), cli.stdout(cp)) + + def __test_ssh(self, instance_fn, command_to_run=None, stdin=None, min_instances=1, admin=False, ssh_flags=None, + container_name=None, is_failed_instance=False, test_service=False, test_instance=False, + multiple_services=False, quick=False, expect_no_data=False, expect_no_instances=False, + expect_out_of_range=False): + token_name = self.token_name() + token_fields = util.minimal_service_description() + token_fields['min-instances'] = min_instances + if is_failed_instance: + token_fields['cmd'] = 'this_is_an_invalid_command' + try: + if multiple_services: + token_new_fields = util.minimal_service_description() + util.post_token(self.waiter_url, token_name, token_new_fields) + util.ping_token(self.waiter_url, token_name) + 
util.post_token(self.waiter_url, token_name, token_fields) + service_id = util.ping_token(self.waiter_url, token_name, + expected_status_code=503 if is_failed_instance else 200) + if is_failed_instance: + goal_fn = lambda insts: 0 < len(insts['failed-instances']) and \ + 0 == len(insts['killed-instances']) + else: + goal_fn = lambda insts: min_instances == len(insts['active-instances']) and \ + 0 == len(insts['failed-instances']) and \ + 0 == len(insts['killed-instances']) + util.wait_until(lambda: util.instances_for_service(self.waiter_url, service_id), goal_fn) + instances = util.instances_for_service(self.waiter_url, service_id) + env = os.environ.copy() + env['WAITER_SSH'] = 'echo' + env['WAITER_KUBECTL'] = 'echo' + if admin: + env['WAITER_ADMIN'] = 'true' + instance = instance_fn(service_id, instances) + ssh_flags = [ssh_flags] if ssh_flags else [] + if quick: + ssh_flags.append('-q') + if container_name: + ssh_flags.append(f'--container-name {container_name}') + if test_instance: + ssh_dest = instance['id'] + ssh_flags.append('-i') + elif test_service: + ssh_dest = service_id + ssh_flags.append('-s') + else: + ssh_dest = token_name + cp = cli.ssh(self.waiter_url, ssh_dest, stdin=stdin, ssh_command=command_to_run, + ssh_flags=' '.join(ssh_flags), + env=env) + if expect_out_of_range: + self.assertEqual(1, cp.returncode, cp.stderr) + self.assertIn('Input is out of range!', cli.stderr(cp)) + elif expect_no_data: + self.assertEqual(1, cp.returncode, cp.stderr) + self.assertIn('No matching data found', cli.stdout(cp)) + elif expect_no_instances: + self.assertEqual(1, cp.returncode, cp.stderr) + self.assertIn(f'There are no relevant instances using service id {service_id}', cli.stdout(cp)) + else: + log_directory = instance['log-directory'] + self.assertEqual(0, cp.returncode, cp.stderr) + if util.using_kubernetes(self.waiter_url): + container_name = container_name or 'waiter-app' + api_server = instance['k8s/api-server-url'] + namespace = instance['k8s/namespace'] 
+ pod_name = instance['k8s/pod-name'] + self.assertIn(f'--server {api_server} --namespace {namespace} exec -it {pod_name} -c ' + f"{container_name} -- /bin/bash -c cd {log_directory}; " + f"{command_to_run or 'exec /bin/bash'}", + cli.stdout(cp)) + else: + self.assertIn(f"-t {instance['host']} cd {log_directory} ; {command_to_run or '/bin/bash'}", + cli.stdout(cp)) + finally: + util.delete_token(self.waiter_url, token_name, kill_services=True) + + def test_ssh_instance_id(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], test_instance=True) + + def test_ssh_instance_id_failed_instance(self): + self.__test_ssh(lambda _, instances: instances['failed-instances'][0], is_failed_instance=True, + test_instance=True) + + def test_ssh_instance_id_custom_cmd(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], test_instance=True, + command_to_run='ls -al') + + def test_ssh_instance_id_custom_cmd_failed_instance(self): + self.__test_ssh(lambda _, instances: instances['failed-instances'][0], is_failed_instance=True, + test_instance=True, command_to_run='ls -al') + + def test_ssh_instance_id_no_instance(self): + self.__test_ssh(lambda service_id, _: {'id': service_id + '.nonexistent'}, test_instance=True, + expect_no_data=True) + + def test_ssh_instance_id_no_service(self): + instance_id_no_service = "a.a" + cp = cli.ssh(self.waiter_url, instance_id_no_service, ssh_flags='-i') + self.assertEqual(1, cp.returncode, cp.stderr) + self.assertIn('No matching data found', cli.stdout(cp)) + + def test_ssh_service_id_single_instance(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], test_service=True) + + def test_ssh_service_id_no_relevant_instances(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], test_service=True, + ssh_flags='--no-active', expect_no_instances=True) + + def test_ssh_service_id_multiple_instances(self): + self.__test_ssh(lambda _, instances: 
instances['active-instances'][0], min_instances=2, + stdin='1\n'.encode('utf8'), test_service=True) + + def test_ssh_service_id_invalid_prompt_input(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], min_instances=2, + stdin='-123\n'.encode('utf8'), test_service=True, expect_out_of_range=True) + + def test_ssh_service_id_non_existent_service(self): + service_id = "nonexistent" + cp = cli.ssh(self.waiter_url, service_id, ssh_flags='-s') + self.assertEqual(1, cp.returncode, cp.stderr) + self.assertIn('No matching data found', cli.stdout(cp)) + + def test_ssh_service_id_quick(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], min_instances=2, test_service=True, + quick=True) + + def test_ssh_token_single_instance(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0]) + + def test_ssh_token_multiple_services_sorted(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], stdin='1\n'.encode('utf8'), + multiple_services=True) + + def test_ssh_token_multiple_instances(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], min_instances=2, + stdin='1\n'.encode('utf8')) + + def test_ssh_token_multiple_services_instances(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], min_instances=2, multiple_services=True, + stdin='1\n1\n'.encode('utf8')) + + def test_ssh_token_multiple_services_instances_quick(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], min_instances=2, multiple_services=True, + quick=True) + + def test_ssh_token_custom_container(self): + self.__test_ssh(lambda _, instances: instances['active-instances'][0], admin=True, + container_name='waiter-files') + + def test_ssh_token_invalid_token(self): + token_name = "nonexistent" + cp = cli.ssh(self.waiter_url, token_name) + self.assertEqual(1, cp.returncode, cp.stderr) + self.assertIn('No matching data found', cli.stdout(cp)) + + 
def test_ssh_token_invalid_token_quick(self): + token_name = "nonexistent" + cp = cli.ssh(self.waiter_url, token_name, ssh_flags='-q') + self.assertEqual(1, cp.returncode, cp.stderr) + self.assertIn('The token does not exist. You must create it first.', cli.stderr(cp)) + + def __test_ssh_token_no_services(self, ssh_flags=None): + token_name = self.token_name() + token_fields = util.minimal_service_description() + util.post_token(self.waiter_url, token_name, token_fields) + try: + cp = cli.ssh(self.waiter_url, token_name, ssh_flags=ssh_flags) + self.assertEqual(1, cp.returncode, cp.stderr) + self.assertIn(f'There are no services using token {token_name}', cli.stdout(cp)) + finally: + util.delete_token(self.waiter_url, token_name) + + def test_ssh_token_no_services(self): + self.__test_ssh_token_no_services() + + def test_ssh_token_no_services_quick(self): + self.__test_ssh_token_no_services(ssh_flags='-q') diff --git a/cli/integration/tests/waiter/util.py b/cli/integration/tests/waiter/util.py index b761bcc16..82249bf75 100644 --- a/cli/integration/tests/waiter/util.py +++ b/cli/integration/tests/waiter/util.py @@ -295,6 +295,19 @@ def services_for_token(waiter_url, token_name, assert_response=True, expected_st return services +def instances_for_service(waiter_url, service_id, expected_status_code=200): + """returns instances map of a service""" + headers = { + 'Content-Type': 'application/json', + 'x-cid': cid() + } + response = session.get(f'{waiter_url}/apps/{service_id}', headers=headers) + service = response.json() + assert expected_status_code == response.status_code, \ + f'Expected {expected_status_code}, got {response.status_code} with body {response.text}' + return service['instances'] + + def multi_cluster_tests_enabled(): """ Returns true if the WAITER_TEST_MULTI_CLUSTER environment variable is set to "true", @@ -322,3 +335,16 @@ def wait_until_services_for_token(waiter_url, token_name, expected_num_services) def 
wait_until_no_services_for_token(waiter_url, token_name): wait_until_services_for_token(waiter_url, token_name, 0) + + +def retrieve_default_scheduler_name(waiter_url): + """gets the scheduler of waiter""" + settings = retrieve_waiter_settings(waiter_url) + kind = settings["scheduler-config"]["kind"] + default_scheduler = settings["scheduler-config"][kind].get("default-scheduler", False) + return default_scheduler or kind + + +def using_kubernetes(waiter_url): + """returns True if the scheduler of waiter is k8s and False otherwise""" + return "kubernetes" == retrieve_default_scheduler_name(waiter_url) diff --git a/cli/waiter/cli.py b/cli/waiter/cli.py index 271068707..f2d9aedd5 100644 --- a/cli/waiter/cli.py +++ b/cli/waiter/cli.py @@ -3,7 +3,8 @@ from urllib.parse import urlparse from waiter import configuration, http_util, metrics, version -from waiter.subcommands import create, delete, init, kill, maintenance, ping, show, tokens, update +from waiter.subcommands import create, delete, init, kill, maintenance, ping, show, ssh, tokens, update +import waiter.plugins as waiter_plugins parser = argparse.ArgumentParser(description='waiter is the Waiter CLI') parser.add_argument('--cluster', '-c', help='the name of the Waiter cluster to use') @@ -40,6 +41,9 @@ 'show': { 'run-function': show.register(subparsers.add_parser) }, + 'ssh': { + 'run-function': ssh.register(subparsers.add_parser) + }, 'tokens': { 'run-function': tokens.register(subparsers.add_parser) }, @@ -105,6 +109,7 @@ def run(args, plugins): url = args.pop('url') logging.debug('plugins: %s', plugins) + waiter_plugins.configure(plugins) if action is None: parser.print_help() diff --git a/cli/waiter/display.py b/cli/waiter/display.py new file mode 100644 index 000000000..82cbd7749 --- /dev/null +++ b/cli/waiter/display.py @@ -0,0 +1,133 @@ +import collections + +from tabulate import tabulate + +from waiter import terminal +from waiter.format import format_last_request_time, format_mem_field, 
format_memory_amount, format_status +from waiter.util import is_service_current, print_error + + +def retrieve_num_instances(service): + """Returns the total number of instances.""" + instance_counts = service["instance-counts"] + return instance_counts["healthy-instances"] + instance_counts["unhealthy-instances"] + + +def format_using_current_token(service, token_etag, token_name): + """Formats the "Current?" column for the given service""" + is_current = is_service_current(service, token_etag, token_name) + if is_current: + return terminal.success('Current') + else: + return 'Not Current' + + +def format_instance_status(instance): + """Formats the "Healthy?" column for the given instance""" + if instance['healthy?']: + return terminal.success('Healthy') + else: + if instance['_status'] == 'failed': + status = 'Failed' + elif instance['_status'] == 'killed': + status = 'Killed' + else: + status = 'Unhealthy' + return terminal.failed(status) + + +def tabulate_token_services(services, token_name, token_etag=None, show_index=False, summary_table=True, + column_names=[]): + """ + :param services: list of services to be displayed as rows + :param token_name: the token that the services belong to + :param token_etag: token_etag determines if a the service is current, will default to service['etag'] or None + :param show_index: shows index column (usually for future choice prompt) + :param summary_table: provides summary table of services (e.g. 
Total Memory, CPUS) + :param column_names: column fields to be included in table + :return: tabular output string and sorted services list in descending order by last request time + """ + num_services = len(services) + if num_services > 0: + services = sorted(services, key=lambda s: s.get('last-request-time', None) or '', reverse=True) + rows = [collections.OrderedDict([(key, data) + for key, data in + [('Index', f'[{index + 1}]'), + ('Service Id', s['service-id']), + ('Cluster', s.get('cluster', None)), + ('Run as user', s['effective-parameters']['run-as-user']), + ('Instances', retrieve_num_instances(s)), + ('CPUs', s['effective-parameters']['cpus']), + ('Memory', format_mem_field(s['effective-parameters'])), + ('Version', s['effective-parameters']['version']), + ('In-flight req.', s['request-metrics']['outstanding']), + ('Status', format_status(s['status'])), + ('Last request', format_last_request_time(s)), + ('Current?', + format_using_current_token(s, token_etag or s.get('etag', None), + token_name))] + if key in column_names or show_index and key == 'Index']) + for index, s in enumerate(services)] + service_table = tabulate(rows, headers='keys', tablefmt='plain') + if summary_table: + num_failing_services = len([s for s in services if s['status'] == 'Failing']) + num_instances = sum(retrieve_num_instances(s) for s in services) + total_mem_usage = format_memory_amount(sum(s['resource-usage']['mem'] for s in services)) + total_cpu_usage = round(sum(s['resource-usage']['cpus'] for s in services), 2) + table = [['# Services', num_services], + ['# Failing', num_failing_services], + ['# Instances', num_instances], + ['Total Memory', total_mem_usage], + ['Total CPUs', total_cpu_usage]] + summary_table = tabulate(table, tablefmt='plain') + return f'\n\n{summary_table}\n\n{service_table}', services + else: + return service_table, services + else: + return '', services + + +def tabulate_service_instances(instances, show_index=False, column_names=[]): + """ + :param 
instances: list of instances to be displayed + :param show_index: shows index column (usually for future choice prompt) + :param column_names: column names to be displayed in table + :return: tabular output string + """ + if len(instances) > 0: + rows = [collections.OrderedDict([(key, data) + for key, data in + [('Index', f'[{index + 1}]'), + ('Instance Id', inst['id']), + ('Host', inst['host']), + ('Status', format_instance_status(inst))] + if key in column_names or show_index and key == 'Index']) + for index, inst in enumerate(instances)] + return tabulate(rows, headers='keys', tablefmt='plain') + else: + return '' + + +def get_user_selection(items, tabular_str, short_circuit_choice=True): + """ + :param items: list of possible choices + :param tabular_str: table output to provide user with options + :param short_circuit_choice: When True and only one item in items, return that item as the selection without user + prompt. + :exception Raises exception when user input is invalid + :return selected item (an element from the items list) + """ + if short_circuit_choice and len(items) == 1: + return items[0] + print(tabular_str) + + answer = input(f'Enter the Index of your choice: ') + print() + try: + index = int(answer) - 1 + if index < 0 or index >= len(items): + raise Exception('Input is out of range!') + return items[index] + except ValueError as error: + print_error('Input received did not match any of the choices!') + raise error diff --git a/cli/waiter/plugins.py b/cli/waiter/plugins.py new file mode 100644 index 000000000..cc600540a --- /dev/null +++ b/cli/waiter/plugins.py @@ -0,0 +1,12 @@ +__plugins = {} + + +def configure(plugins): + """Configures global plugins to the plugins map""" + global __plugins + __plugins = plugins + + +def get_fn(plugin_name, default_fn): + """Returns the plugin function corresponding to the given plugin name if found, otherwise, default_fn""" + return __plugins.get(plugin_name, default_fn) diff --git 
a/cli/waiter/querying.py b/cli/waiter/querying.py index 659516bcd..8c37ebee4 100644 --- a/cli/waiter/querying.py +++ b/cli/waiter/querying.py @@ -47,6 +47,18 @@ def print_no_data(clusters): print(no_data_message(clusters)) +def print_no_services(clusters, token): + """Prints a message that there were no services found for a token""" + clusters_text = ' / '.join([terminal.bold(c['name']) for c in clusters]) + print(f'There are no services using token {terminal.bold(token)} in {clusters_text}.') + + +def print_no_instances(service): + """Prints a message that there are no relevant instances for the service""" + print(f'There are no relevant instances using service id {terminal.bold(service)}.') + print(f'Check the --include flags for active, failed, and killed instances.') + + def get_token_on_cluster(cluster, token_name, include_services=False): """Gets the token with the given name on the given cluster""" token_data, token_etag = get_token(cluster, token_name, include='metadata') @@ -208,3 +220,8 @@ def get_target_cluster_from_token(clusters, token_name, enforce_cluster): f'clusters-{cluster_names}.' '\nConsider specifying with the --cluster flag which cluster you are targeting.') return _get_latest_cluster(clusters, query_result) + + +def get_service_id_from_instance_id(instance_id): + """Extracts the service_id from the instance_id. 
instance_ids begin with a service_id followed by a period""" + return instance_id.split('.')[0] diff --git a/cli/waiter/subcommands/show.py b/cli/waiter/subcommands/show.py index 97b28137b..7655580b7 100644 --- a/cli/waiter/subcommands/show.py +++ b/cli/waiter/subcommands/show.py @@ -1,61 +1,12 @@ -import collections - from tabulate import tabulate from waiter import terminal from waiter.data_format import display_data -from waiter.format import format_field_name, format_last_request_time, format_mem_field, format_memory_amount, \ - format_status, format_timestamp_string +from waiter.format import format_field_name, format_mem_field, format_timestamp_string +from waiter.display import tabulate_token_services from waiter.querying import print_no_data, query_token -from waiter.util import guard_no_cluster, is_service_current - - -def format_using_current_token(service, token_etag, token_name): - """Formats the "Current?" column for the given service""" - is_current = is_service_current(service, token_etag, token_name) - if is_current: - return terminal.success('Current') - else: - return 'Not Current' - - -def retrieve_num_instances(service): - """Returns the total number of instances.""" - instance_counts = service["instance-counts"] - return instance_counts["healthy-instances"] + instance_counts["unhealthy-instances"] - - -def tabulate_token_services(services, token_etag, token_name): - """Returns a table displaying the service info""" - num_services = len(services) - if num_services > 0: - num_failing_services = len([s for s in services if s['status'] == 'Failing']) - num_instances = sum(retrieve_num_instances(s) for s in services) - total_mem_usage = format_memory_amount(sum(s['resource-usage']['mem'] for s in services)) - total_cpu_usage = round(sum(s['resource-usage']['cpus'] for s in services), 2) - table = [['# Services', num_services], - ['# Failing', num_failing_services], - ['# Instances', num_instances], - ['Total Memory', total_mem_usage], - ['Total 
CPUs', total_cpu_usage]] - summary_table = tabulate(table, tablefmt='plain') - - services = sorted(services, key=lambda s: s.get('last-request-time', None) or '', reverse=True) - rows = [collections.OrderedDict([('Service Id', s['service-id']), - ('Run as user', s['effective-parameters']['run-as-user']), - ('Instances', retrieve_num_instances(s)), - ('CPUs', s['effective-parameters']['cpus']), - ('Memory', format_mem_field(s['effective-parameters'])), - ('Version', s['effective-parameters']['version']), - ('Status', format_status(s['status'])), - ('Last request', format_last_request_time(s)), - ('Current?', format_using_current_token(s, token_etag, token_name))]) - for s in services] - service_table = tabulate(rows, headers='keys', tablefmt='plain') - return f'\n\n{summary_table}\n\n{service_table}' - else: - return '' +from waiter.util import guard_no_cluster def tabulate_token(cluster_name, token, token_name, services, token_etag): @@ -97,7 +48,9 @@ def tabulate_token(cluster_name, token, token_name, services, token_etag): table_text = tabulate(table, tablefmt='plain') last_update_time = format_timestamp_string(token['last-update-time']) last_update_user = f' ({token["last-update-user"]})' if 'last-update-user' in token else '' - service_table = tabulate_token_services(services, token_etag, token_name) + column_names = ['Service Id', 'Run as user', 'Instances', 'CPUs', 'Memory', 'Version', 'Status', 'Last request', + 'Current?'] + service_table, _ = tabulate_token_services(services, token_name, token_etag=token_etag, column_names=column_names) return f'\n' \ f'=== {terminal.bold(cluster_name)} / {terminal.bold(token_name)} ===\n' \ f'\n' \ diff --git a/cli/waiter/subcommands/ssh.py b/cli/waiter/subcommands/ssh.py new file mode 100644 index 000000000..0acfaaeec --- /dev/null +++ b/cli/waiter/subcommands/ssh.py @@ -0,0 +1,202 @@ +import argparse +import logging +import os +from enum import Enum + +from waiter import plugins, terminal +from waiter.display import 
get_user_selection, tabulate_service_instances, tabulate_token_services +from waiter.querying import get_service_id_from_instance_id, get_target_cluster_from_token, print_no_data, \ + print_no_services, query_service, query_token, get_services_on_cluster, print_no_instances +from waiter.util import guard_no_cluster, is_admin_enabled, print_info + +BASH_PATH = '/bin/bash' + + +class Destination(Enum): + TOKEN = 'token' + SERVICE_ID = 'service_id' + INSTANCE_ID = 'instance_id' + + +def map_instances_with_status(instances, status): + return [{'_status': status, **inst} for inst in instances] + + +def get_instances_from_service_id(clusters, service_id, include_active_instances, include_failed_instances, + include_killed_instances): + query_result = query_service(clusters, service_id) + num_services = query_result['count'] + if num_services == 0: + return False + service = list(query_result['clusters'].values())[0]['service'] + instances = [] + if include_active_instances: + instances += map_instances_with_status(service['instances']['active-instances'], 'active') + if include_failed_instances: + instances += map_instances_with_status(service['instances']['failed-instances'], 'failed') + if include_killed_instances: + instances += map_instances_with_status(service['instances']['killed-instances'], 'killed') + return instances + + +def kubectl_exec_to_instance(kubectl_cmd, api_server, namespace, pod_name, container_name, log_directory, + command_to_run=None): + args = ['--server', api_server, + '--namespace', namespace, + 'exec', + '-it', pod_name, + '-c', container_name, + '--', + '/bin/bash', '-c', f"cd {log_directory}; {' '.join(command_to_run) or 'exec /bin/bash'}"] + os.execlp(kubectl_cmd, 'kubectl', *args) + + +def ssh_instance(instance, container_name, command_to_run=None): + print_info(f'Attempting to ssh into instance {terminal.bold(instance["id"])}...') + log_directory = instance['log-directory'] + k8s_pod_name = instance.get('k8s/pod-name', False) + if 
k8s_pod_name: + k8s_api_server = instance['k8s/api-server-url'] + kubectl_cmd = os.getenv('WAITER_KUBECTL', plugins.get_fn('get-kubectl-cmd', lambda: 'kubectl')()) + k8s_namespace = instance['k8s/namespace'] + print_info(f'Executing ssh to k8s pod {terminal.bold(k8s_pod_name)}') + logging.debug(f'Executing ssh to k8s pod {terminal.bold(k8s_pod_name)} ' + f'using namespace={k8s_namespace} api_server={k8s_api_server}') + kubectl_exec_to_instance(kubectl_cmd, k8s_api_server, k8s_namespace, k8s_pod_name, container_name, + log_directory, command_to_run) + else: + hostname = instance['host'] + command_to_run = command_to_run or [BASH_PATH] + ssh_cmd = os.getenv('WAITER_SSH', 'ssh') + args = [ssh_cmd, '-t', hostname, 'cd', log_directory, ';'] + command_to_run + print_info(f'Executing ssh to {terminal.bold(hostname)}') + os.execlp(ssh_cmd, *args) + + +def ssh_instance_id(clusters, instance_id, command, container_name): + service_id = get_service_id_from_instance_id(instance_id) + instances = get_instances_from_service_id(clusters, service_id, True, True, True) + if instances is False: + print_no_data(clusters) + return 1 + found_instance = next((instance + for instance in instances + if instance['id'] == instance_id), + False) + if not found_instance: + print_no_data(clusters) + return 1 + return ssh_instance(found_instance, container_name, command) + + +def ssh_service_id(clusters, service_id, command, container_name, skip_prompts, include_active_instances, + include_failed_instances, include_killed_instances): + instances = get_instances_from_service_id(clusters, service_id, include_active_instances, include_failed_instances, + include_killed_instances) + if instances is False: + print_no_data(clusters) + return 1 + if len(instances) == 0: + print_no_instances(service_id) + return 1 + if skip_prompts: + selected_instance = instances[0] + else: + column_names = ['Instance Id', 'Host', 'Status'] + tabular_output = tabulate_service_instances(instances, show_index=True, 
column_names=column_names) + selected_instance = get_user_selection(instances, tabular_output) + return ssh_instance(selected_instance, container_name, command) + + +def ssh_token(clusters, enforce_cluster, token, command, container_name, skip_prompts, include_active_instances, + include_failed_instances, include_killed_instances): + if skip_prompts: + cluster = get_target_cluster_from_token(clusters, token, enforce_cluster) + query_result = get_services_on_cluster(cluster, token) + services = [s + for s in query_result.get('services', []) + if s['instance-counts']['healthy-instances'] + s['instance-counts']['unhealthy-instances'] > 0] + if len(services) == 0: + print_no_services(clusters, token) + return 1 + max_last_request = max(s.get('last-request-time', '') for s in services) + selected_service_id = next(s['service-id'] for s in services if s['last-request-time'] == max_last_request) + else: + query_result = query_token(clusters, token, include_services=True) + if query_result['count'] == 0: + print_no_data(clusters) + return 1 + cluster_data = query_result['clusters'] + services = [{'cluster': cluster, 'etag': data['etag'], **service} + for cluster, data in cluster_data.items() + for service in data['services']] + if len(services) == 0: + print_no_services(clusters, token) + return 1 + column_names = ['Service Id', 'Cluster', 'Instances', 'In-flight req.', 'Status', 'Last request', 'Current?'] + tabular_output, sorted_services = tabulate_token_services(services, token, show_index=True, summary_table=False, + column_names=column_names) + selected_service = get_user_selection(sorted_services, tabular_output) + selected_service_id = selected_service['service-id'] + return ssh_service_id(clusters, selected_service_id, command, container_name, skip_prompts, + include_active_instances, include_failed_instances, include_killed_instances) + + +def ssh(clusters, args, _, enforce_cluster): + guard_no_cluster(clusters) + token_or_service_id_or_instance_id = 
args.pop('token-or-service-id-or-instance-id') + command = args.pop('command') + ssh_destination = args.pop('ssh_destination') + include_active_instances = args.pop('include_active_instances') + include_failed_instances = args.pop('include_failed_instances') + include_killed_instances = args.pop('include_killed_instances') + container_name = args.pop('container_name', None) or 'waiter-app' + skip_prompts = args.pop('quick') + if ssh_destination == Destination.TOKEN: + return ssh_token(clusters, enforce_cluster, token_or_service_id_or_instance_id, command, container_name, + skip_prompts, include_active_instances, include_failed_instances, include_killed_instances) + elif ssh_destination == Destination.SERVICE_ID: + return ssh_service_id(clusters, token_or_service_id_or_instance_id, command, container_name, skip_prompts, + include_active_instances, include_failed_instances, include_killed_instances) + elif ssh_destination == Destination.INSTANCE_ID: + return ssh_instance_id(clusters, token_or_service_id_or_instance_id, command, container_name) + + +def register(add_parser): + """Adds this sub-command's parser and returns the action function""" + parser = add_parser('ssh', + help='ssh to a Waiter instance', + description='ssh to an instance given the token, service id, or instance id. Working directory ' + 'will be the log directory.') + parser.add_argument('token-or-service-id-or-instance-id') + if is_admin_enabled(): + parser.add_argument('--container-name', '-c', + help='specify the container name you want to ssh into. Defaults to "waiter-app". 
Has no ' + 'effect if instance is not k8s pod.') + id_group = parser.add_mutually_exclusive_group(required=False) + id_group.add_argument('--token', '-t', dest='ssh_destination', action='store_const', const=Destination.TOKEN, + default=Destination.TOKEN, help='Default; ssh with token') + id_group.add_argument('--service-id', '-s', dest='ssh_destination', action='store_const', + const=Destination.SERVICE_ID, help='ssh using a service id') + id_group.add_argument('--instance-id', '-i', dest='ssh_destination', action='store_const', + const=Destination.INSTANCE_ID, help='ssh directly to instance id') + parser.add_argument('--quick', '-q', dest='quick', action='store_true', + help='skips services prompt by selecting the service with latest request, and instances prompt ' + 'by selecting a random one.') + active_group = parser.add_mutually_exclusive_group(required=False) + failed_group = parser.add_mutually_exclusive_group(required=False) + killed_group = parser.add_mutually_exclusive_group(required=False) + active_group.add_argument('--active', '-a', dest='include_active_instances', action='store_true', default=True, + help='included by default; includes active instances when prompting') + failed_group.add_argument('--failed', '-f', dest='include_failed_instances', action='store_true', + help='includes failed instances when prompting') + killed_group.add_argument('--killed', '-k', dest='include_killed_instances', action='store_true', + help='includes killed instances when prompting') + active_group.add_argument('--no-active', dest='include_active_instances', action='store_false', + help="don't show active instances in prompt") + failed_group.add_argument('--no-failed', dest='include_failed_instances', action='store_false', + help="don't show failed instances in prompt") + killed_group.add_argument('--no-killed', dest='include_killed_instances', action='store_false', + help="don't show killed instances in prompt") + parser.add_argument('command', 
nargs=argparse.REMAINDER, help='command to be run on instance') + return ssh diff --git a/cli/waiter/token_post.py b/cli/waiter/token_post.py index 32514a01e..b4d4dbd9d 100644 --- a/cli/waiter/token_post.py +++ b/cli/waiter/token_post.py @@ -8,7 +8,8 @@ from waiter import terminal, http_util from waiter.data_format import load_data from waiter.querying import get_token, query_token, get_target_cluster_from_token -from waiter.util import FALSE_STRINGS, print_info, response_message, TRUE_STRINGS, guard_no_cluster, str2bool +from waiter.util import FALSE_STRINGS, is_admin_enabled, print_info, response_message, TRUE_STRINGS, guard_no_cluster, \ + str2bool BOOL_STRINGS = TRUE_STRINGS + FALSE_STRINGS INT_PARAM_SUFFIXES = ['-failures', '-index', '-instances', '-length', '-level', '-mins', '-secs'] @@ -138,10 +139,9 @@ def create_or_update_token(clusters, args, _, enforce_cluster, action): def add_arguments(parser): """Adds arguments to the given parser""" - is_admin_enabled = str2bool(os.getenv('WAITER_ADMIN', default=FALSE_STRINGS[0])) add_token_flags(parser) parser.add_argument('token', nargs='?') - if is_admin_enabled: + if is_admin_enabled(): parser.add_argument('--admin', '-a', help='run command in admin mode', action='store_true') format_group = parser.add_mutually_exclusive_group() format_group.add_argument('--json', help='provide the data in a JSON file', dest='json') diff --git a/cli/waiter/util.py b/cli/waiter/util.py index 47e3da7f7..9d32e34d1 100644 --- a/cli/waiter/util.py +++ b/cli/waiter/util.py @@ -125,3 +125,8 @@ def is_service_current(service, current_token_etag, token_name): for sources in service['source-tokens'] for source in sources) return is_current + + +def is_admin_enabled(): + """Returns True if current user is an admin""" + return str2bool(os.getenv('WAITER_ADMIN', default=FALSE_STRINGS[0]))