Skip to content

Commit

Permalink
HSEARCH-2945 Update mass indexer monitor configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta committed Sep 24, 2024
1 parent 4defce7 commit 11f5813
Show file tree
Hide file tree
Showing 21 changed files with 717 additions and 130 deletions.
4 changes: 4 additions & 0 deletions documentation/src/main/asciidoc/migration/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ But there are next changes:
This was done to address the scenarios where the total number of identifiers to load is not known ahead of time.
- Deprecated `org.hibernate.search.mapper.orm.massindexing.MassIndexingFailureHandler`, `org.hibernate.search.mapper.orm.massindexing.MassIndexingMonitor`
interfaces are removed in this version. They have their alternatives in a `org.hibernate.search.mapper.pojo.massindexing` for a while now.
- `org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor#addToTotalCount(..)` gets deprecated for removal.
Instead, we are introducing the `org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor`
that can be obtained through `org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor#typeGroupMonitor(..)`.
This new type group monitor has more flexibility and also allows implementors to skip total count computations if needed.

[[spi]]
== SPI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ The default, built-in monitor logs progress periodically at the `INFO` level,
but a custom monitor can be set by implementing the `MassIndexingMonitor` interface
and passing an instance using the `monitor` method.

The built-in monitor's behaviour can be customized through `DefaultMassIndexingMonitor` builder,
e.g. `indexer.monitor( DefaultMassIndexingMonitor.builder().countOnStart( false ).build() ) )`

Implementations of `MassIndexingMonitor` must be thread-safe.

|`failureHandler(MassIndexingFailureHandler)`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public String getAuthor() {
}
}

@SuppressWarnings("removal")
public static class StaticCountersMonitor implements MassIndexingMonitor {

public static StaticCounters.Key ADDED = StaticCounters.createKey();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.integrationtest.mapper.pojo.massindexing;

import static org.assertj.core.api.Fail.fail;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.PersistenceTypeKey;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubEntityLoadingBinder;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubLoadingContext;
import org.hibernate.search.mapper.pojo.loading.mapping.annotation.EntityLoadingBinderRef;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.DocumentId;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.GenericField;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity;
import org.hibernate.search.mapper.pojo.massindexing.DefaultMassIndexingMonitor;
import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping;
import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.standalone.session.SearchSession;
import org.hibernate.search.util.impl.integrationtest.common.extension.BackendMock;
import org.hibernate.search.util.impl.integrationtest.mapper.pojo.standalone.StandalonePojoMappingSetupHelper;
import org.hibernate.search.util.impl.test.extension.ExpectedLog4jLog;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import org.apache.logging.log4j.Level;

class MassIndexingDefaultMonitorIT {

private static final int COUNT = 100;
@RegisterExtension
public final BackendMock backendMock = BackendMock.create();

@RegisterExtension
public final StandalonePojoMappingSetupHelper setupHelper =
StandalonePojoMappingSetupHelper.withBackendMock( MethodHandles.lookup(), backendMock );

@RegisterExtension
public ExpectedLog4jLog logged = ExpectedLog4jLog.create();

private final StubLoadingContext loadingContext = new StubLoadingContext();

@ValueSource(booleans = { true, false })
@ParameterizedTest
void countOnBeforeType(boolean doCounts) {
actualTest( () -> {
logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" );
if ( doCounts ) {
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).once();
}
else {
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).never();
}
}, indexer -> indexer.monitor( DefaultMassIndexingMonitor.builder().countOnBeforeType( doCounts ).build() ) );
}

@ValueSource(booleans = { true, false })
@ParameterizedTest
void countOnStart(boolean doCounts) {
actualTest( () -> {
logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" );
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).once();
if ( doCounts ) {
logged.expectEvent( Level.INFO,
"Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." )
.once();
}
else {
logged.expectEvent( Level.INFO,
"Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." )
.never();
}
}, indexer -> indexer.monitor( DefaultMassIndexingMonitor.builder().countOnStart( doCounts ).build() ) );
}

@Test
void noCountsAtAll() {
actualTest( () -> {
logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" ).once();
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).never();
logged.expectEvent( Level.INFO,
"Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." )
.never();
}, indexer -> indexer
.monitor( DefaultMassIndexingMonitor.builder().countOnStart( false ).countOnBeforeType( false ).build() ) );
}

private void actualTest(Runnable expectedLogs, Consumer<MassIndexer> massIndexerConfiguration) {
backendMock.expectAnySchema( Book.NAME );

SearchMapping mapping = setupHelper.start()
.expectCustomBeans()
.setup( Book.class );

backendMock.verifyExpectationsMet();

initData();

expectedLogs.run();

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

CompletableFuture<?> indexingFuture = new CompletableFuture<>();
indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) );

// add operations on indexes can follow any random order,
// since they are executed by different threads
BackendMock.DocumentWorkCallListContext expectWorks = backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
);
for ( int i = 0; i < COUNT; i++ ) {
final String id = Integer.toString( i );
expectWorks
.add( id, b -> b
.field( "title", "TITLE_" + id )
.field( "author", "AUTHOR_" + id )
);
}

// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
// so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order:
backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
.purge()
.mergeSegments()
.flush()
.refresh();

try {
massIndexerConfiguration.accept( indexer );
indexer.startAndWait();
}
catch (InterruptedException e) {
fail( "Unexpected InterruptedException: " + e.getMessage() );
}
}

backendMock.verifyExpectationsMet();
}

private void initData() {
for ( int i = 0; i < COUNT; i++ ) {
persist( new Book( i, "TITLE_" + i, "AUTHOR_" + i ) );
}
}

private void persist(Book book) {
loadingContext.persistenceMap( Book.PERSISTENCE_KEY ).put( book.id, book );
}

@SearchEntity(name = Book.NAME,
loadingBinder = @EntityLoadingBinderRef(type = StubEntityLoadingBinder.class))
@Indexed(index = Book.NAME)
public static class Book {

public static final String NAME = "Book";
public static final PersistenceTypeKey<Book, Integer> PERSISTENCE_KEY =
new PersistenceTypeKey<>( Book.class, Integer.class );

@DocumentId
private Integer id;

@GenericField
private String title;

@GenericField
private String author;

public Book() {
}

public Book(Integer id, String title, String author) {
this.id = id;
this.title = title;
this.author = author;
}

public Integer getId() {
return id;
}

public String getTitle() {
return title;
}

public String getAuthor() {
return author;
}
}

private static class SimulatedFailure extends RuntimeException {
SimulatedFailure(String message) {
super( message );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorContext;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorCreateContext;
import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping;
import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.standalone.session.SearchSession;
Expand Down Expand Up @@ -128,6 +131,66 @@ void simple() {
assertThat( staticCounters.get( StaticCountersMonitor.INDEXING_COMPLETED ) ).isEqualTo( 1 );
}

@Test
void skipTotalCount() {
SearchMapping mapping = setup( null );

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

CompletableFuture<?> indexingFuture = new CompletableFuture<>();
indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) );

// add operations on indexes can follow any random order,
// since they are executed by different threads
backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
.add( "1", b -> b
.field( "title", TITLE_1 )
.field( "author", AUTHOR_1 )
)
.add( "3", b -> b
.field( "title", TITLE_3 )
.field( "author", AUTHOR_3 )
)
.createAndExecuteFollowingWorks( indexingFuture )
.add( "2", b -> b
.field( "title", TITLE_2 )
.field( "author", AUTHOR_2 )
);

// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
// so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order:
backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
.purge()
.mergeSegments()
.flush()
.refresh();

try {
indexer.monitor( new StaticCountersMonitor( false ) )
.startAndWait();
}
catch (SearchException ignored) {
// Expected, but not relevant to this test
}
catch (InterruptedException e) {
fail( "Unexpected InterruptedException: " + e.getMessage() );
}
}

backendMock.verifyExpectationsMet();

assertThat( staticCounters.get( StaticCountersMonitor.LOADED ) ).isEqualTo( 3 );
assertThat( staticCounters.get( StaticCountersMonitor.BUILT ) ).isEqualTo( 3 );
assertThat( staticCounters.get( StaticCountersMonitor.ADDED ) ).isEqualTo( 2 );
assertThat( staticCounters.get( StaticCountersMonitor.TOTAL ) ).isEqualTo( 0 );
assertThat( staticCounters.get( StaticCountersMonitor.INDEXING_COMPLETED ) ).isEqualTo( 1 );
}

private void initData() {
persist( new Book( 1, TITLE_1, AUTHOR_1 ) );
persist( new Book( 2, TITLE_2, AUTHOR_2 ) );
Expand Down Expand Up @@ -178,6 +241,7 @@ public String getAuthor() {
}
}

@SuppressWarnings("removal")
public static class StaticCountersMonitor implements MassIndexingMonitor {

public static StaticCounters.Key ADDED = StaticCounters.createKey();
Expand All @@ -186,6 +250,42 @@ public static class StaticCountersMonitor implements MassIndexingMonitor {
public static StaticCounters.Key TOTAL = StaticCounters.createKey();
public static StaticCounters.Key INDEXING_COMPLETED = StaticCounters.createKey();


private final boolean requiresTotalCount;

public StaticCountersMonitor() {
this( true );
}

public StaticCountersMonitor(boolean requiresTotalCount) {
this.requiresTotalCount = requiresTotalCount;
}

@Override
public MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonitorCreateContext context) {
if ( requiresTotalCount ) {
return MassIndexingMonitor.super.typeGroupMonitor( context );
}
else {
return new MassIndexingTypeGroupMonitor() {
@Override
public void documentsIndexed(long increment) {
// do nothing
}

@Override
public void indexingStarted(MassIndexingTypeGroupMonitorContext context) {
// do nothing
}

@Override
public void indexingCompleted(MassIndexingTypeGroupMonitorContext context) {
// do nothing
}
};
}
}

@Override
public void documentsAdded(long increment) {
StaticCounters.get().add( ADDED, (int) increment );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ public interface MassIndexer {
* <p>
* With fail-fast option enabled, the mass indexer will request cancelling all internal mass-indexing processes
* right after the first error is reported to the {@link MassIndexingFailureHandler}.
*
* <p>
* Defaults to {@code false}.
* @param failFast Whether to enabled fail fast option for this mass indexer.
*
* @return {@code this} for method chaining
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,4 +1041,9 @@ void indexingProgressWithRemainingTime(float estimatePercentileComplete, long do
@LogMessage(level = INFO)
@Message(id = ID_OFFSET + 168, value = "Mass indexing complete in %3$s. Indexed %1$d/%2$d entities.")
void indexingEntitiesCompleted(long indexed, long total, Duration indexingTime);

@LogMessage(level = INFO)
@Message(id = ID_OFFSET + 169,
value = "Mass indexing is going to index approx. %1$d entities (%2$s). Actual number may change once the indexing starts.")
void indexingEntitiesApprox(long count, String types);
}
Loading

0 comments on commit 11f5813

Please sign in to comment.