Skip to content

Commit

Permalink
Modifying changes
Browse files Browse the repository at this point in the history
  • Loading branch information
RS146BIJAY committed Feb 4, 2025
1 parent e15f712 commit 16eb81f
Show file tree
Hide file tree
Showing 46 changed files with 1,587 additions and 569 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
public final class SmbMmapFsDirectoryFactory extends FsDirectoryFactory {

@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
return new SmbDirectoryWrapper(
setPreload(
new MMapDirectory(location, lockFactory),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public final class SmbNIOFsDirectoryFactory extends FsDirectoryFactory {

@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
return new SmbDirectoryWrapper(new NIOFSDirectory(location, lockFactory));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1305,122 +1305,122 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
waitForSearchableDocs(finalDocCount, primary, replica);
}

/**
 * Verifies point-in-time (PIT) behavior on a segment-replication replica: a PIT created on the
 * replica snapshots the segments visible at creation time, searches through the PIT keep
 * succeeding across later flushes and force-merges, index stats report exactly one open PIT
 * context, the snapshotted segment files stay on disk while the PIT is alive, and they are
 * cleaned up once the PIT is deleted.
 */
public void testPitCreatedOnReplica() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// Index docs with ids 0-9, refreshing after each write so every doc becomes searchable.
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
}
// wait until replication finishes, then make the pit request.
assertBusy(
() -> assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
)
);
// Create the PIT on the replica shard only (_only_local) with a long keep-alive.
CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), false);
request.setPreference("_only_local");
request.setIndices(new String[] { INDEX_NAME });
ActionFuture<CreatePitResponse> execute = client(replica).execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
// Search through the PIT; request cache disabled so the reader context is actually exercised.
SearchResponse searchResponse = client(replica).prepareSearch(INDEX_NAME)
.setSize(10)
.setPreference("_only_local")
.setRequestCache(false)
.addSort("foo", SortOrder.ASC)
.searchAfter(new Object[] { 30 })
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.get();
assertEquals(1, searchResponse.getSuccessfulShards());
assertEquals(1, searchResponse.getTotalShards());
FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME);
client().admin().indices().flush(flushRequest).get();
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);

// fetch the segments snapshotted when the reader context was created.
Collection<String> snapshottedSegments;
SearchService searchService = internalCluster().getInstance(SearchService.class, replica);
NamedWriteableRegistry registry = internalCluster().getInstance(NamedWriteableRegistry.class, replica);
final PitReaderContext pitReaderContext = searchService.getPitReaderContext(
decode(registry, pitResponse.getId()).shards().get(replicaShard.routingEntry().shardId()).getSearchContextId()
);
try (final Engine.Searcher searcher = pitReaderContext.acquireSearcher("test")) {
final StandardDirectoryReader standardDirectoryReader = NRTReplicationReaderManager.unwrapStandardReader(
(OpenSearchDirectoryReader) searcher.getDirectoryReader()
);
final SegmentInfos infos = standardDirectoryReader.getSegmentInfos();
snapshottedSegments = infos.files(false);
}

// Churn the index after the PIT exists: more docs, random force-merges and flushes.
// Note: ids start at 11, so id 10 is never indexed.
flush(INDEX_NAME);
for (int i = 11; i < 20; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
if (randomBoolean()) {
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
flush(INDEX_NAME);
}
}
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});

// Force-merge down to one segment; the PIT must still hold its original snapshot.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});
// Test stats
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices(INDEX_NAME);
indicesStatsRequest.all();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
long pitCurrent = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getTotal().getPitCurrent();
long openContexts = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getOpenContexts();
assertEquals(1, pitCurrent);
assertEquals(1, openContexts);
// The PIT search must still succeed after all the post-snapshot merges/flushes above.
SearchResponse resp = client(replica).prepareSearch(INDEX_NAME)
.setSize(10)
.setPreference("_only_local")
.addSort("foo", SortOrder.ASC)
.searchAfter(new Object[] { 30 })
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.setRequestCache(false)
.get();
PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1));
assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId());

// Every file snapshotted by the PIT must still be present in the shard's directory.
List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

// delete the PIT
DeletePitRequest deletePITRequest = new DeletePitRequest(pitResponse.getId());
client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet();
// After deletion the snapshotted files become eligible for cleanup.
assertBusy(
() -> assertFalse(
"Files should be cleaned up",
List.of(replicaShard.store().directory().listAll()).containsAll(snapshottedSegments)
)
);
}
// public void testPitCreatedOnReplica() throws Exception {
// final String primary = internalCluster().startDataOnlyNode();
// createIndex(INDEX_NAME);
// ensureYellowAndNoInitializingShards(INDEX_NAME);
// final String replica = internalCluster().startDataOnlyNode();
// ensureGreen(INDEX_NAME);
//
// for (int i = 0; i < 10; i++) {
// client().prepareIndex(INDEX_NAME)
// .setId(String.valueOf(i))
// .setSource("foo", randomInt())
// .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
// .get();
// refresh(INDEX_NAME);
// }
// // wait until replication finishes, then make the pit request.
// assertBusy(
// () -> assertEquals(
// getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
// getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
// )
// );
// CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), false);
// request.setPreference("_only_local");
// request.setIndices(new String[] { INDEX_NAME });
// ActionFuture<CreatePitResponse> execute = client(replica).execute(CreatePitAction.INSTANCE, request);
// CreatePitResponse pitResponse = execute.get();
// SearchResponse searchResponse = client(replica).prepareSearch(INDEX_NAME)
// .setSize(10)
// .setPreference("_only_local")
// .setRequestCache(false)
// .addSort("foo", SortOrder.ASC)
// .searchAfter(new Object[] { 30 })
// .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
// .get();
// assertEquals(1, searchResponse.getSuccessfulShards());
// assertEquals(1, searchResponse.getTotalShards());
// FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME);
// client().admin().indices().flush(flushRequest).get();
// final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
//
// // fetch the segments snapshotted when the reader context was created.
// Collection<String> snapshottedSegments;
// SearchService searchService = internalCluster().getInstance(SearchService.class, replica);
// NamedWriteableRegistry registry = internalCluster().getInstance(NamedWriteableRegistry.class, replica);
// final PitReaderContext pitReaderContext = searchService.getPitReaderContext(
// decode(registry, pitResponse.getId()).shards().get(replicaShard.routingEntry().shardId()).getSearchContextId()
// );
// try (final Engine.Searcher searcher = pitReaderContext.acquireSearcher("test")) {
// final StandardDirectoryReader standardDirectoryReader = NRTReplicationReaderManager.unwrapStandardReader(
// (OpenSearchDirectoryReader) searcher.getDirectoryReader()
// );
// final SegmentInfos infos = standardDirectoryReader.getSegmentInfos();
// snapshottedSegments = infos.files(false);
// }
//
// flush(INDEX_NAME);
// for (int i = 11; i < 20; i++) {
// client().prepareIndex(INDEX_NAME)
// .setId(String.valueOf(i))
// .setSource("foo", randomInt())
// .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
// .get();
// refresh(INDEX_NAME);
// if (randomBoolean()) {
// client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
// flush(INDEX_NAME);
// }
// }
// assertBusy(() -> {
// assertEquals(
// getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
// getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
// );
// });
//
// client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
// assertBusy(() -> {
// assertEquals(
// getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
// getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
// );
// });
// // Test stats
// IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
// indicesStatsRequest.indices(INDEX_NAME);
// indicesStatsRequest.all();
// IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
// long pitCurrent = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getTotal().getPitCurrent();
// long openContexts = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getOpenContexts();
// assertEquals(1, pitCurrent);
// assertEquals(1, openContexts);
// SearchResponse resp = client(replica).prepareSearch(INDEX_NAME)
// .setSize(10)
// .setPreference("_only_local")
// .addSort("foo", SortOrder.ASC)
// .searchAfter(new Object[] { 30 })
// .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
// .setRequestCache(false)
// .get();
// PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1));
// assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId());
//
// List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
// assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));
//
// // delete the PIT
// DeletePitRequest deletePITRequest = new DeletePitRequest(pitResponse.getId());
// client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet();
// assertBusy(
// () -> assertFalse(
// "Files should be cleaned up",
// List.of(replicaShard.store().directory().listAll()).containsAll(snapshottedSegments)
// )
// );
// }

/**
* This tests that if a primary receives docs while a replica is performing round of segrep during recovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ public Iterator<Setting<?>> settings() {
);

public static final String SETTING_REMOTE_STORE_ENABLED = "index.remote_store.enabled";
/** Settings key for the context-aware flag; see {@code INDEX_CONTEXT_AWARE_ENABLED_SETTING} for the typed setting. */
public static final String SETTING_CONTEXT_AWARE_ENABLED = "index.context_aware.enabled";

public static final String SETTING_REMOTE_SEGMENT_STORE_REPOSITORY = "index.remote_store.segment.repository";

Expand Down Expand Up @@ -385,6 +386,13 @@ public Iterator<Setting<?>> settings() {
Property.Dynamic
);

/**
 * Index-scoped, dynamically updatable boolean setting ({@code index.context_aware.enabled})
 * that toggles context-aware behavior for an index. Defaults to {@code false}.
 */
public static final Setting<Boolean> INDEX_CONTEXT_AWARE_ENABLED_SETTING = Setting.boolSetting(
SETTING_CONTEXT_AWARE_ENABLED,
false,
Property.IndexScope,
Property.Dynamic
);

/**
* Used to specify remote store repository to use for this index.
*/
Expand Down
83 changes: 83 additions & 0 deletions server/src/main/java/org/opensearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
Expand Down Expand Up @@ -91,7 +93,9 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.index.analysis.AnalyzerScope;
import org.opensearch.index.analysis.NamedAnalyzer;
import org.opensearch.index.engine.CombinedDeletionPolicy;
import org.opensearch.index.fielddata.IndexFieldData;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.search.sort.SortedWiderNumericSortField;

import java.io.IOException;
Expand All @@ -101,8 +105,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Main lucene class.
Expand Down Expand Up @@ -168,6 +174,83 @@ public static int getNumDocs(SegmentInfos info) {
return numDocs;
}

/**
 * Builds one combined {@link SegmentInfos} from per-criteria segment states, renaming every
 * segment to {@code <criteria>$<originalName>} so copies from different criteria cannot collide.
 *
 * <p>NOTE(review): the i-th element of {@code criteriaList} (in the set's iteration order) is
 * paired with the i-th element of {@code criteriaBasedSegmentInfos} — callers must guarantee
 * this positional alignment; TODO confirm.
 *
 * @param criteriaBasedSegmentInfos per-criteria segment states, positionally aligned with {@code criteriaList}
 * @param criteriaList criteria names used as segment-name prefixes
 * @param directory the directory the combined SegmentInfos will refer to
 * @return a fresh SegmentInfos holding renamed copies of every input segment
 * @throws IOException if copying segment metadata fails
 */
public static SegmentInfos combineSegmentInfos(List<SegmentInfos> criteriaBasedSegmentInfos,
Set<String> criteriaList, Directory directory) throws IOException {
final SegmentInfos combined = new SegmentInfos(Version.LATEST.major);
final List<SegmentCommitInfo> renamedSegments = new ArrayList<>();
final String[] criteriaInOrder = criteriaList.toArray(new String[0]);
for (int pos = 0; pos < criteriaInOrder.length; pos++) {
final SegmentInfos current = criteriaBasedSegmentInfos.get(pos);
for (SegmentCommitInfo commitInfo : current) {
renamedSegments.add(copySegmentAsIs(commitInfo, criteriaInOrder[pos] + "$" + commitInfo.info.name, directory));
}
// How to keep user data in sync across multi IndexWriter — last criteria wins here.
combined.setUserData(current.getUserData(), false);
}
combined.addAll(renamedSegments);
return combined;
}

/**
 * Combines the live {@link SegmentInfos} of every criteria-scoped {@link IndexWriter} into a
 * single {@link SegmentInfos} over {@code directory}. Each segment is copied as-is and renamed
 * to {@code <criteria>$<originalName>} so segments from different writers cannot collide.
 *
 * <p>NOTE(review): {@code setUserData} runs once per writer, so the user data of the map's
 * last-iterated entry wins; keeping user data in sync across multiple IndexWriters is still
 * an open question. Fixed here: removed an unused {@code combinedUserData} map that was
 * allocated but never read or written.
 *
 * @param criteriaBasedIndexWriters criteria name -> writer whose current segments are merged in
 * @param directory the directory the combined SegmentInfos will refer to
 * @return a fresh SegmentInfos listing renamed copies of every writer's segments
 * @throws IOException if opening a writer's reader or copying segment metadata fails
 */
public static SegmentInfos combineSegmentInfos(Map<String, IndexWriter> criteriaBasedIndexWriters, Directory directory) throws IOException {
final SegmentInfos sis = new SegmentInfos(Version.LATEST.major);
List<SegmentCommitInfo> infos = new ArrayList<>();
for (Map.Entry<String, IndexWriter> entry: criteriaBasedIndexWriters.entrySet()) {
IndexWriter currentWriter = entry.getValue();
// An NRT reader exposes the writer's current (possibly uncommitted) segments.
try(StandardDirectoryReader r1 = (StandardDirectoryReader) StandardDirectoryReader.open(currentWriter)) {
SegmentInfos currentInfos = r1.getSegmentInfos();
// currentWriter.incRefDeleter(currentInfos);
for (SegmentCommitInfo info : currentInfos) {
// Prefix with the criteria key so segment names stay unique in the combined view.
String newSegName = entry.getKey() + "$" + info.info.name;
infos.add(copySegmentAsIs(info, newSegName, directory));
}

// How to keep user data in sync across multi IndexWriter.
sis.setUserData(currentInfos.getUserData(), false);
}
}

sis.addAll(infos);
return sis;
}

/**
 * Clones {@code info} under the new name {@code segName}, pointing it at {@code directory}.
 * Only the directory and segment name change; versions, codec, diagnostics, attributes, index
 * sort, deletion counts, generations, ids and file lists are carried over from the source.
 */
private static SegmentCommitInfo copySegmentAsIs(
SegmentCommitInfo info, String segName, Directory directory) throws IOException {
final SegmentInfo source = info.info;
// Same SegmentInfo as before, but with the target directory and the new name.
final SegmentInfo cloned = new SegmentInfo(
directory,
source.getVersion(),
source.getMinVersion(),
segName,
source.maxDoc(),
source.getUseCompoundFile(),
source.getHasBlocks(),
source.getCodec(),
source.getDiagnostics(),
source.getId(),
source.getAttributes(),
source.getIndexSort());
// Carry over per-commit state (deletes, generations, id) unchanged.
final SegmentCommitInfo clonedCommit = new SegmentCommitInfo(
cloned,
info.getDelCount(),
info.getSoftDelCount(),
info.getDelGen(),
info.getFieldInfosGen(),
info.getDocValuesGen(),
info.getId());

cloned.setFiles(source.files());
clonedCommit.setFieldInfosFiles(info.getFieldInfosFiles());
clonedCommit.setDocValuesUpdatesFiles(info.getDocValuesUpdatesFiles());
return clonedCommit;
}

/**
* Reads the segments infos from the given commit, failing if it fails to load
*/
Expand Down
Loading

0 comments on commit 16eb81f

Please sign in to comment.