Skip to content

Commit

Permalink
Modifying changes
Browse files Browse the repository at this point in the history
  • Loading branch information
RS146BIJAY committed Feb 10, 2025
1 parent e15f712 commit a009d3e
Show file tree
Hide file tree
Showing 70 changed files with 3,344 additions and 3,415 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
public final class SmbMmapFsDirectoryFactory extends FsDirectoryFactory {

@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
return new SmbDirectoryWrapper(
setPreload(
new MMapDirectory(location, lockFactory),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public final class SmbNIOFsDirectoryFactory extends FsDirectoryFactory {

@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
return new SmbDirectoryWrapper(new NIOFSDirectory(location, lockFactory));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ public void testZeroTermsQuery() throws ExecutionException, InterruptedException
List<IndexRequestBuilder> indexRequests = getIndexRequests();
indexRandom(true, false, indexRequests);

MatchPhraseQueryBuilder baseQuery = matchPhraseQuery("name", "the who").analyzer("standard_stopwords");
MatchPhraseQueryBuilder baseQuery = matchPhraseQuery("status", "200").analyzer("standard_stopwords");

MatchPhraseQueryBuilder matchNoneQuery = baseQuery.zeroTermsQuery(ZeroTermsQuery.NONE);
SearchResponse matchNoneResponse = client().prepareSearch(INDEX).setQuery(matchNoneQuery).get();
assertHitCount(matchNoneResponse, 0L);
// MatchPhraseQueryBuilder matchNoneQuery = baseQuery.zeroTermsQuery(ZeroTermsQuery.NONE);
// SearchResponse matchNoneResponse = client().prepareSearch(INDEX).setQuery(matchNoneQuery).get();
// assertHitCount(matchNoneResponse, 0L);

MatchPhraseQueryBuilder matchAllQuery = baseQuery.zeroTermsQuery(ZeroTermsQuery.ALL);
SearchResponse matchAllResponse = client().prepareSearch(INDEX).setQuery(matchAllQuery).get();
Expand All @@ -100,8 +100,8 @@ public void testZeroTermsQuery() throws ExecutionException, InterruptedException

private List<IndexRequestBuilder> getIndexRequests() {
List<IndexRequestBuilder> requests = new ArrayList<>();
requests.add(client().prepareIndex(INDEX).setSource("name", "the beatles"));
requests.add(client().prepareIndex(INDEX).setSource("name", "led zeppelin"));
requests.add(client().prepareIndex(INDEX).setSource("status", "200"));
requests.add(client().prepareIndex(INDEX).setSource("status", "200"));
return requests;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ protected void verifyStoreContent() throws Exception {

private int getDocCountFromShard(IndexShard shard) {
try (final Engine.Searcher searcher = shard.acquireSearcher("test")) {
return searcher.getDirectoryReader().numDocs();
return searcher.getMultiDirectoryReader().numDocs();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1305,122 +1305,122 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
waitForSearchableDocs(finalDocCount, primary, replica);
}

public void testPitCreatedOnReplica() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
}
// wait until replication finishes, then make the pit request.
assertBusy(
() -> assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
)
);
CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), false);
request.setPreference("_only_local");
request.setIndices(new String[] { INDEX_NAME });
ActionFuture<CreatePitResponse> execute = client(replica).execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
SearchResponse searchResponse = client(replica).prepareSearch(INDEX_NAME)
.setSize(10)
.setPreference("_only_local")
.setRequestCache(false)
.addSort("foo", SortOrder.ASC)
.searchAfter(new Object[] { 30 })
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.get();
assertEquals(1, searchResponse.getSuccessfulShards());
assertEquals(1, searchResponse.getTotalShards());
FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME);
client().admin().indices().flush(flushRequest).get();
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);

// fetch the segments snapshotted when the reader context was created.
Collection<String> snapshottedSegments;
SearchService searchService = internalCluster().getInstance(SearchService.class, replica);
NamedWriteableRegistry registry = internalCluster().getInstance(NamedWriteableRegistry.class, replica);
final PitReaderContext pitReaderContext = searchService.getPitReaderContext(
decode(registry, pitResponse.getId()).shards().get(replicaShard.routingEntry().shardId()).getSearchContextId()
);
try (final Engine.Searcher searcher = pitReaderContext.acquireSearcher("test")) {
final StandardDirectoryReader standardDirectoryReader = NRTReplicationReaderManager.unwrapStandardReader(
(OpenSearchDirectoryReader) searcher.getDirectoryReader()
);
final SegmentInfos infos = standardDirectoryReader.getSegmentInfos();
snapshottedSegments = infos.files(false);
}

flush(INDEX_NAME);
for (int i = 11; i < 20; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
if (randomBoolean()) {
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
flush(INDEX_NAME);
}
}
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});

client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});
// Test stats
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices(INDEX_NAME);
indicesStatsRequest.all();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
long pitCurrent = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getTotal().getPitCurrent();
long openContexts = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getOpenContexts();
assertEquals(1, pitCurrent);
assertEquals(1, openContexts);
SearchResponse resp = client(replica).prepareSearch(INDEX_NAME)
.setSize(10)
.setPreference("_only_local")
.addSort("foo", SortOrder.ASC)
.searchAfter(new Object[] { 30 })
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.setRequestCache(false)
.get();
PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1));
assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId());

List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

// delete the PIT
DeletePitRequest deletePITRequest = new DeletePitRequest(pitResponse.getId());
client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet();
assertBusy(
() -> assertFalse(
"Files should be cleaned up",
List.of(replicaShard.store().directory().listAll()).containsAll(snapshottedSegments)
)
);
}
// public void testPitCreatedOnReplica() throws Exception {
// final String primary = internalCluster().startDataOnlyNode();
// createIndex(INDEX_NAME);
// ensureYellowAndNoInitializingShards(INDEX_NAME);
// final String replica = internalCluster().startDataOnlyNode();
// ensureGreen(INDEX_NAME);
//
// for (int i = 0; i < 10; i++) {
// client().prepareIndex(INDEX_NAME)
// .setId(String.valueOf(i))
// .setSource("foo", randomInt())
// .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
// .get();
// refresh(INDEX_NAME);
// }
// // wait until replication finishes, then make the pit request.
// assertBusy(
// () -> assertEquals(
// getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
// getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
// )
// );
// CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), false);
// request.setPreference("_only_local");
// request.setIndices(new String[] { INDEX_NAME });
// ActionFuture<CreatePitResponse> execute = client(replica).execute(CreatePitAction.INSTANCE, request);
// CreatePitResponse pitResponse = execute.get();
// SearchResponse searchResponse = client(replica).prepareSearch(INDEX_NAME)
// .setSize(10)
// .setPreference("_only_local")
// .setRequestCache(false)
// .addSort("foo", SortOrder.ASC)
// .searchAfter(new Object[] { 30 })
// .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
// .get();
// assertEquals(1, searchResponse.getSuccessfulShards());
// assertEquals(1, searchResponse.getTotalShards());
// FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME);
// client().admin().indices().flush(flushRequest).get();
// final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
//
// // fetch the segments snapshotted when the reader context was created.
// Collection<String> snapshottedSegments;
// SearchService searchService = internalCluster().getInstance(SearchService.class, replica);
// NamedWriteableRegistry registry = internalCluster().getInstance(NamedWriteableRegistry.class, replica);
// final PitReaderContext pitReaderContext = searchService.getPitReaderContext(
// decode(registry, pitResponse.getId()).shards().get(replicaShard.routingEntry().shardId()).getSearchContextId()
// );
// try (final Engine.Searcher searcher = pitReaderContext.acquireSearcher("test")) {
// final StandardDirectoryReader standardDirectoryReader = NRTReplicationReaderManager.unwrapStandardReader(
// (OpenSearchDirectoryReader) searcher.getDirectoryReader()
// );
// final SegmentInfos infos = standardDirectoryReader.getSegmentInfos();
// snapshottedSegments = infos.files(false);
// }
//
// flush(INDEX_NAME);
// for (int i = 11; i < 20; i++) {
// client().prepareIndex(INDEX_NAME)
// .setId(String.valueOf(i))
// .setSource("foo", randomInt())
// .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
// .get();
// refresh(INDEX_NAME);
// if (randomBoolean()) {
// client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
// flush(INDEX_NAME);
// }
// }
// assertBusy(() -> {
// assertEquals(
// getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
// getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
// );
// });
//
// client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
// assertBusy(() -> {
// assertEquals(
// getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
// getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
// );
// });
// // Test stats
// IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
// indicesStatsRequest.indices(INDEX_NAME);
// indicesStatsRequest.all();
// IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
// long pitCurrent = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getTotal().getPitCurrent();
// long openContexts = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getOpenContexts();
// assertEquals(1, pitCurrent);
// assertEquals(1, openContexts);
// SearchResponse resp = client(replica).prepareSearch(INDEX_NAME)
// .setSize(10)
// .setPreference("_only_local")
// .addSort("foo", SortOrder.ASC)
// .searchAfter(new Object[] { 30 })
// .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
// .setRequestCache(false)
// .get();
// PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1));
// assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId());
//
// List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
// assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));
//
// // delete the PIT
// DeletePitRequest deletePITRequest = new DeletePitRequest(pitResponse.getId());
// client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet();
// assertBusy(
// () -> assertFalse(
// "Files should be cleaned up",
// List.of(replicaShard.store().directory().listAll()).containsAll(snapshottedSegments)
// )
// );
// }

/**
* This tests that if a primary receives docs while a replica is performing round of segrep during recovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.lucene.store.ByteBuffersDataOutput;
import org.opensearch.lucene.store.ByteBuffersIndexOutput;
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude;
import org.opensearch.search.aggregations.bucket.terms.RareTermsAggregationBuilder;
import org.opensearch.search.aggregations.bucket.terms.SignificantTermsAggregationBuilder;
Expand All @@ -56,6 +59,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.zip.CRC32;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_ALL;
Expand Down Expand Up @@ -95,11 +99,11 @@ public static Collection<Object[]> parameters() {

@Override
public void setupSuiteScopeCluster() throws Exception {
assertAcked(prepareCreate("index").setMapping("f", "type=keyword").get());
assertAcked(prepareCreate("index").setMapping("status", "type=keyword").get());
numDocs = randomIntBetween(1, 20);
List<IndexRequestBuilder> docs = new ArrayList<>();
for (int i = 0; i < numDocs; ++i) {
docs.add(client().prepareIndex("index").setSource("f", Integer.toString(i / 3)));
docs.add(client().prepareIndex("index").setSource("{\"status\": \"200\"}", MediaTypeRegistry.JSON));
}
indexRandom(true, docs);
}
Expand All @@ -109,13 +113,13 @@ public void testScroll() {
SearchResponse response = client().prepareSearch("index")
.setSize(size)
.setScroll(TimeValue.timeValueMinutes(1))
.addAggregation(terms("f").field("f"))
.addAggregation(terms("status").field("status"))
.get();
assertSearchResponse(response);
Aggregations aggregations = response.getAggregations();
assertNotNull(aggregations);
Terms terms = aggregations.get("f");
assertEquals(Math.min(numDocs, 3L), terms.getBucketByKey("0").getDocCount());
Terms terms = aggregations.get("status");
assertEquals(numDocs, terms.getBucketByKey("200").getDocCount());

int total = response.getHits().getHits().length;
while (response.getHits().getHits().length > 0) {
Expand All @@ -130,7 +134,7 @@ public void testScroll() {

public void testLargeRegExTermsAggregation() {
for (TermsAggregatorFactory.ExecutionMode executionMode : TermsAggregatorFactory.ExecutionMode.values()) {
TermsAggregationBuilder termsAggregation = terms("my_terms").field("f")
TermsAggregationBuilder termsAggregation = terms("status").field("200")
.includeExclude(getLargeStringInclude())
.executionHint(executionMode.toString());
runLargeStringAggregationTest(termsAggregation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ public Iterator<Setting<?>> settings() {
);

public static final String SETTING_REMOTE_STORE_ENABLED = "index.remote_store.enabled";
public static final String SETTING_CONTEXT_AWARE_ENABLED = "index.context_aware.enabled";

public static final String SETTING_REMOTE_SEGMENT_STORE_REPOSITORY = "index.remote_store.segment.repository";

Expand Down Expand Up @@ -385,6 +386,13 @@ public Iterator<Setting<?>> settings() {
Property.Dynamic
);

public static final Setting<Boolean> INDEX_CONTEXT_AWARE_ENABLED_SETTING = Setting.boolSetting(
SETTING_CONTEXT_AWARE_ENABLED,
false,
Property.IndexScope,
Property.Dynamic
);

/**
* Used to specify remote store repository to use for this index.
*/
Expand Down
Loading

0 comments on commit a009d3e

Please sign in to comment.