From c42a787d2d793f6a59a05636b79df901685a430a Mon Sep 17 00:00:00 2001 From: Vijayan Balasubramanian Date: Mon, 22 Apr 2024 19:08:18 -0700 Subject: [PATCH] Update force merge polling (#489) Signed-off-by: Vijayan Balasubramanian --- osbenchmark/worker_coordinator/runner.py | 41 +++--- tests/worker_coordinator/runner_test.py | 167 ++++++++++------------- 2 files changed, 95 insertions(+), 113 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 56b5c2667..1941c90ac 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -688,30 +688,39 @@ class ForceMerge(Runner): Runs a force merge operation against OpenSearch. """ + PARAM_WAIT_FOR_COMPLETION = "wait_for_completion" + async def __call__(self, opensearch, params): - # pylint: disable=import-outside-toplevel - import opensearchpy max_num_segments = params.get("max-num-segments") mode = params.get("mode") merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments + # Request end time will not be 100% accurate, since we are using polling + # to check whether task status is completed or not. if mode == "polling": - complete = False - try: - request_context_holder.on_client_request_start() - await opensearch.indices.forcemerge(**merge_params) - request_context_holder.on_client_request_end() - complete = True - except opensearchpy.ConnectionTimeout: - pass - while not complete: - await asyncio.sleep(params.get("poll-period")) - tasks = await opensearch.tasks.list(params={"actions": "indices:admin/forcemerge"}) - if len(tasks["nodes"]) == 0: - # empty nodes response indicates no tasks + self.logger.warning( + "%s will be updated to false to run force merge in asynchronous way", self.PARAM_WAIT_FOR_COMPLETION) + merge_params[self.PARAM_WAIT_FOR_COMPLETION] = "false" + request_context_holder.on_client_request_start() + response_task = await opensearch.indices.forcemerge(**merge_params) + while True: + force_merge_task_id = response_task['task'] + task = await opensearch.tasks.get(task_id=force_merge_task_id) + if not task: + self.logger.error("Failed to get task for task id: [%s]", force_merge_task_id) + request_context_holder.on_client_request_end() + raise exceptions.BenchmarkAssertionError( + "Force merge request failure: task was expected but not found in the get tasks api response.") + if 'completed' not in task: request_context_holder.on_client_request_end() - complete = True + raise exceptions.BenchmarkAssertionError( + "Force merge request failure: 'completed' was expected but not found " + "in the get task api response.") + if task['completed']: + request_context_holder.on_client_request_end() + break + await asyncio.sleep(params.get("poll-period")) else: request_context_holder.on_client_request_start() await opensearch.indices.forcemerge(**merge_params) diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index ecbde21a1..6969937c1 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -1154,122 +1154,95 @@ async def test_force_merge_with_params(self, opensearch, on_client_request_start opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') - @mock.patch("opensearchpy.OpenSearch") - @run_async - async def test_force_merge_with_polling_no_timeout(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future() - - force_merge = runner.ForceMerge() - await force_merge(opensearch, params={"index" : "_all", "mode": "polling", 'poll-period': 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all") - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async async def test_force_merge_with_polling(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout()) - opensearch.tasks.list.side_effect = [ - as_future({ - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": [ - "data", - "ingest", - "master", - "remote_cluster_client", - "transform" - ], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region" - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {} - } - } - } - } - }), - as_future({ - "nodes": {} - }) - ] + opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"}) + opensearch.tasks.get.return_value = as_future({ + "completed": True, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": False, + "cancelled": False, + "headers": {} + }, + "response": { + "_shards": { + "total": 10, + "successful": 10, + "failed": 0 + } + } + }) force_merge = runner.ForceMerge() - await force_merge(opensearch, params={"index": "_all", "mode": "polling", "poll-period": 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all") + await force_merge(opensearch, params={ + "index": "_all", "mode": "polling", 'poll-period': 10, "wait_for_completion": True}) + opensearch.indices.forcemerge.assert_called_once_with(index="_all", wait_for_completion='false') + opensearch.tasks.get.assert_called_once_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798") @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async async def test_force_merge_with_polling_and_params(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout()) - opensearch.tasks.list.side_effect = [ + opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"}) + opensearch.tasks.get.side_effect = [ as_future({ - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": [ - "data", - "ingest", - "master", - "remote_cluster_client", - "transform" - ], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region" - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {} - } - } - } - } + "completed": False, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": False, + "cancelled": False, + "headers": {} + }, + "response": {} }), as_future({ - "nodes": {} + "completed": True, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": "false", + "cancelled": "false", + "headers": {} + }, + "response": { + "_shards": { + "total": 10, + "successful": 10, + "failed": 0 + } + } }) ] force_merge = runner.ForceMerge() # request-timeout should be ignored as mode:polling - await force_merge(opensearch, params={"index" : "_all", "mode": "polling", "max-num-segments": 1, - "request-timeout": 50000, "poll-period": 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) + await force_merge(opensearch, params={ + "index": "_all", "mode": "polling", "max-num-segments": 1, "request-timeout": 50000, "poll-period": 10 + }) + opensearch.indices.forcemerge.assert_called_once_with( + index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion='false') + opensearch.tasks.get.assert_called_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798") + self.assertEqual(opensearch.tasks.get.call_count, 2) class IndicesStatsRunnerTests(TestCase):