From f857f50fd0728e4cad3be81eb06614a66b1f94ca Mon Sep 17 00:00:00 2001 From: Rithin Pullela Date: Tue, 4 Feb 2025 11:34:26 -0800 Subject: [PATCH] Stash context for GET calls and Modify ITs to ignore transient warnings (#128) Signed-off-by: rithin-pullela-aws --- .../store/index/IndexFeatureStore.java | 11 ++++- .../o19s/es/ltr/rest/RestFeatureManager.java | 13 +++--- .../es/ltr/rest/RestSearchStoreElements.java | 22 ++++++--- .../o19s/es/ltr/rest/RestStoreManager.java | 45 ++++++++++++++----- .../fstore/80_search_w_partial_models.yml | 2 +- 5 files changed, 68 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/o19s/es/ltr/feature/store/index/IndexFeatureStore.java b/src/main/java/com/o19s/es/ltr/feature/store/index/IndexFeatureStore.java index f43c54d..6176cdf 100644 --- a/src/main/java/com/o19s/es/ltr/feature/store/index/IndexFeatureStore.java +++ b/src/main/java/com/o19s/es/ltr/feature/store/index/IndexFeatureStore.java @@ -41,6 +41,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.ParseField; @@ -213,7 +214,15 @@ public GetResponse getModel(String name) { } private Supplier internalGet(String id) { - return () -> clientSupplier.get().prepareGet(index, id).get(); + return () -> { + Client client = clientSupplier.get(); + if (client.threadPool() == null) { + return client.prepareGet(index, id).get(); + } + try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) { + return client.prepareGet(index, id).get(); + } + }; } /** diff --git a/src/main/java/com/o19s/es/ltr/rest/RestFeatureManager.java b/src/main/java/com/o19s/es/ltr/rest/RestFeatureManager.java index ce47e32..2459757 100644 --- a/src/main/java/com/o19s/es/ltr/rest/RestFeatureManager.java +++ b/src/main/java/com/o19s/es/ltr/rest/RestFeatureManager.java @@ -136,16 +136,19 @@ RestChannelConsumer get(NodeClient client, String type, String indexName, RestRe String name = request.param("name"); String routing = request.param("routing"); String id = generateId(type, name); - // refresh index before performing get return (channel) -> { - client.admin().indices().prepareRefresh(indexName).execute(ActionListener.wrap(refreshResponse -> { - client.prepareGet(indexName, id).setRouting(routing).execute(new RestToXContentListener(channel) { + try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { + ActionListener wrappedListener = ActionListener.runBefore(new RestToXContentListener(channel) { @Override protected RestStatus getStatus(final GetResponse response) { return response.isExists() ? OK : NOT_FOUND; } - }); - }, e -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())))); + }, () -> threadContext.restore()); + + client.prepareGet(indexName, id).setRouting(routing).execute(wrappedListener); + } catch (Exception e) { + channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } }; } diff --git a/src/main/java/com/o19s/es/ltr/rest/RestSearchStoreElements.java b/src/main/java/com/o19s/es/ltr/rest/RestSearchStoreElements.java index 87c849e..2c19224 100644 --- a/src/main/java/com/o19s/es/ltr/rest/RestSearchStoreElements.java +++ b/src/main/java/com/o19s/es/ltr/rest/RestSearchStoreElements.java @@ -23,9 +23,14 @@ import java.util.List; +import org.opensearch.action.search.SearchResponse; import org.opensearch.client.node.NodeClient; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.ltr.settings.LTRSettings; +import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestStatusToXContentListener; @@ -65,12 +70,17 @@ RestChannelConsumer search(NodeClient client, String type, String indexName, Res if (prefix != null && !prefix.isEmpty()) { qb.must(matchQuery("name.prefix", prefix)); } - return (channel) -> client - .prepareSearch(indexName) - .setQuery(qb) - .setSize(size) - .setFrom(from) - .execute(new RestStatusToXContentListener<>(channel)); + return (channel) -> { + try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { + ActionListener searchListener = new RestStatusToXContentListener<>(channel); + + ActionListener wrappedListener = ActionListener.runBefore(searchListener, () -> threadContext.restore()); + + client.prepareSearch(indexName).setQuery(qb).setSize(size).setFrom(from).execute(wrappedListener); + } catch (Exception e) { + channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }; } } diff --git a/src/main/java/com/o19s/es/ltr/rest/RestStoreManager.java b/src/main/java/com/o19s/es/ltr/rest/RestStoreManager.java index dd77f28..37bba74 100644 --- a/src/main/java/com/o19s/es/ltr/rest/RestStoreManager.java +++ b/src/main/java/com/o19s/es/ltr/rest/RestStoreManager.java @@ -105,21 +105,42 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli } RestChannelConsumer listStores(NodeClient client) { - return (channel) -> new ListStoresAction.ListStoresActionBuilder(client).execute(new RestToXContentListener<>(channel)); + return (channel) -> { + try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { + RestToXContentListener listStoresListener = new RestToXContentListener<>( + channel + ); + + ActionListener wrappedListener = ActionListener + .runBefore(listStoresListener, () -> threadContext.restore()); + + new ListStoresAction.ListStoresActionBuilder(client).execute(wrappedListener); + } catch (Exception e) { + channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }; } RestChannelConsumer getStore(NodeClient client, String indexName) { - return (channel) -> client - .admin() - .indices() - .prepareExists(indexName) - .execute(new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(IndicesExistsResponse indicesExistsResponse, XContentBuilder builder) throws Exception { - builder.startObject().field("exists", indicesExistsResponse.isExists()).endObject().close(); - return new BytesRestResponse(indicesExistsResponse.isExists() ? RestStatus.OK : RestStatus.NOT_FOUND, builder); - } - }); + return (channel) -> { + try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { + RestBuilderListener existsListener = new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(IndicesExistsResponse indicesExistsResponse, XContentBuilder builder) + throws Exception { + builder.startObject().field("exists", indicesExistsResponse.isExists()).endObject().close(); + return new BytesRestResponse(indicesExistsResponse.isExists() ? RestStatus.OK : RestStatus.NOT_FOUND, builder); + } + }; + + ActionListener wrappedListener = ActionListener + .runBefore(existsListener, () -> threadContext.restore()); + + client.admin().indices().prepareExists(indexName).execute(wrappedListener); + } catch (Exception e) { + channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }; } RestChannelConsumer createIndex(NodeClient client, String indexName) { diff --git a/src/test/resources/rest-api-spec/test/fstore/80_search_w_partial_models.yml b/src/test/resources/rest-api-spec/test/fstore/80_search_w_partial_models.yml index 4bb76a3..91931db 100644 --- a/src/test/resources/rest-api-spec/test/fstore/80_search_w_partial_models.yml +++ b/src/test/resources/rest-api-spec/test/fstore/80_search_w_partial_models.yml @@ -69,7 +69,7 @@ setup: - do: allowed_warnings: - - "this request accesses system indices: [.plugins-ml-config], but in a future major version, direct access to system indices will be prevented by default" + - "this request accesses system indices: [.plugins-ml-config], but in a future major version, direct access to system indices will be prevented by default" indices.refresh: {} - do: