Skip to content

Commit

Permalink
Stash context for GET calls and Modify ITs to ignore transient warnin…
Browse files Browse the repository at this point in the history
…gs (opensearch-project#128)

Signed-off-by: rithin-pullela-aws <[email protected]>
  • Loading branch information
rithin-pullela-aws committed Feb 5, 2025
1 parent 8b4960d commit f857f50
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.ParseField;
Expand Down Expand Up @@ -213,7 +214,15 @@ public GetResponse getModel(String name) {
}

private Supplier<GetResponse> internalGet(String id) {
return () -> clientSupplier.get().prepareGet(index, id).get();
return () -> {
Client client = clientSupplier.get();
if (client.threadPool() == null) {
return client.prepareGet(index, id).get();
}
try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
return client.prepareGet(index, id).get();
}
};
}

/**
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/com/o19s/es/ltr/rest/RestFeatureManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,19 @@ RestChannelConsumer get(NodeClient client, String type, String indexName, RestRe
String name = request.param("name");
String routing = request.param("routing");
String id = generateId(type, name);
// refresh index before performing get
return (channel) -> {
client.admin().indices().prepareRefresh(indexName).execute(ActionListener.wrap(refreshResponse -> {
client.prepareGet(indexName, id).setRouting(routing).execute(new RestToXContentListener<GetResponse>(channel) {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<GetResponse> wrappedListener = ActionListener.runBefore(new RestToXContentListener<GetResponse>(channel) {
@Override
protected RestStatus getStatus(final GetResponse response) {
return response.isExists() ? OK : NOT_FOUND;
}
});
}, e -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()))));
}, () -> threadContext.restore());

client.prepareGet(indexName, id).setRouting(routing).execute(wrappedListener);
} catch (Exception e) {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
};
}

Expand Down
22 changes: 16 additions & 6 deletions src/main/java/com/o19s/es/ltr/rest/RestSearchStoreElements.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@

import java.util.List;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.ltr.settings.LTRSettings;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestStatusToXContentListener;

Expand Down Expand Up @@ -65,12 +70,17 @@ RestChannelConsumer search(NodeClient client, String type, String indexName, Res
if (prefix != null && !prefix.isEmpty()) {
qb.must(matchQuery("name.prefix", prefix));
}
return (channel) -> client
.prepareSearch(indexName)
.setQuery(qb)
.setSize(size)
.setFrom(from)
.execute(new RestStatusToXContentListener<>(channel));
return (channel) -> {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<SearchResponse> searchListener = new RestStatusToXContentListener<>(channel);

ActionListener<SearchResponse> wrappedListener = ActionListener.runBefore(searchListener, () -> threadContext.restore());

client.prepareSearch(indexName).setQuery(qb).setSize(size).setFrom(from).execute(wrappedListener);
} catch (Exception e) {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
};
}

}
45 changes: 33 additions & 12 deletions src/main/java/com/o19s/es/ltr/rest/RestStoreManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,42 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
}

RestChannelConsumer listStores(NodeClient client) {
return (channel) -> new ListStoresAction.ListStoresActionBuilder(client).execute(new RestToXContentListener<>(channel));
return (channel) -> {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
RestToXContentListener<ListStoresAction.ListStoresActionResponse> listStoresListener = new RestToXContentListener<>(
channel
);

ActionListener<ListStoresAction.ListStoresActionResponse> wrappedListener = ActionListener
.runBefore(listStoresListener, () -> threadContext.restore());

new ListStoresAction.ListStoresActionBuilder(client).execute(wrappedListener);
} catch (Exception e) {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
};
}

RestChannelConsumer getStore(NodeClient client, String indexName) {
return (channel) -> client
.admin()
.indices()
.prepareExists(indexName)
.execute(new RestBuilderListener<IndicesExistsResponse>(channel) {
@Override
public RestResponse buildResponse(IndicesExistsResponse indicesExistsResponse, XContentBuilder builder) throws Exception {
builder.startObject().field("exists", indicesExistsResponse.isExists()).endObject().close();
return new BytesRestResponse(indicesExistsResponse.isExists() ? RestStatus.OK : RestStatus.NOT_FOUND, builder);
}
});
return (channel) -> {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
RestBuilderListener<IndicesExistsResponse> existsListener = new RestBuilderListener<IndicesExistsResponse>(channel) {
@Override
public RestResponse buildResponse(IndicesExistsResponse indicesExistsResponse, XContentBuilder builder)
throws Exception {
builder.startObject().field("exists", indicesExistsResponse.isExists()).endObject().close();
return new BytesRestResponse(indicesExistsResponse.isExists() ? RestStatus.OK : RestStatus.NOT_FOUND, builder);
}
};

ActionListener<IndicesExistsResponse> wrappedListener = ActionListener
.runBefore(existsListener, () -> threadContext.restore());

client.admin().indices().prepareExists(indexName).execute(wrappedListener);
} catch (Exception e) {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
};
}

RestChannelConsumer createIndex(NodeClient client, String indexName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ setup:

- do:
allowed_warnings:
- "this request accesses system indices: [.plugins-ml-config], but in a future major version, direct access to system indices will be prevented by default"
- "this request accesses system indices: [.plugins-ml-config], but in a future major version, direct access to system indices will be prevented by default"
indices.refresh: {}

- do:
Expand Down

0 comments on commit f857f50

Please sign in to comment.