Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
CLI: choose correct cluster when updating token (#1268)
Browse files Browse the repository at this point in the history
* get proper cluster

* wip

* wip

* fix other subcommands

* add support for enforce clusters arg

* add changes to update and create subcommands

* add doc strings describing new functions

* modify exception string

* add integration tests for multi router

* add more tests for updating tokens

* combine tests to use util

* add more testing with sync groups

* add tests

* revert .waiter.json changes

* revert .waiter.json changes

* revert .waiter.json changes

* maintenance mode tests
  • Loading branch information
Kevin Tang authored Jan 24, 2021
1 parent 0ed2bc4 commit 8546dba
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 29 deletions.
218 changes: 218 additions & 0 deletions cli/integration/tests/waiter/test_cli_multi_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def setUp(self):
self.waiter_url_1 = type(self).waiter_url_1
self.waiter_url_2 = type(self).waiter_url_2
self.logger = logging.getLogger(__name__)
self.waiter_1_cluster = util.retrieve_waiter_cluster_name(self.waiter_url_1)
self.waiter_2_cluster = util.retrieve_waiter_cluster_name(self.waiter_url_2)

def __two_cluster_config(self):
return {'clusters': [{'name': 'waiter1', 'url': self.waiter_url_1},
Expand Down Expand Up @@ -232,6 +234,221 @@ def test_update_non_default_cluster(self):
finally:
util.delete_token(self.waiter_url_2, token_name)

def _test_choose_latest_configured_cluster(self, cluster_test_configs, expected_updated_cluster_index):
config = {'clusters': cluster_test_configs}
token_name = self.token_name()
expected_cluster_with_latest_token = cluster_test_configs[expected_updated_cluster_index]
# last token defined in the cluster_test_configs array will be the latest token
for cluster_test_config in cluster_test_configs:
util.post_token(cluster_test_config['url'], token_name, cluster_test_config['token-to-create'],
update_mode_admin=True)
try:
with cli.temp_config_file(config) as path:
version = str(uuid.uuid4())
cp = cli.update(token_name=token_name, flags=f'--config {path}', update_flags=f'--version {version}')
self.assertEqual(0, cp.returncode, cp.stderr)
self.assertIn(expected_cluster_with_latest_token['name'], cli.stdout(cp))
modified_token = util.load_token(expected_cluster_with_latest_token['url'], token_name,
expected_status_code=200)
self.assertEqual(version, modified_token['version'])
for cluster_test_config in cluster_test_configs:
waiter_url = cluster_test_config['url']
if waiter_url != expected_cluster_with_latest_token['url']:
not_modified_token = util.load_token(waiter_url, token_name, expected_status_code=200)
self.assertNotEqual(version, not_modified_token)
self.assertNotIn(cluster_test_config['name'], cli.stdout(cp))
finally:
for cluster_test_config in cluster_test_configs:
util.delete_token(cluster_test_config['url'], token_name)

def test_update_token_latest_configured_same_cluster(self):
sync_group_name = 'group_name'
cluster_test_configs = [{'name': 'waiter1',
'url': self.waiter_url_1,
'token-to-create': util.minimal_service_description(cluster=self.waiter_1_cluster),
'default-for-create': True,
'sync-group': sync_group_name},
{'name': 'waiter2',
'url': self.waiter_url_2,
'token-to-create': util.minimal_service_description(cluster=self.waiter_2_cluster),
'sync-group': sync_group_name}]
self._test_choose_latest_configured_cluster(cluster_test_configs, 1)

def test_update_token_latest_configured_different_cluster(self):
sync_group_name = 'group_name'
cluster_test_configs = [{'name': 'waiter1',
'url': self.waiter_url_1,
'token-to-create': util.minimal_service_description(cluster=self.waiter_1_cluster),
'default-for-create': True,
'sync-group': sync_group_name},
{'name': 'waiter2',
'url': self.waiter_url_2,
'token-to-create': util.minimal_service_description(cluster=self.waiter_1_cluster),
'sync-group': sync_group_name}]
self._test_choose_latest_configured_cluster(cluster_test_configs, 0)

def test_update_token_latest_configured_to_missing_cluster(self):
sync_group_1 = "sync-group-1"
unlisted_cluster_name = "unlisted_cluster"
config = {'clusters': [{'name': 'waiter1',
'url': self.waiter_url_1,
'default-for-create': True,
'sync-group': sync_group_1},
{'name': 'waiter2',
'url': self.waiter_url_2,
'sync-group': sync_group_1}]}
token_name = self.token_name()
util.post_token(self.waiter_url_1, token_name, util.minimal_service_description(cluster=self.waiter_1_cluster))
util.post_token(self.waiter_url_2, token_name, util.minimal_service_description(cluster=unlisted_cluster_name))
try:
with cli.temp_config_file(config) as path:
version = str(uuid.uuid4())
cp = cli.update(token_name=token_name, flags=f'--config {path}', update_flags=f'--version {version}')
self.assertEqual(1, cp.returncode, cp.stderr)
self.assertIn('The token is configured in cluster', cli.stderr(cp))
self.assertIn(unlisted_cluster_name, cli.stderr(cp))
self.assertIn(self.waiter_1_cluster, cli.stderr(cp))
self.assertIn(self.waiter_2_cluster, cli.stderr(cp))
finally:
util.delete_token(self.waiter_url_1, token_name)
util.delete_token(self.waiter_url_2, token_name)

def _test_update_token_multiple_sync_groups(self, config):
token_name = self.token_name()
util.post_token(self.waiter_url_2, token_name, util.minimal_service_description(cluster=self.waiter_2_cluster))
try:
with cli.temp_config_file(config) as path:
version = str(uuid.uuid4())
cp = cli.update(token_name=token_name, flags=f'--config {path}', update_flags=f'--version {version}')
self.assertEqual(0, cp.returncode, cp.stderr)
self.assertIn('waiter2', cli.stdout(cp))
token_2 = util.load_token(self.waiter_url_2, token_name, expected_status_code=200)
self.assertEqual(version, token_2['version'])
finally:
util.delete_token(self.waiter_url_2, token_name)

def test_update_token_multiple_sync_groups_no_conflict(self):
config = {'clusters': [{'name': 'waiter1',
'url': self.waiter_url_1,
'default-for-create': True,
'sync-group': "sync-group-1"},
{'name': 'waiter2', 'url': self.waiter_url_2,
'sync-group': "sync-group-2"}]}
self._test_update_token_multiple_sync_groups(config)

def test_update_token_multiple_sync_groups_not_listed(self):
# by default, if no sync group is listed the sync-group is given a unique group
config = {'clusters': [{'name': 'waiter1',
'url': self.waiter_url_1,
'default-for-create': True},
{'name': 'waiter2', 'url': self.waiter_url_2}]}
self._test_update_token_multiple_sync_groups(config)

def test_update_token_multiple_sync_groups_with_conflict(self):
sync_group_1 = "sync-group-1"
sync_group_2 = "sync-group-2"
config = {'clusters': [{'name': 'waiter1',
'url': self.waiter_url_1,
'default-for-create': True,
'sync-group': sync_group_1},
{'name': 'waiter2',
'url': self.waiter_url_2,
'sync-group': sync_group_2}]}
token_name = self.token_name()
util.post_token(self.waiter_url_1, token_name, util.minimal_service_description(cluster=self.waiter_1_cluster))
util.post_token(self.waiter_url_2, token_name, util.minimal_service_description(cluster=self.waiter_2_cluster))
try:
with cli.temp_config_file(config) as path:
version = str(uuid.uuid4())
cp = cli.update(token_name=token_name, flags=f'--config {path}', update_flags=f'--version {version}')
self.assertEqual(1, cp.returncode, cp.stderr)
self.assertIn('Could not infer the target cluster', cli.stderr(cp))
self.assertIn(sync_group_1, cli.stderr(cp))
self.assertIn(sync_group_2, cli.stderr(cp))
finally:
util.delete_token(self.waiter_url_1, token_name)
util.delete_token(self.waiter_url_2, token_name)

def test_maintenance_start_latest_configured_cluster(self):
custom_maintenance_message = "custom maintenance message"
config = {'clusters': [{'name': 'waiter1',
'url': self.waiter_url_1,
'default-for-create': True,
'sync-group': 'group_name'},
{'name': 'waiter2',
'url': self.waiter_url_2,
'sync-group': 'group_name'}]}
token_name = self.token_name()
util.post_token(self.waiter_url_1, token_name, util.minimal_service_description(cluster=self.waiter_1_cluster))
util.post_token(self.waiter_url_2, token_name, util.minimal_service_description(cluster=self.waiter_2_cluster))
try:
with cli.temp_config_file(config) as path:
cp = cli.maintenance('start', token_name, flags=f'--config {path}',
maintenance_flags=f'"{custom_maintenance_message}"')
self.assertEqual(0, cp.returncode, cp.stderr)
token_1 = util.load_token(self.waiter_url_1, token_name, expected_status_code=200)
token_2 = util.load_token(self.waiter_url_2, token_name, expected_status_code=200)
self.assertEqual(custom_maintenance_message, token_2['maintenance']['message'])
self.assertTrue('maintenance' not in token_1)
finally:
util.delete_token(self.waiter_url_1, token_name)
util.delete_token(self.waiter_url_2, token_name)

def test_maintenance_stop_latest_configured_cluster(self):
custom_maintenance_message = "custom maintenance message"
config = {'clusters': [{'name': 'waiter1',
'url': self.waiter_url_1,
'default-for-create': True,
'sync-group': 'group_name'},
{'name': 'waiter2',
'url': self.waiter_url_2,
'sync-group': 'group_name'}]}
token_name = self.token_name()
util.post_token(self.waiter_url_1,
token_name,
util.minimal_service_description(cluster=self.waiter_1_cluster,
maintenance={'message': custom_maintenance_message}))
util.post_token(self.waiter_url_2,
token_name,
util.minimal_service_description(cluster=self.waiter_2_cluster,
maintenance={'message': custom_maintenance_message}))
try:
with cli.temp_config_file(config) as path:
cp = cli.maintenance('stop', token_name, flags=f'--config {path}')
self.assertEqual(0, cp.returncode, cp.stderr)
token_1 = util.load_token(self.waiter_url_1, token_name, expected_status_code=200)
token_2 = util.load_token(self.waiter_url_2, token_name, expected_status_code=200)
self.assertTrue('maintenance' not in token_2)
self.assertTrue(custom_maintenance_message, token_1['maintenance']['message'])
finally:
util.delete_token(self.waiter_url_1, token_name)
util.delete_token(self.waiter_url_2, token_name)

def test_maintenance_check_latest_configured_cluster(self):
config = {'clusters': [{'name': 'waiter1',
'url': self.waiter_url_1,
'default-for-create': True,
'sync-group': 'group_name'},
{'name': 'waiter2',
'url': self.waiter_url_2,
'sync-group': 'group_name'}]}
token_name = self.token_name()
util.post_token(self.waiter_url_1,
token_name,
util.minimal_service_description(cluster=self.waiter_1_cluster,
maintenance={'message': "custom maintenance message"}))
util.post_token(self.waiter_url_2,
token_name,
util.minimal_service_description(cluster=self.waiter_2_cluster))
try:
with cli.temp_config_file(config) as path:
cp = cli.maintenance('check', token_name, flags=f'--config {path}')
self.assertEqual(1, cp.returncode, cp.stderr)
self.assertIn(f'{token_name} is not in maintenance mode', cli.stdout(cp))
finally:
util.delete_token(self.waiter_url_1, token_name)
util.delete_token(self.waiter_url_2, token_name)

def test_ping_via_token_cluster(self):
# Create in cluster #1
token_name = self.token_name()
Expand Down Expand Up @@ -301,3 +518,4 @@ def test_federated_tokens(self):
util.delete_token(self.waiter_url_2, token_name)
finally:
util.delete_token(self.waiter_url_1, token_name)

10 changes: 9 additions & 1 deletion cli/integration/tests/waiter/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ def retrieve_waiter_url(varname='WAITER_URL', value='http://localhost:9091'):
return cook_url


def retrieve_waiter_settings(waiter_url):
return session.get(f'{waiter_url}/settings').json()


def retrieve_waiter_cluster_name(waiter_url):
return retrieve_waiter_settings(waiter_url)['cluster-config']['name']


def is_connection_error(exception):
return isinstance(exception, requests.exceptions.ConnectionError)

Expand Down Expand Up @@ -233,7 +241,7 @@ def load_file(file_format, path):

def wait_until_routers(waiter_url, predicate):
auth_cookie = {'x-waiter-auth': session.cookies['x-waiter-auth']}
max_wait_ms = session.get(f'{waiter_url}/settings').json()['scheduler-syncer-interval-secs'] * 2 * 1000
max_wait_ms = retrieve_waiter_settings(waiter_url)['scheduler-syncer-interval-secs'] * 2 * 1000
routers = session.get(f'{waiter_url}/state/maintainer').json()['state']['routers']
for _, router_url in routers.items():
logging.debug(f'Waiting for at most {max_wait_ms}ms on {router_url}')
Expand Down
3 changes: 2 additions & 1 deletion cli/waiter/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ def run(args, plugins):
metrics.initialize(config_map)
metrics.inc(f'command.{action}.runs')
clusters = load_target_clusters(config_map, url, cluster)
enforce_cluster = (url or cluster) and True
http_util.configure(config_map, plugins)
args = {k: v for k, v in args.items() if v is not None}
result = actions[action]['run-function'](clusters, args, config_path)
result = actions[action]['run-function'](clusters, args, config_path, enforce_cluster)
logging.debug(f'result: {result}')
if result == 0:
metrics.inc(f'command.{action}.result.success')
Expand Down
53 changes: 50 additions & 3 deletions cli/waiter/querying.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,57 @@ def query_tokens(clusters, user):
lambda cluster, executor: executor.submit(get_tokens_on_cluster, cluster, user))


def get_cluster_with_token(clusters, token_name):
def _get_latest_cluster(clusters, query_result):
"""
:param clusters: list of local cluster configs from the configuration file
:param query_result: value from query_token function
:return: Finds latest token configuration from the query_result. Gets the cluster that is configured in the
token description and returns a local cluster who's serverside name matches the one specified in the token.
If the token's cluster does not exist in one of the local cluster configurations then an Exception is raised.
"""
token_descriptions = list(query_result['clusters'].values())
token_result = max(token_descriptions, key=lambda token: token['token']['last-update-time'])
cluster_name_goal = token_result['token']['cluster']
provided_cluster_names = []
for c in clusters:
cluster_settings, _ = http_util.make_data_request(c, lambda: http_util.get(c, '/settings'))
cluster_config_name = cluster_settings['cluster-config']['name']
provided_cluster_names.append(cluster_config_name)
if cluster_name_goal.upper() == cluster_config_name.upper():
return c
raise Exception(f'The token is configured in cluster {cluster_name_goal}, which is not provided.' +
f' The following clusters were provided: {", ".join(provided_cluster_names)}.')


def get_target_cluster_from_token(clusters, token_name, enforce_cluster):
"""
:param clusters: list of local cluster configs from the configuration file
:param token_name: string name of token
:param enforce_cluster: boolean describing if cluster was explicitly specified as an cli argument
:return: Return the target cluster config for various token operations
"""
query_result = query_token(clusters, token_name)
if query_result["count"] == 0:
raise Exception('The token does not exist. You must create it first.')
elif enforce_cluster:
logging.debug(f'Forcing cluster {clusters[0]} as the target_cluster')
return clusters[0]
else:
cluster_names_with_token = list(query_result['clusters'].keys())
return next(c for c in clusters if c['name'] == cluster_names_with_token[0])
sync_group_count = 0
sync_groups_set = set()
cluster_names = set()
for cluster in list(query_result['clusters'].keys()):
cluster_config = next(c for c in clusters if c['name'] == cluster)
sync_group = cluster_config.get('sync-group', False)
# consider clusters that don't have a configured sync-group as in their own unique group
if not sync_group:
sync_group = sync_group_count
sync_group_count += 1
sync_groups_set.add(sync_group)
cluster_names.add(cluster)
if len(sync_groups_set) > 1:
raise Exception('Could not infer the target cluster for this operation because there are multiple cluster '
f'groups that contain a description for this token: groups-{sync_groups_set} '
f'clusters-{cluster_names}.'
'\nConsider specifying with the --cluster flag which cluster you are targeting.')
return _get_latest_cluster(clusters, query_result)
2 changes: 1 addition & 1 deletion cli/waiter/subcommands/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def delete_token_on_cluster(cluster, token_name, token_etag):
print_error(message)


def delete(clusters, args, _):
def delete(clusters, args, _, __):
"""Deletes the token with the given token name."""
guard_no_cluster(clusters)
token_name = args.get('token')[0]
Expand Down
2 changes: 1 addition & 1 deletion cli/waiter/subcommands/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
}


def init_token_json(_, args, __):
def init_token_json(_, args, __, ___):
"""Creates (or updates) a Waiter token"""
logging.debug('args: %s' % args)
file = os.path.abspath(args.pop('file'))
Expand Down
Loading

0 comments on commit 8546dba

Please sign in to comment.