Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
Waiter SSH (#1295)
Browse files Browse the repository at this point in the history
* initial interface for cli

* add command possibility

* get arguments

* return cli op code 1 if no token exists

* temp

* add getting user selection

* user selection

* add ssh into non k8s host

* add support for instance-id

* remove unnecessary logging

* add simple test case

* add instance_id testing for invalid ids

* relax to helper function

* use util functions instead of cmd

* add k8s testing support

* handle custom commands

* fix k8s tests

* add include different kinds of instances flag

* add failed instance test

* add test for ssh_service_id

* add test for multiple instances for a service_id

* todo

* add help message and format of values

* complicated skip_prompts

* modify get_user_selection to use unique field as well as index

* fix short circuit

* update docstrings

* fix k8s test

* cleanup with skip prompts

* fix container name placement

* testing between prompt messages

* fix tabular output and use helper functions

* combined test cases

* add custom container test

* add some test cases

* change to use goal functions

* remove active-instance assertion

* handle empty last-request-time case

* fix k8s container_name specification

* unnecessary imports

* doc string

* remove print

* fix test

* make indexes start at 1

* failed and unhealthy show as a status

* fix typo

* lower case convention

* mutually exclusive grouping
  • Loading branch information
Kevin Tang authored Mar 3, 2021
1 parent 6b05a54 commit 527d4a4
Show file tree
Hide file tree
Showing 11 changed files with 595 additions and 57 deletions.
8 changes: 8 additions & 0 deletions cli/integration/tests/waiter/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,11 @@ def maintenance(subcommand, token_name, waiter_url=None, flags=None, maintenance
args = f"maintenance {subcommand} {token_name} {maintenance_flags or ''}"
cp = cli(args, waiter_url, flags, stdin, env=env)
return cp


def ssh(waiter_url=None, token_or_service_id_or_instance_id=None, ssh_command=None, ssh_flags=None, flags=None,
        stdin=None, env=None):
    """
    Runs the CLI `ssh` subcommand against a token, service id, or instance id.

    Any of ssh_flags, the destination and ssh_command that is falsy is rendered as an empty string in the
    argument line. Returns whatever cli(...) returns.
    """
    flags_part = ssh_flags or ''
    destination_part = token_or_service_id_or_instance_id or ''
    command_part = ssh_command or ''
    return cli(f"ssh {flags_part} {destination_part} {command_part}", waiter_url, flags, stdin=stdin, env=env)
177 changes: 177 additions & 0 deletions cli/integration/tests/waiter/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1577,3 +1577,180 @@ def test_maintenance_no_sub_command(self):
cp_help = cli.maintenance('', '', maintenance_flags='-h')
self.assertEqual(0, cp.returncode, cp.stderr)
self.assertEqual(cli.stdout(cp_help), cli.stdout(cp))

def __test_ssh(self, instance_fn, command_to_run=None, stdin=None, min_instances=1, admin=False, ssh_flags=None,
               container_name=None, is_failed_instance=False, test_service=False, test_instance=False,
               multiple_services=False, quick=False, expect_no_data=False, expect_no_instances=False,
               expect_out_of_range=False):
    """
    Shared driver for the ssh tests: creates a token, waits for its instances, runs `cli.ssh` and checks the result.

    :param instance_fn: called as instance_fn(service_id, instances); returns the instance map the assertions use
    :param command_to_run: forwarded as the ssh command; also expected in the command line found in stdout
    :param stdin: forwarded to cli.ssh (answers to the selection prompts)
    :param min_instances: stored as 'min-instances' on the token and awaited as the number of active instances
    :param admin: when True, WAITER_ADMIN=true is set in the CLI environment
    :param ssh_flags: optional string of extra flags placed first on the ssh argument line
    :param container_name: when set, passed via --container-name and expected in the kubernetes command line
    :param is_failed_instance: when True, the token gets an invalid 'cmd', the ping is expected to return 503
                               and the wait is for failed (rather than active) instances
    :param test_service: ssh to the service id with -s (only consulted when test_instance is False)
    :param test_instance: ssh to instance['id'] with -i
    :param multiple_services: posts and pings the token once with a fresh minimal description before the real one
                              (presumably to leave an additional service behind -- confirm against util helpers)
    :param quick: adds the -q flag
    :param expect_no_data: expect exit code 1 and 'No matching data found' on stdout
    :param expect_no_instances: expect exit code 1 and the 'no relevant instances' message on stdout
    :param expect_out_of_range: expect exit code 1 and 'Input is out of range!' on stderr
    """
    token_name = self.token_name()
    token_fields = util.minimal_service_description()
    token_fields['min-instances'] = min_instances
    if is_failed_instance:
        token_fields['cmd'] = 'this_is_an_invalid_command'
    try:
        if multiple_services:
            token_new_fields = util.minimal_service_description()
            util.post_token(self.waiter_url, token_name, token_new_fields)
            util.ping_token(self.waiter_url, token_name)
        util.post_token(self.waiter_url, token_name, token_fields)
        service_id = util.ping_token(self.waiter_url, token_name,
                                     expected_status_code=503 if is_failed_instance else 200)
        # The goal describes the instance state that must be reached before ssh is attempted.
        if is_failed_instance:
            goal_fn = lambda insts: 0 < len(insts['failed-instances']) and \
                                    0 == len(insts['killed-instances'])
        else:
            goal_fn = lambda insts: min_instances == len(insts['active-instances']) and \
                                    0 == len(insts['failed-instances']) and \
                                    0 == len(insts['killed-instances'])
        util.wait_until(lambda: util.instances_for_service(self.waiter_url, service_id), goal_fn)
        instances = util.instances_for_service(self.waiter_url, service_id)
        env = os.environ.copy()
        # NOTE(review): presumably these make the CLI run `echo` instead of the real ssh/kubectl binaries, so
        # the composed command line shows up on stdout for the assertions below -- confirm in the ssh subcommand.
        env['WAITER_SSH'] = 'echo'
        env['WAITER_KUBECTL'] = 'echo'
        if admin:
            env['WAITER_ADMIN'] = 'true'
        instance = instance_fn(service_id, instances)
        # From here on ssh_flags is a list of flag strings; it is joined with spaces for the CLI call.
        ssh_flags = [ssh_flags] if ssh_flags else []
        if quick:
            ssh_flags.append('-q')
        if container_name:
            ssh_flags.append(f'--container-name {container_name}')
        # Choose the ssh destination: instance id, service id, or (by default) the token name.
        if test_instance:
            ssh_dest = instance['id']
            ssh_flags.append('-i')
        elif test_service:
            ssh_dest = service_id
            ssh_flags.append('-s')
        else:
            ssh_dest = token_name
        cp = cli.ssh(self.waiter_url, ssh_dest, stdin=stdin, ssh_command=command_to_run,
                     ssh_flags=' '.join(ssh_flags),
                     env=env)
        if expect_out_of_range:
            self.assertEqual(1, cp.returncode, cp.stderr)
            self.assertIn('Input is out of range!', cli.stderr(cp))
        elif expect_no_data:
            self.assertEqual(1, cp.returncode, cp.stderr)
            self.assertIn('No matching data found', cli.stdout(cp))
        elif expect_no_instances:
            self.assertEqual(1, cp.returncode, cp.stderr)
            self.assertIn(f'There are no relevant instances using service id {service_id}', cli.stdout(cp))
        else:
            # Success path: stdout must contain the command line built for the selected instance.
            log_directory = instance['log-directory']
            self.assertEqual(0, cp.returncode, cp.stderr)
            if util.using_kubernetes(self.waiter_url):
                container_name = container_name or 'waiter-app'
                api_server = instance['k8s/api-server-url']
                namespace = instance['k8s/namespace']
                pod_name = instance['k8s/pod-name']
                self.assertIn(f'--server {api_server} --namespace {namespace} exec -it {pod_name} -c '
                              f"{container_name} -- /bin/bash -c cd {log_directory}; "
                              f"{command_to_run or 'exec /bin/bash'}",
                              cli.stdout(cp))
            else:
                self.assertIn(f"-t {instance['host']} cd {log_directory} ; {command_to_run or '/bin/bash'}",
                              cli.stdout(cp))
    finally:
        # Always remove the token (and kill its services), even when an assertion failed.
        util.delete_token(self.waiter_url, token_name, kill_services=True)

def test_ssh_instance_id(self):
    """ssh with -i to the id of the first active instance."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    self.__test_ssh(first_active, test_instance=True)

def test_ssh_instance_id_failed_instance(self):
    """ssh with -i to the id of the first failed instance of a service whose command is invalid."""
    def first_failed(_service_id, instances):
        return instances['failed-instances'][0]

    self.__test_ssh(first_failed, test_instance=True, is_failed_instance=True)

def test_ssh_instance_id_custom_cmd(self):
    """ssh with -i to the first active instance, running a custom command instead of the default shell."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    self.__test_ssh(first_active, command_to_run='ls -al', test_instance=True)

def test_ssh_instance_id_custom_cmd_failed_instance(self):
    """ssh with -i to the first failed instance, running a custom command."""
    def first_failed(_service_id, instances):
        return instances['failed-instances'][0]

    self.__test_ssh(first_failed, command_to_run='ls -al', test_instance=True, is_failed_instance=True)

def test_ssh_instance_id_no_instance(self):
    """ssh with -i to an instance id built from a real service id plus a made-up suffix finds no data."""
    def made_up_instance(service_id, _instances):
        return {'id': service_id + '.nonexistent'}

    self.__test_ssh(made_up_instance, expect_no_data=True, test_instance=True)

def test_ssh_instance_id_no_service(self):
    """ssh with -i to an instance id whose service part matches nothing exits 1 with 'No matching data found'."""
    result = cli.ssh(self.waiter_url, "a.a", ssh_flags='-i')
    self.assertEqual(1, result.returncode, result.stderr)
    self.assertIn('No matching data found', cli.stdout(result))

def test_ssh_service_id_single_instance(self):
    """ssh with -s to a service id that has a single active instance."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    self.__test_ssh(first_active, test_service=True)

def test_ssh_service_id_no_relevant_instances(self):
    """ssh with -s and --no-active reports that the service has no relevant instances."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    self.__test_ssh(first_active, ssh_flags='--no-active', test_service=True, expect_no_instances=True)

def test_ssh_service_id_multiple_instances(self):
    """ssh with -s to a service with two instances, answering the prompt with index 1."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    prompt_answer = '1\n'.encode('utf8')
    self.__test_ssh(first_active, stdin=prompt_answer, min_instances=2, test_service=True)

def test_ssh_service_id_invalid_prompt_input(self):
    """ssh with -s to a service with two instances, answering the prompt with an out-of-range index."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    prompt_answer = '-123\n'.encode('utf8')
    self.__test_ssh(first_active, stdin=prompt_answer, min_instances=2, test_service=True,
                    expect_out_of_range=True)

def test_ssh_service_id_non_existent_service(self):
    """ssh with -s to an unknown service id exits 1 with 'No matching data found'."""
    result = cli.ssh(self.waiter_url, "nonexistent", ssh_flags='-s')
    self.assertEqual(1, result.returncode, result.stderr)
    self.assertIn('No matching data found', cli.stdout(result))

def test_ssh_service_id_quick(self):
    """ssh with -s and -q to a service with two instances, supplying no prompt input."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    self.__test_ssh(first_active, min_instances=2, quick=True, test_service=True)

def test_ssh_token_single_instance(self):
    """ssh to a token that has one service with a single active instance."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    self.__test_ssh(first_active)

def test_ssh_token_multiple_services_sorted(self):
    """ssh to a token set up with multiple services, answering the prompt with index 1."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    prompt_answer = '1\n'.encode('utf8')
    self.__test_ssh(first_active, stdin=prompt_answer, multiple_services=True)

def test_ssh_token_multiple_instances(self):
    """ssh to a token whose service has two instances, answering the prompt with index 1."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    prompt_answer = '1\n'.encode('utf8')
    self.__test_ssh(first_active, stdin=prompt_answer, min_instances=2)

def test_ssh_token_multiple_services_instances(self):
    """ssh to a token set up with multiple services and two instances, answering both prompts with index 1."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    prompt_answers = '1\n1\n'.encode('utf8')
    self.__test_ssh(first_active, stdin=prompt_answers, min_instances=2, multiple_services=True)

def test_ssh_token_multiple_services_instances_quick(self):
    """ssh with -q to a token set up with multiple services and two instances, supplying no prompt input."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    self.__test_ssh(first_active, min_instances=2, quick=True, multiple_services=True)

def test_ssh_token_custom_container(self):
    """ssh to a token as admin while requesting the 'waiter-files' container."""
    def first_active(_service_id, instances):
        return instances['active-instances'][0]

    self.__test_ssh(first_active, container_name='waiter-files', admin=True)

def test_ssh_token_invalid_token(self):
    """ssh to an unknown token exits 1 with 'No matching data found' on stdout."""
    result = cli.ssh(self.waiter_url, "nonexistent")
    self.assertEqual(1, result.returncode, result.stderr)
    self.assertIn('No matching data found', cli.stdout(result))

def test_ssh_token_invalid_token_quick(self):
    """ssh with -q to an unknown token exits 1 and reports on stderr that the token does not exist."""
    result = cli.ssh(self.waiter_url, "nonexistent", ssh_flags='-q')
    self.assertEqual(1, result.returncode, result.stderr)
    self.assertIn('The token does not exist. You must create it first.', cli.stderr(result))

def __test_ssh_token_no_services(self, ssh_flags=None):
    """
    Creates a token without pinging it, then checks that ssh to it exits 1 and reports that no services use
    the token. The token is deleted afterwards regardless of the outcome.
    """
    name = self.token_name()
    util.post_token(self.waiter_url, name, util.minimal_service_description())
    try:
        result = cli.ssh(self.waiter_url, name, ssh_flags=ssh_flags)
        self.assertEqual(1, result.returncode, result.stderr)
        self.assertIn(f'There are no services using token {name}', cli.stdout(result))
    finally:
        util.delete_token(self.waiter_url, name)

def test_ssh_token_no_services(self):
    """ssh to a token that has no services, without extra flags."""
    self.__test_ssh_token_no_services(ssh_flags=None)

def test_ssh_token_no_services_quick(self):
    """ssh with -q to a token that has no services."""
    self.__test_ssh_token_no_services('-q')
26 changes: 26 additions & 0 deletions cli/integration/tests/waiter/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,19 @@ def services_for_token(waiter_url, token_name, assert_response=True, expected_st
return services


def instances_for_service(waiter_url, service_id, expected_status_code=200):
    """
    Returns the instances map of a service.

    Issues a GET to {waiter_url}/apps/{service_id} and returns the 'instances' entry of the JSON body.

    :param waiter_url: base url the request is sent to
    :param service_id: id of the service whose instances are requested
    :param expected_status_code: status code the response must have
    :return: the value stored under 'instances' in the response body
    :raises AssertionError: when the response status differs from expected_status_code
    """
    headers = {
        'Content-Type': 'application/json',
        'x-cid': cid()
    }
    response = session.get(f'{waiter_url}/apps/{service_id}', headers=headers)
    # Check the status before decoding the body: an unexpected response (e.g. a non-JSON error page) must fail
    # with the descriptive assertion message below rather than with a JSON decoding error.
    assert expected_status_code == response.status_code, \
        f'Expected {expected_status_code}, got {response.status_code} with body {response.text}'
    return response.json()['instances']


def multi_cluster_tests_enabled():
"""
Returns true if the WAITER_TEST_MULTI_CLUSTER environment variable is set to "true",
Expand Down Expand Up @@ -322,3 +335,16 @@ def wait_until_services_for_token(waiter_url, token_name, expected_num_services)

def wait_until_no_services_for_token(waiter_url, token_name):
    """Waits until the token has zero services, by delegating to wait_until_services_for_token."""
    wait_until_services_for_token(waiter_url, token_name, expected_num_services=0)


def retrieve_default_scheduler_name(waiter_url):
    """
    Returns the scheduler name taken from the Waiter settings.

    The 'default-scheduler' entry of the configured scheduler kind wins when it is present and truthy;
    otherwise the kind itself is returned.
    """
    scheduler_config = retrieve_waiter_settings(waiter_url)["scheduler-config"]
    kind = scheduler_config["kind"]
    configured_default = scheduler_config[kind].get("default-scheduler", False)
    if configured_default:
        return configured_default
    return kind


def using_kubernetes(waiter_url):
    """Returns True when the default scheduler name of waiter is "kubernetes", False otherwise."""
    scheduler_name = retrieve_default_scheduler_name(waiter_url)
    return "kubernetes" == scheduler_name
7 changes: 6 additions & 1 deletion cli/waiter/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from urllib.parse import urlparse

from waiter import configuration, http_util, metrics, version
from waiter.subcommands import create, delete, init, kill, maintenance, ping, show, tokens, update
from waiter.subcommands import create, delete, init, kill, maintenance, ping, show, ssh, tokens, update
import waiter.plugins as waiter_plugins

parser = argparse.ArgumentParser(description='waiter is the Waiter CLI')
parser.add_argument('--cluster', '-c', help='the name of the Waiter cluster to use')
Expand Down Expand Up @@ -40,6 +41,9 @@
'show': {
'run-function': show.register(subparsers.add_parser)
},
'ssh': {
'run-function': ssh.register(subparsers.add_parser)
},
'tokens': {
'run-function': tokens.register(subparsers.add_parser)
},
Expand Down Expand Up @@ -105,6 +109,7 @@ def run(args, plugins):
url = args.pop('url')

logging.debug('plugins: %s', plugins)
waiter_plugins.configure(plugins)

if action is None:
parser.print_help()
Expand Down
133 changes: 133 additions & 0 deletions cli/waiter/display.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import collections

from tabulate import tabulate

from waiter import terminal
from waiter.format import format_last_request_time, format_mem_field, format_memory_amount, format_status
from waiter.util import is_service_current, print_error


def retrieve_num_instances(service):
    """Returns the sum of the service's healthy and unhealthy instance counts."""
    counts = service["instance-counts"]
    healthy = counts["healthy-instances"]
    unhealthy = counts["unhealthy-instances"]
    return healthy + unhealthy


def format_using_current_token(service, token_etag, token_name):
    """Returns the "Current?" cell for the service: a success-styled 'Current' or a plain 'Not Current'."""
    if not is_service_current(service, token_etag, token_name):
        return 'Not Current'
    return terminal.success('Current')


def format_instance_status(instance):
    """
    Returns the status cell for the instance.

    Healthy instances render as a success-styled 'Healthy'. Otherwise the '_status' entry selects 'Failed' or
    'Killed', any other value yields 'Unhealthy', and the label is rendered with the failed styling.
    """
    if instance['healthy?']:
        return terminal.success('Healthy')
    state = instance['_status']
    label = 'Failed' if state == 'failed' else ('Killed' if state == 'killed' else 'Unhealthy')
    return terminal.failed(label)


def tabulate_token_services(services, token_name, token_etag=None, show_index=False, summary_table=True,
                            column_names=()):
    """
    Builds the tabular display of the services that belong to a token.

    :param services: list of services to be displayed as rows
    :param token_name: the token that the services belong to
    :param token_etag: etag used to decide if a service is current; when falsy, each service's own 'etag'
                       entry (or None) is used instead
    :param show_index: shows index column (usually for future choice prompt); indexes start at 1
    :param summary_table: when truthy, prepends a summary table of the services (e.g. Total Memory, Total CPUs)
    :param column_names: column fields to be included in table
    :return: tuple of (tabular output string, services sorted in descending order by last request time);
             the string is empty when there are no services
    """
    if not services:
        return '', services
    num_services = len(services)
    # A missing or empty last-request-time is compared as '', which places those services last.
    services = sorted(services, key=lambda s: s.get('last-request-time', None) or '', reverse=True)
    rows = []
    for index, s in enumerate(services):
        params = s['effective-parameters']
        columns = [('Index', f'[{index + 1}]'),
                   ('Service Id', s['service-id']),
                   ('Cluster', s.get('cluster', None)),
                   ('Run as user', params['run-as-user']),
                   ('Instances', retrieve_num_instances(s)),
                   ('CPUs', params['cpus']),
                   ('Memory', format_mem_field(params)),
                   ('Version', params['version']),
                   ('In-flight req.', s['request-metrics']['outstanding']),
                   ('Status', format_status(s['status'])),
                   ('Last request', format_last_request_time(s)),
                   ('Current?', format_using_current_token(s, token_etag or s.get('etag', None), token_name))]
        # Keep the requested columns in their fixed display order; 'Index' is additionally kept for show_index.
        rows.append(collections.OrderedDict(
            (key, data) for key, data in columns
            if key in column_names or (show_index and key == 'Index')))
    service_table = tabulate(rows, headers='keys', tablefmt='plain')
    if not summary_table:
        return service_table, services
    num_failing_services = sum(1 for s in services if s['status'] == 'Failing')
    num_instances = sum(retrieve_num_instances(s) for s in services)
    total_mem_usage = format_memory_amount(sum(s['resource-usage']['mem'] for s in services))
    total_cpu_usage = round(sum(s['resource-usage']['cpus'] for s in services), 2)
    summary_rows = [['# Services', num_services],
                    ['# Failing', num_failing_services],
                    ['# Instances', num_instances],
                    ['Total Memory', total_mem_usage],
                    ['Total CPUs', total_cpu_usage]]
    summary = tabulate(summary_rows, tablefmt='plain')
    return f'\n\n{summary}\n\n{service_table}', services


def tabulate_service_instances(instances, show_index=False, column_names=()):
    """
    Builds the tabular display of a service's instances.

    :param instances: list of instances to be displayed
    :param show_index: shows index column (usually for future choice prompt); indexes start at 1
    :param column_names: column names to be displayed in table
    :return: tabular output string, or an empty string when there are no instances
    """
    if not instances:
        return ''
    rows = []
    for index, inst in enumerate(instances):
        columns = [('Index', f'[{index + 1}]'),
                   ('Instance Id', inst['id']),
                   ('Host', inst['host']),
                   ('Status', format_instance_status(inst))]
        # Keep the requested columns in their fixed display order; 'Index' is additionally kept for show_index.
        rows.append(collections.OrderedDict(
            (key, data) for key, data in columns
            if key in column_names or (show_index and key == 'Index')))
    return tabulate(rows, headers='keys', tablefmt='plain')


def get_user_selection(items, tabular_str, short_circuit_choice=True):
    """
    Prints the table of options and asks the user to pick one of the items by its 1-based index.

    :param items: list of possible choices
    :param tabular_str: table output to provide user with options
    :param short_circuit_choice: When True and only one item in items, return that item as the selection without user
    prompt.
    :exception ValueError when the input is not an integer (an error message is printed first);
    Exception('Input is out of range!') when the index does not refer to an item
    :return selected item (an element from the items list)
    """
    if short_circuit_choice and len(items) == 1:
        return items[0]
    print(tabular_str)
    answer = input('Enter the Index of your choice: ')
    print()
    # Only the integer conversion can raise ValueError, so it alone is guarded.
    try:
        index = int(answer) - 1
    except ValueError:
        print_error('Input received did not match any of the choices!')
        raise
    if not 0 <= index < len(items):
        # Plain Exception is kept so that callers see the same exception type and message as before.
        raise Exception('Input is out of range!')
    return items[index]
Loading

0 comments on commit 527d4a4

Please sign in to comment.