Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x][Tiered Caching] Enable serialization of IndicesRequestCache.Key (#10… #11972

Merged
merged 4 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add search query categorizor ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275))
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753))
- Remove ingest processor supports excluding fields ([#10967](https://github.com/opensearch-project/OpenSearch/pull/10967), [#11983](https://github.com/opensearch-project/OpenSearch/pull/11983))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.core.index.shard;

import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -55,6 +56,8 @@ public class ShardId implements Comparable<ShardId>, ToXContentFragment, Writeab
private final int shardId;
private final int hashCode;

private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ShardId.class);

/**
* Constructs a new shard id.
* @param index the index name
Expand Down Expand Up @@ -88,6 +91,10 @@ public ShardId(StreamInput in) throws IOException {
hashCode = computeHashCode();
}

public long getBaseRamBytesUsed() {
return BASE_RAM_BYTES_USED;
}

/**
* Writes this shard id to a stream.
* @param out the stream to write to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,45 @@ public void testProfileDisableCache() throws Exception {
}
}

public void testCacheWithInvalidation() throws Exception {
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
.get()
);
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
OpenSearchAssertions.assertAllSuccessful(resp);
assertThat(resp.getHits().getTotalHits().value, equalTo(1L));

assertCacheState(client, "index", 0, 1);
// Index but don't refresh
indexRandom(false, client.prepareIndex("index").setSource("k", "hello2"));
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect hit as here as refresh didn't happen
assertCacheState(client, "index", 1, 1);

// Explicit refresh would invalidate cache
refresh();
// Hit same query again
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh)
assertCacheState(client, "index", 1, 2);
}

private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
Expand All @@ -644,6 +683,7 @@ private static void assertCacheState(Client client, String index, long expectedH
Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions())
);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.opensearch.core.index.shard.ShardId;

import java.io.IOException;
import java.util.Optional;
import java.util.UUID;

/**
* A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes
Expand All @@ -53,11 +55,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader {
private final ShardId shardId;
private final FilterDirectoryReader.SubReaderWrapper wrapper;

private final DelegatingCacheHelper delegatingCacheHelper;

private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId)
throws IOException {
super(in, wrapper);
this.wrapper = wrapper;
this.shardId = shardId;
this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper());
}

/**
Expand All @@ -70,7 +75,61 @@ public ShardId shardId() {
@Override
public CacheHelper getReaderCacheHelper() {
// safe to delegate since this reader does not alter the index
return in.getReaderCacheHelper();
return this.delegatingCacheHelper;
}

public DelegatingCacheHelper getDelegatingCacheHelper() {
return this.delegatingCacheHelper;
}

/**
* Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey.
* @opensearch.internal
*/
public class DelegatingCacheHelper implements CacheHelper {
private final CacheHelper cacheHelper;
private final DelegatingCacheKey serializableCacheKey;

DelegatingCacheHelper(CacheHelper cacheHelper) {
this.cacheHelper = cacheHelper;
this.serializableCacheKey = new DelegatingCacheKey(Optional.ofNullable(cacheHelper).map(key -> getKey()).orElse(null));
}

@Override
public CacheKey getKey() {
return this.cacheHelper.getKey();
}

public DelegatingCacheKey getDelegatingCacheKey() {
return this.serializableCacheKey;
}

@Override
public void addClosedListener(ClosedListener listener) {
this.cacheHelper.addClosedListener(listener);
}
}

/**
* Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of
* object itself for serialization purposes.
*/
public class DelegatingCacheKey {
private final CacheKey cacheKey;
private final String uniqueId;

DelegatingCacheKey(CacheKey cacheKey) {
this.cacheKey = cacheKey;
this.uniqueId = UUID.randomUUID().toString();
}

public CacheKey getCacheKey() {
return this.cacheKey;
}

public String getId() {
return uniqueId;
}
}

@Override
Expand Down
Loading
Loading