From 11f58131fa2e9bb3ce587d8f5af7c752e05a8727 Mon Sep 17 00:00:00 2001 From: marko-bekhta Date: Tue, 10 Sep 2024 13:42:21 +0200 Subject: [PATCH] HSEARCH-2945 Update mass indexer monitor configuration --- .../src/main/asciidoc/migration/index.adoc | 4 + .../reference/_indexing-massindexer.adoc | 3 + .../massindexing/MassIndexingMonitorIT.java | 1 + .../MassIndexingDefaultMonitorIT.java | 208 ++++++++++++++++++ .../massindexing/MassIndexingMonitorIT.java | 100 +++++++++ .../mapper/orm/massindexing/MassIndexer.java | 3 +- .../search/mapper/pojo/logging/impl/Log.java | 5 + .../DefaultMassIndexingMonitor.java | 76 +++++++ .../massindexing/MassIndexingMonitor.java | 30 ++- .../MassIndexingTypeGroupMonitor.java | 17 +- .../MassIndexingTypeGroupMonitorContext.java | 18 +- ...IndexingTypeGroupMonitorCreateContext.java | 38 ++++ ...elegatingMassIndexingTypeGroupMonitor.java | 43 ++++ .../impl/MassIndexingTypeGroupContext.java | 113 ++++++++++ .../NoOpMassIndexingTypeGroupMonitor.java | 34 --- ...ojoMassIndexingBatchIndexingWorkspace.java | 48 +--- ...dexingEntityIdentifierLoadingRunnable.java | 24 +- ...PojoMassIndexingEntityLoadingRunnable.java | 2 +- .../impl/PojoMassIndexingLoggingMonitor.java | 70 ++++-- .../impl/PojoMassIndexingNotifier.java | 7 +- .../massindexing/spi/PojoMassIndexer.java | 3 +- 21 files changed, 717 insertions(+), 130 deletions(-) create mode 100644 integrationtest/mapper/pojo-base/src/test/java/org/hibernate/search/integrationtest/mapper/pojo/massindexing/MassIndexingDefaultMonitorIT.java create mode 100644 mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/DefaultMassIndexingMonitor.java create mode 100644 mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitorCreateContext.java create mode 100644 mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/LegacyDelegatingMassIndexingTypeGroupMonitor.java create mode 100644 mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/MassIndexingTypeGroupContext.java delete mode 100644 mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/NoOpMassIndexingTypeGroupMonitor.java diff --git a/documentation/src/main/asciidoc/migration/index.adoc b/documentation/src/main/asciidoc/migration/index.adoc index 7fce785f1a9..3d22169f98c 100644 --- a/documentation/src/main/asciidoc/migration/index.adoc +++ b/documentation/src/main/asciidoc/migration/index.adoc @@ -95,6 +95,10 @@ But there are next changes: This was done to address the scenarios where the total number of identifiers to load is not known ahead of time. - Deprecated `org.hibernate.search.mapper.orm.massindexing.MassIndexingFailureHandler`, `org.hibernate.search.mapper.orm.massindexing.MassIndexingMonitor` interfaces are removed in this version. They have their alternatives in a `org.hibernate.search.mapper.pojo.massindexing` for a while now. +- `org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor#addToTotalCount(..)` gets deprecated for removal. +Instead, we are introducing the `org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor` +that can be obtained through `org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor#typeGroupMonitor(..)`. +This new type group monitor has more flexibility and also allows implementors to skip total count computations if needed. [[spi]] == SPI diff --git a/documentation/src/main/asciidoc/public/reference/_indexing-massindexer.adoc b/documentation/src/main/asciidoc/public/reference/_indexing-massindexer.adoc index b984a949c75..636e916532d 100644 --- a/documentation/src/main/asciidoc/public/reference/_indexing-massindexer.adoc +++ b/documentation/src/main/asciidoc/public/reference/_indexing-massindexer.adoc @@ -318,6 +318,9 @@ The default, built-in monitor logs progress periodically at the `INFO` level, but a custom monitor can be set by implementing the `MassIndexingMonitor` interface and passing an instance using the `monitor` method. +The built-in monitor's behaviour can be customized through `DefaultMassIndexingMonitor` builder, +e.g. `indexer.monitor( DefaultMassIndexingMonitor.builder().countOnStart( false ).build() ) )` + Implementations of `MassIndexingMonitor` must be thread-safe. |`failureHandler(MassIndexingFailureHandler)` diff --git a/integrationtest/mapper/orm/src/test/java/org/hibernate/search/integrationtest/mapper/orm/massindexing/MassIndexingMonitorIT.java b/integrationtest/mapper/orm/src/test/java/org/hibernate/search/integrationtest/mapper/orm/massindexing/MassIndexingMonitorIT.java index e8724f06185..df96b684264 100644 --- a/integrationtest/mapper/orm/src/test/java/org/hibernate/search/integrationtest/mapper/orm/massindexing/MassIndexingMonitorIT.java +++ b/integrationtest/mapper/orm/src/test/java/org/hibernate/search/integrationtest/mapper/orm/massindexing/MassIndexingMonitorIT.java @@ -167,6 +167,7 @@ public String getAuthor() { } } + @SuppressWarnings("removal") public static class StaticCountersMonitor implements MassIndexingMonitor { public static StaticCounters.Key ADDED = StaticCounters.createKey(); diff --git a/integrationtest/mapper/pojo-base/src/test/java/org/hibernate/search/integrationtest/mapper/pojo/massindexing/MassIndexingDefaultMonitorIT.java b/integrationtest/mapper/pojo-base/src/test/java/org/hibernate/search/integrationtest/mapper/pojo/massindexing/MassIndexingDefaultMonitorIT.java new file mode 100644 index 00000000000..3775fd82d74 --- /dev/null +++ b/integrationtest/mapper/pojo-base/src/test/java/org/hibernate/search/integrationtest/mapper/pojo/massindexing/MassIndexingDefaultMonitorIT.java @@ -0,0 +1,208 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.search.integrationtest.mapper.pojo.massindexing; + +import static org.assertj.core.api.Fail.fail; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy; +import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy; +import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.PersistenceTypeKey; +import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubEntityLoadingBinder; +import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubLoadingContext; +import org.hibernate.search.mapper.pojo.loading.mapping.annotation.EntityLoadingBinderRef; +import org.hibernate.search.mapper.pojo.mapping.definition.annotation.DocumentId; +import org.hibernate.search.mapper.pojo.mapping.definition.annotation.GenericField; +import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed; +import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity; +import org.hibernate.search.mapper.pojo.massindexing.DefaultMassIndexingMonitor; +import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping; +import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer; +import org.hibernate.search.mapper.pojo.standalone.session.SearchSession; +import org.hibernate.search.util.impl.integrationtest.common.extension.BackendMock; +import org.hibernate.search.util.impl.integrationtest.mapper.pojo.standalone.StandalonePojoMappingSetupHelper; +import org.hibernate.search.util.impl.test.extension.ExpectedLog4jLog; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import org.apache.logging.log4j.Level; + +class MassIndexingDefaultMonitorIT { + + private static final int COUNT = 100; + @RegisterExtension + public final BackendMock backendMock = BackendMock.create(); + + @RegisterExtension + public final StandalonePojoMappingSetupHelper setupHelper = + StandalonePojoMappingSetupHelper.withBackendMock( MethodHandles.lookup(), backendMock ); + + @RegisterExtension + public ExpectedLog4jLog logged = ExpectedLog4jLog.create(); + + private final StubLoadingContext loadingContext = new StubLoadingContext(); + + @ValueSource(booleans = { true, false }) + @ParameterizedTest + void countOnBeforeType(boolean doCounts) { + actualTest( () -> { + logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" ); + if ( doCounts ) { + logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).once(); + } + else { + logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).never(); + } + }, indexer -> indexer.monitor( DefaultMassIndexingMonitor.builder().countOnBeforeType( doCounts ).build() ) ); + } + + @ValueSource(booleans = { true, false }) + @ParameterizedTest + void countOnStart(boolean doCounts) { + actualTest( () -> { + logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" ); + logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).once(); + if ( doCounts ) { + logged.expectEvent( Level.INFO, + "Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." ) + .once(); + } + else { + logged.expectEvent( Level.INFO, + "Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." ) + .never(); + } + }, indexer -> indexer.monitor( DefaultMassIndexingMonitor.builder().countOnStart( doCounts ).build() ) ); + } + + @Test + void noCountsAtAll() { + actualTest( () -> { + logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" ).once(); + logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).never(); + logged.expectEvent( Level.INFO, + "Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." ) + .never(); + }, indexer -> indexer + .monitor( DefaultMassIndexingMonitor.builder().countOnStart( false ).countOnBeforeType( false ).build() ) ); + } + + private void actualTest(Runnable expectedLogs, Consumer massIndexerConfiguration) { + backendMock.expectAnySchema( Book.NAME ); + + SearchMapping mapping = setupHelper.start() + .expectCustomBeans() + .setup( Book.class ); + + backendMock.verifyExpectationsMet(); + + initData(); + + expectedLogs.run(); + + try ( SearchSession searchSession = mapping.createSession() ) { + MassIndexer indexer = searchSession.massIndexer() + // Simulate passing information to connect to a DB, ... + .context( StubLoadingContext.class, loadingContext ); + + CompletableFuture indexingFuture = new CompletableFuture<>(); + indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) ); + + // add operations on indexes can follow any random order, + // since they are executed by different threads + BackendMock.DocumentWorkCallListContext expectWorks = backendMock.expectWorks( + Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE + ); + for ( int i = 0; i < COUNT; i++ ) { + final String id = Integer.toString( i ); + expectWorks + .add( id, b -> b + .field( "title", "TITLE_" + id ) + .field( "author", "AUTHOR_" + id ) + ); + } + + // purgeAtStart and mergeSegmentsAfterPurge are enabled by default, + // so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order: + backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() ) + .purge() + .mergeSegments() + .flush() + .refresh(); + + try { + massIndexerConfiguration.accept( indexer ); + indexer.startAndWait(); + } + catch (InterruptedException e) { + fail( "Unexpected InterruptedException: " + e.getMessage() ); + } + } + + backendMock.verifyExpectationsMet(); + } + + private void initData() { + for ( int i = 0; i < COUNT; i++ ) { + persist( new Book( i, "TITLE_" + i, "AUTHOR_" + i ) ); + } + } + + private void persist(Book book) { + loadingContext.persistenceMap( Book.PERSISTENCE_KEY ).put( book.id, book ); + } + + @SearchEntity(name = Book.NAME, + loadingBinder = @EntityLoadingBinderRef(type = StubEntityLoadingBinder.class)) + @Indexed(index = Book.NAME) + public static class Book { + + public static final String NAME = "Book"; + public static final PersistenceTypeKey PERSISTENCE_KEY = + new PersistenceTypeKey<>( Book.class, Integer.class ); + + @DocumentId + private Integer id; + + @GenericField + private String title; + + @GenericField + private String author; + + public Book() { + } + + public Book(Integer id, String title, String author) { + this.id = id; + this.title = title; + this.author = author; + } + + public Integer getId() { + return id; + } + + public String getTitle() { + return title; + } + + public String getAuthor() { + return author; + } + } + + private static class SimulatedFailure extends RuntimeException { + SimulatedFailure(String message) { + super( message ); + } + } +} diff --git a/integrationtest/mapper/pojo-base/src/test/java/org/hibernate/search/integrationtest/mapper/pojo/massindexing/MassIndexingMonitorIT.java b/integrationtest/mapper/pojo-base/src/test/java/org/hibernate/search/integrationtest/mapper/pojo/massindexing/MassIndexingMonitorIT.java index 0552a6a56a7..0639adb3881 100644 --- a/integrationtest/mapper/pojo-base/src/test/java/org/hibernate/search/integrationtest/mapper/pojo/massindexing/MassIndexingMonitorIT.java +++ b/integrationtest/mapper/pojo-base/src/test/java/org/hibernate/search/integrationtest/mapper/pojo/massindexing/MassIndexingMonitorIT.java @@ -22,6 +22,9 @@ import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed; import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity; import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorContext; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorCreateContext; import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping; import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer; import org.hibernate.search.mapper.pojo.standalone.session.SearchSession; @@ -128,6 +131,66 @@ void simple() { assertThat( staticCounters.get( StaticCountersMonitor.INDEXING_COMPLETED ) ).isEqualTo( 1 ); } + @Test + void skipTotalCount() { + SearchMapping mapping = setup( null ); + + try ( SearchSession searchSession = mapping.createSession() ) { + MassIndexer indexer = searchSession.massIndexer() + // Simulate passing information to connect to a DB, ... + .context( StubLoadingContext.class, loadingContext ); + + CompletableFuture indexingFuture = new CompletableFuture<>(); + indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) ); + + // add operations on indexes can follow any random order, + // since they are executed by different threads + backendMock.expectWorks( + Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE + ) + .add( "1", b -> b + .field( "title", TITLE_1 ) + .field( "author", AUTHOR_1 ) + ) + .add( "3", b -> b + .field( "title", TITLE_3 ) + .field( "author", AUTHOR_3 ) + ) + .createAndExecuteFollowingWorks( indexingFuture ) + .add( "2", b -> b + .field( "title", TITLE_2 ) + .field( "author", AUTHOR_2 ) + ); + + // purgeAtStart and mergeSegmentsAfterPurge are enabled by default, + // so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order: + backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() ) + .purge() + .mergeSegments() + .flush() + .refresh(); + + try { + indexer.monitor( new StaticCountersMonitor( false ) ) + .startAndWait(); + } + catch (SearchException ignored) { + // Expected, but not relevant to this test + } + catch (InterruptedException e) { + fail( "Unexpected InterruptedException: " + e.getMessage() ); + } + } + + backendMock.verifyExpectationsMet(); + + assertThat( staticCounters.get( StaticCountersMonitor.LOADED ) ).isEqualTo( 3 ); + assertThat( staticCounters.get( StaticCountersMonitor.BUILT ) ).isEqualTo( 3 ); + assertThat( staticCounters.get( StaticCountersMonitor.ADDED ) ).isEqualTo( 2 ); + assertThat( staticCounters.get( StaticCountersMonitor.TOTAL ) ).isEqualTo( 0 ); + assertThat( staticCounters.get( StaticCountersMonitor.INDEXING_COMPLETED ) ).isEqualTo( 1 ); + } + private void initData() { persist( new Book( 1, TITLE_1, AUTHOR_1 ) ); persist( new Book( 2, TITLE_2, AUTHOR_2 ) ); @@ -178,6 +241,7 @@ public String getAuthor() { } } + @SuppressWarnings("removal") public static class StaticCountersMonitor implements MassIndexingMonitor { public static StaticCounters.Key ADDED = StaticCounters.createKey(); @@ -186,6 +250,42 @@ public static class StaticCountersMonitor implements MassIndexingMonitor { public static StaticCounters.Key TOTAL = StaticCounters.createKey(); public static StaticCounters.Key INDEXING_COMPLETED = StaticCounters.createKey(); + + private final boolean requiresTotalCount; + + public StaticCountersMonitor() { + this( true ); + } + + public StaticCountersMonitor(boolean requiresTotalCount) { + this.requiresTotalCount = requiresTotalCount; + } + + @Override + public MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonitorCreateContext context) { + if ( requiresTotalCount ) { + return MassIndexingMonitor.super.typeGroupMonitor( context ); + } + else { + return new MassIndexingTypeGroupMonitor() { + @Override + public void documentsIndexed(long increment) { + // do nothing + } + + @Override + public void indexingStarted(MassIndexingTypeGroupMonitorContext context) { + // do nothing + } + + @Override + public void indexingCompleted(MassIndexingTypeGroupMonitorContext context) { + // do nothing + } + }; + } + } + @Override public void documentsAdded(long increment) { StaticCounters.get().add( ADDED, (int) increment ); diff --git a/mapper/orm/src/main/java/org/hibernate/search/mapper/orm/massindexing/MassIndexer.java b/mapper/orm/src/main/java/org/hibernate/search/mapper/orm/massindexing/MassIndexer.java index ac9f952604d..c479f5ef113 100644 --- a/mapper/orm/src/main/java/org/hibernate/search/mapper/orm/massindexing/MassIndexer.java +++ b/mapper/orm/src/main/java/org/hibernate/search/mapper/orm/massindexing/MassIndexer.java @@ -228,7 +228,8 @@ public interface MassIndexer { *

* With fail-fast option enabled, the mass indexer will request cancelling all internal mass-indexing processes * right after the first error is reported to the {@link MassIndexingFailureHandler}. - * + *

+ * Defaults to {@code false}. * @param failFast Whether to enabled fail fast option for this mass indexer. * * @return {@code this} for method chaining diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/logging/impl/Log.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/logging/impl/Log.java index be64e4bc50c..52bca587bd9 100644 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/logging/impl/Log.java +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/logging/impl/Log.java @@ -1041,4 +1041,9 @@ void indexingProgressWithRemainingTime(float estimatePercentileComplete, long do @LogMessage(level = INFO) @Message(id = ID_OFFSET + 168, value = "Mass indexing complete in %3$s. Indexed %1$d/%2$d entities.") void indexingEntitiesCompleted(long indexed, long total, Duration indexingTime); + + @LogMessage(level = INFO) + @Message(id = ID_OFFSET + 169, + value = "Mass indexing is going to index approx. %1$d entities (%2$s). Actual number may change once the indexing starts.") + void indexingEntitiesApprox(long count, String types); } diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/DefaultMassIndexingMonitor.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/DefaultMassIndexingMonitor.java new file mode 100644 index 00000000000..f05d71d8f33 --- /dev/null +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/DefaultMassIndexingMonitor.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.search.mapper.pojo.massindexing; + +import org.hibernate.search.mapper.pojo.massindexing.impl.PojoMassIndexingLoggingMonitor; +import org.hibernate.search.util.common.annotation.Incubating; + +/** + * A simple builder class that allows configuring the built-in logging mass indexer monitor. + *

+ * To customize the monitor pass it to the mass indexer as: + *

{@code
+ * massIndexer.monitor( DefaultMassIndexingMonitor.builder()
+ *     .countOnStart( true )
+ *     .countOnBeforeType( false )
+ *     .build()
+ * );
+ * }
+ * 
+ */ +@Incubating +public final class DefaultMassIndexingMonitor { + + public static DefaultMassIndexingMonitor builder() { + return new DefaultMassIndexingMonitor(); + } + + private boolean countOnStart = false; + private boolean countOnBeforeType = true; + + private DefaultMassIndexingMonitor() { + } + + public MassIndexingMonitor build() { + return new PojoMassIndexingLoggingMonitor( countOnStart, countOnBeforeType ); + } + + /** + * Allows specifying whether the mass indexer should try obtaining the total number of all entities to index before the indexing even starts. + *

+ * This means that the default monitor will make an attempt to get the counts in the main thread and only then start the indexing. + * Then, at index time, the mass indexer may attempt to recalculate the total for a currently indexed type (see {@link #countOnBeforeType(boolean)}. + *

+ * Defaults to {@code false}. + * @param countOnStart If {@code true}, the mass indexer will try determining the total number of all entities to index + * before the actual indexing starts. + * + * @return {@code this} for method chaining + */ + public DefaultMassIndexingMonitor countOnStart(boolean countOnStart) { + this.countOnStart = countOnStart; + return this; + } + + /** + * Allows specifying whether to try determining the total number of entities of the particular type to index + * and logging that information. + *

+ * This count attempt happens right before fetching the IDs to index, and should provide the + * number of entities to fetch. + *

+ * It may be helpful to skip the counting of entities and start the ID fetching right away to save some time. + *

+ * Defaults to {@code true}. + * @param countOnBeforeType If {@code true}, the mass indexer will try determining the total number of entities, + * otherwise the mass indexer will not try obtaining the total count. + * + * @return {@code this} for method chaining + */ + public DefaultMassIndexingMonitor countOnBeforeType(boolean countOnBeforeType) { + this.countOnBeforeType = countOnBeforeType; + return this; + } +} diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingMonitor.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingMonitor.java index 313ad1ebebd..b7777fe8c64 100644 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingMonitor.java +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingMonitor.java @@ -4,7 +4,7 @@ */ package org.hibernate.search.mapper.pojo.massindexing; -import org.hibernate.search.mapper.pojo.massindexing.impl.NoOpMassIndexingTypeGroupMonitor; +import org.hibernate.search.mapper.pojo.massindexing.impl.LegacyDelegatingMassIndexingTypeGroupMonitor; import org.hibernate.search.util.common.annotation.Incubating; /** @@ -21,9 +21,23 @@ */ public interface MassIndexingMonitor { + /** + * Creates a type-group-specific monitor. + *

+ * The mass indexer may group some of the types it has to index or index them separately. + * The type group represents this combination of types that are retrieved for indexing + * in the same pipeline. + *

+ * The mass indexer will request to create a type group monitor in the main indexing thread + * when initializing the mass indexing environment. + * + * @param context Describes the type group for which the monitor is requested. + * @return A type group mass indexing monitor. By default, a no-op monitor is returned. + * @see MassIndexingTypeGroupMonitor + */ @Incubating - default MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonitorContext context) { - return NoOpMassIndexingTypeGroupMonitor.INSTANCE; + default MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonitorCreateContext context) { + return new LegacyDelegatingMassIndexingTypeGroupMonitor( this, context ); } /** @@ -94,8 +108,16 @@ default MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonit * This method can be invoked from several threads thus implementors are required to be thread-safe. * * @param increment additional number of entities that will be indexed + * @deprecated Use {@link MassIndexingTypeGroupMonitor#indexingStarted(MassIndexingTypeGroupMonitorContext)} + * and get the total count, if available, from the {@link MassIndexingTypeGroupMonitorContext#totalCount()}. + * Alternatively, use the {@link #typeGroupMonitor(MassIndexingTypeGroupMonitorCreateContext)} + * and obtain the count from {@link MassIndexingTypeGroupMonitorCreateContext#totalCount()} if + * a count is needed before any indexing processes are started. */ - void addToTotalCount(long increment); + @Deprecated(forRemoval = true, since = "8.0") + default void addToTotalCount(long increment) { + // do nothing; + } /** * Notify the monitor that indexing is complete. diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitor.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitor.java index 15257c4ddd3..1916226e81d 100644 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitor.java +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitor.java @@ -4,8 +4,6 @@ */ package org.hibernate.search.mapper.pojo.massindexing; -import java.util.OptionalLong; - import org.hibernate.search.util.common.annotation.Incubating; /** @@ -29,22 +27,25 @@ public interface MassIndexingTypeGroupMonitor { *

* This method can be invoked from several threads thus implementors are required to be thread-safe. * - * @param increment additional number of documents built + * @param increment The additional number of documents built and added to the index. */ - void documentsAdded(long increment); + void documentsIndexed(long increment); /** * Notify the monitor that indexing of the type group is starting * and provide the expected number of entities in the group, if known. * - * @param totalCount An optional containing the expected number of entities in the group to index, if known, - * otherwise an empty optional is supplied. + * @param context A context object exposing additional information and operations + * that may be relevant for the implementors of this monitor. */ - void indexingStarted(OptionalLong totalCount); + void indexingStarted(MassIndexingTypeGroupMonitorContext context); /** * Notify the monitor that indexing of the type group is completed. + * + * @param context A context object exposing additional information and operations + * that may be relevant for the implementors of this monitor. */ - void indexingCompleted(); + void indexingCompleted(MassIndexingTypeGroupMonitorContext context); } diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitorContext.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitorContext.java index e67961f0817..f39f457ca10 100644 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitorContext.java +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitorContext.java @@ -4,13 +4,27 @@ */ package org.hibernate.search.mapper.pojo.massindexing; -import java.util.Set; +import java.util.OptionalLong; import org.hibernate.search.util.common.annotation.Incubating; @Incubating public interface MassIndexingTypeGroupMonitorContext { - Set includedTypes(); + /** + * Provides a total count of entities within the type group that should be indexed if obtaining such count is possible. + *

+ * Warning: This operation is not cached and a count from the underlying loading strategy + * will be requested on each call to get the total count. + *

+ * The loaders used to calculate the count provided by this context are reused by the + * indexing process, which means that, in general, the number returned by this context + * should match the number of entities to index. + * + * @return The total count of entities to be indexed within the current type group, or an empty optional + * if the count cannot be determined by the underlying loading strategy, e.g. when the strategy is based on a stream data + * and obtaining count is not possible until all elements of the stream are consumed. + */ + OptionalLong totalCount(); } diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitorCreateContext.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitorCreateContext.java new file mode 100644 index 00000000000..918eb040004 --- /dev/null +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/MassIndexingTypeGroupMonitorCreateContext.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.search.mapper.pojo.massindexing; + +import java.util.OptionalLong; +import java.util.Set; + +import org.hibernate.search.util.common.annotation.Incubating; + +@Incubating +public interface MassIndexingTypeGroupMonitorCreateContext { + + /** + * Describes the entity types included in the type group. + * + * @return The set of entity types included in the type group. + */ + Set includedTypes(); + + /** + * Provides a total count of entities within the type group that should be indexed if obtaining such a count is possible. + *

+ * Warning: This operation is not cached and a count from the underlying loading strategy + * will be requested on each call to get the total count. + *

+ * The loaders used to calculate the count provided by this context are not reused by the + * indexing process, which means that, in general, the number returned by this context + * may not match the number of entities to index once the actual indexing starts. + * This can happen when new entities are added/existing ones are removed before the indexing process starts. + * + * @return The total count of entities to be indexed within the current type group, or an empty optional + * if the count cannot be determined by the underlying loading strategy, e.g. when the strategy is based on a stream data + * and obtaining count is not possible until all elements of the stream are consumed. + */ + OptionalLong totalCount(); +} diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/LegacyDelegatingMassIndexingTypeGroupMonitor.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/LegacyDelegatingMassIndexingTypeGroupMonitor.java new file mode 100644 index 00000000000..ad916ab7589 --- /dev/null +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/LegacyDelegatingMassIndexingTypeGroupMonitor.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.search.mapper.pojo.massindexing.impl; + +import java.util.OptionalLong; + +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorContext; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorCreateContext; +import org.hibernate.search.util.common.annotation.Incubating; + +@Incubating +public final class LegacyDelegatingMassIndexingTypeGroupMonitor implements MassIndexingTypeGroupMonitor { + + private final MassIndexingMonitor delegate; + + public LegacyDelegatingMassIndexingTypeGroupMonitor(MassIndexingMonitor delegate, + MassIndexingTypeGroupMonitorCreateContext context) { + this.delegate = delegate; + } + + @Override + public void documentsIndexed(long increment) { + // do nothing + } + + @SuppressWarnings("removal") + @Override + public void indexingStarted(MassIndexingTypeGroupMonitorContext context) { + OptionalLong count = context.totalCount(); + if ( count.isPresent() ) { + delegate.addToTotalCount( count.getAsLong() ); + } + } + + @Override + public void indexingCompleted(MassIndexingTypeGroupMonitorContext context) { + // do nothing + } +} diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/MassIndexingTypeGroupContext.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/MassIndexingTypeGroupContext.java new file mode 100644 index 00000000000..97043a8a983 --- /dev/null +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/MassIndexingTypeGroupContext.java @@ -0,0 +1,113 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.search.mapper.pojo.massindexing.impl; + +import java.util.List; +import java.util.OptionalLong; +import java.util.Set; +import java.util.stream.Collectors; + +import org.hibernate.search.mapper.pojo.loading.spi.PojoLoadingTypeContext; +import org.hibernate.search.mapper.pojo.loading.spi.PojoMassIdentifierLoader; +import org.hibernate.search.mapper.pojo.loading.spi.PojoMassIdentifierLoadingContext; +import org.hibernate.search.mapper.pojo.loading.spi.PojoMassIdentifierSink; +import org.hibernate.search.mapper.pojo.loading.spi.PojoMassLoadingContext; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingType; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorContext; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorCreateContext; +import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexingContext; +import org.hibernate.search.util.common.AssertionFailure; + +class MassIndexingTypeGroupContext + implements MassIndexingTypeGroupMonitorCreateContext, MassIndexingTypeGroupMonitorContext { + + private final Set includedTypes; + private final PojoMassIndexingIndexedTypeGroup typeGroup; + private final PojoMassIndexingContext massIndexingContext; + private final String tenantId; + + public MassIndexingTypeGroupContext(PojoMassIndexingIndexedTypeGroup typeGroup, + PojoMassIndexingContext massIndexingContext, String tenantId) { + this.includedTypes = typeGroup.includedTypes().stream().map( PojoLoadingTypeContext::entityName ) + .map( MassIndexingTypeImpl::new ) + .collect( Collectors.toSet() ); + this.typeGroup = typeGroup; + this.massIndexingContext = massIndexingContext; + this.tenantId = tenantId; + } + + PojoMassIndexingContext massIndexingContext() { + return massIndexingContext; + } + + @Override + public Set includedTypes() { + return includedTypes; + } + + @Override + public OptionalLong totalCount() { + try ( PojoMassIdentifierLoader loader = createLoader() ) { + return loader.totalCount(); + } + } + + private PojoMassIdentifierLoader createLoader() { + return typeGroup.loadingStrategy().createIdentifierLoader( + typeGroup.includedTypes(), + new DummyIdentifierLoadingContext<>() + ); + } + + public String tenantIdentifier() { + return tenantId; + } + + public MassIndexingTypeGroupMonitorContext withIdentifierLoader(PojoMassIdentifierLoader loader) { + return loader::totalCount; + } + + private static class MassIndexingTypeImpl implements MassIndexingType { + private final String entityName; + + private MassIndexingTypeImpl(String entityName) { + this.entityName = entityName; + } + + @Override + public String entityName() { + return entityName; + } + } + + private class DummyIdentifierLoadingContext implements PojoMassIdentifierLoadingContext { + + @Override + public PojoMassLoadingContext parent() { + return massIndexingContext; + } + + @Override + public PojoMassIdentifierSink createSink() { + // this sink should never be called by Hibernate Search, or anyone else for that matter: + return new PojoMassIdentifierSink<>() { + @Override + public void accept(List batch) throws InterruptedException { + throw new AssertionFailure( "An unexpected call to a sink method." ); + } + + @Override + public void complete() { + throw new AssertionFailure( "An unexpected call to a sink method." ); + } + }; + } + + @Override + public String tenantIdentifier() { + return tenantId; + } + } +} diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/NoOpMassIndexingTypeGroupMonitor.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/NoOpMassIndexingTypeGroupMonitor.java deleted file mode 100644 index 62d21b0623c..00000000000 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/NoOpMassIndexingTypeGroupMonitor.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright Red Hat Inc. and Hibernate Authors - */ -package org.hibernate.search.mapper.pojo.massindexing.impl; - -import java.util.OptionalLong; - -import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor; -import org.hibernate.search.util.common.annotation.Incubating; - -@Incubating -public final class NoOpMassIndexingTypeGroupMonitor implements MassIndexingTypeGroupMonitor { - - public static final NoOpMassIndexingTypeGroupMonitor INSTANCE = new NoOpMassIndexingTypeGroupMonitor(); - - private NoOpMassIndexingTypeGroupMonitor() { - } - - @Override - public void documentsAdded(long increment) { - // do nothing - } - - @Override - public void indexingStarted(OptionalLong totalCount) { - // do nothing - } - - @Override - public void indexingCompleted() { - // do nothing - } -} diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingBatchIndexingWorkspace.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingBatchIndexingWorkspace.java index cca8a20d0bb..4287252c63d 100644 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingBatchIndexingWorkspace.java +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingBatchIndexingWorkspace.java @@ -7,19 +7,14 @@ import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; -import java.util.stream.Collectors; -import org.hibernate.search.mapper.pojo.loading.spi.PojoLoadingTypeContext; import org.hibernate.search.mapper.pojo.loading.spi.PojoMassLoadingStrategy; import org.hibernate.search.mapper.pojo.logging.impl.Log; import org.hibernate.search.mapper.pojo.massindexing.MassIndexingEnvironment; -import org.hibernate.search.mapper.pojo.massindexing.MassIndexingType; import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor; -import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorContext; import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexingContext; import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexingMappingContext; import org.hibernate.search.util.common.AssertionFailure; @@ -45,11 +40,11 @@ public class PojoMassIndexingBatchIndexingWorkspace extends PojoMassIndexi private final PojoMassIndexingMappingContext mappingContext; private final PojoMassIndexingIndexedTypeGroup typeGroup; private final PojoMassLoadingStrategy loadingStrategy; - private final PojoMassIndexingContext massIndexingContext; private final int entityExtractingThreads; private final String tenantId; private final MassIndexingTypeGroupMonitor typeGroupMonitor; + private final MassIndexingTypeGroupContext massIndexingTypeGroupContext; PojoMassIndexingBatchIndexingWorkspace(PojoMassIndexingMappingContext mappingContext, PojoMassIndexingNotifier notifier, @@ -62,10 +57,10 @@ public class PojoMassIndexingBatchIndexingWorkspace extends PojoMassIndexi this.mappingContext = mappingContext; this.typeGroup = typeGroup; this.loadingStrategy = loadingStrategy; - this.massIndexingContext = massIndexingContext; this.entityExtractingThreads = entityExtractingThreads; this.tenantId = tenantId; - this.typeGroupMonitor = notifier.typeGroupMonitor( new MassIndexingTypeGroupMonitorContextImpl( typeGroup ) ); + this.massIndexingTypeGroupContext = new MassIndexingTypeGroupContext<>( typeGroup, massIndexingContext, tenantId ); + this.typeGroupMonitor = notifier.typeGroupMonitor( massIndexingTypeGroupContext ); } @Override @@ -111,9 +106,9 @@ private void startProducingPrimaryKeys(PojoProducerConsumerQueue> identi final Runnable runnable = new PojoMassIndexingEntityIdentifierLoadingRunnable<>( getNotifier(), typeGroupMonitor, - massIndexingContext, getMassIndexingEnvironment(), + massIndexingTypeGroupContext, getMassIndexingEnvironment(), typeGroup, loadingStrategy, - identifierQueue, tenantId + identifierQueue ); //execIdentifiersLoader has size 1 and is not configurable: ensures the list is consistent as produced by one transaction final ThreadPoolExecutor identifierProducingExecutor = mappingContext.threadPoolProvider().newFixedThreadPool( @@ -132,7 +127,7 @@ private void startIndexing(PojoProducerConsumerQueue> identifierQueue) { final Runnable runnable = new PojoMassIndexingEntityLoadingRunnable<>( getNotifier(), typeGroupMonitor, - massIndexingContext, getMassIndexingEnvironment(), + massIndexingTypeGroupContext.massIndexingContext(), getMassIndexingEnvironment(), typeGroup, loadingStrategy, identifierQueue, tenantId ); @@ -145,40 +140,11 @@ massIndexingContext, getMassIndexingEnvironment(), indexingFutures.add( Futures.runAsync( runnable, indexingExecutor ) ); } CompletableFuture.allOf( indexingFutures.toArray( CompletableFuture[]::new ) ) - .thenRun( typeGroupMonitor::indexingCompleted ); + .thenRun( () -> typeGroupMonitor.indexingCompleted( massIndexingTypeGroupContext ) ); } finally { indexingExecutor.shutdown(); } } - private static class MassIndexingTypeGroupMonitorContextImpl implements MassIndexingTypeGroupMonitorContext { - - private final Set includedTypes; - - public MassIndexingTypeGroupMonitorContextImpl(PojoMassIndexingIndexedTypeGroup typeGroup) { - includedTypes = typeGroup.includedTypes().stream().map( PojoLoadingTypeContext::entityName ) - .map( MassIndexingTypeImpl::new ) - .collect( Collectors.toSet() ); - } - - @Override - public Set includedTypes() { - return includedTypes; - } - } - - - private static class MassIndexingTypeImpl implements MassIndexingType { - private final String entityName; - - private MassIndexingTypeImpl(String entityName) { - this.entityName = entityName; - } - - @Override - public String entityName() { - return entityName; - } - } } diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingEntityIdentifierLoadingRunnable.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingEntityIdentifierLoadingRunnable.java index a3da128697f..c429476a9a1 100644 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingEntityIdentifierLoadingRunnable.java +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingEntityIdentifierLoadingRunnable.java @@ -7,7 +7,6 @@ import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; -import java.util.OptionalLong; import org.hibernate.search.mapper.pojo.loading.spi.PojoMassIdentifierLoader; import org.hibernate.search.mapper.pojo.loading.spi.PojoMassIdentifierLoadingContext; @@ -17,7 +16,6 @@ import org.hibernate.search.mapper.pojo.logging.impl.Log; import org.hibernate.search.mapper.pojo.massindexing.MassIndexingEnvironment; import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor; -import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexingContext; import org.hibernate.search.util.common.logging.impl.LoggerFactory; public class PojoMassIndexingEntityIdentifierLoadingRunnable @@ -26,26 +24,24 @@ public class PojoMassIndexingEntityIdentifierLoadingRunnable private static final Log log = LoggerFactory.make( Log.class, MethodHandles.lookup() ); private final MassIndexingTypeGroupMonitor typeGroupMonitor; - private final PojoMassIndexingContext massIndexingContext; + private final MassIndexingTypeGroupContext massIndexingTypeGroupContext; private final PojoMassIndexingIndexedTypeGroup typeGroup; private final PojoMassLoadingStrategy loadingStrategy; private final PojoProducerConsumerQueue> identifierQueue; - private final String tenantId; private final MassIndexingEnvironment.EntityIdentifierLoadingContext identifierLoadingContext; public PojoMassIndexingEntityIdentifierLoadingRunnable(PojoMassIndexingNotifier notifier, MassIndexingTypeGroupMonitor typeGroupMonitor, - PojoMassIndexingContext massIndexingContext, MassIndexingEnvironment environment, + MassIndexingTypeGroupContext massIndexingTypeGroupContext, MassIndexingEnvironment environment, PojoMassIndexingIndexedTypeGroup typeGroup, PojoMassLoadingStrategy loadingStrategy, - PojoProducerConsumerQueue> identifierQueue, String tenantId) { + PojoProducerConsumerQueue> identifierQueue) { super( notifier, environment ); this.typeGroupMonitor = typeGroupMonitor; - this.massIndexingContext = massIndexingContext; + this.massIndexingTypeGroupContext = massIndexingTypeGroupContext; this.loadingStrategy = loadingStrategy; this.typeGroup = typeGroup; this.identifierQueue = identifierQueue; - this.tenantId = tenantId; this.identifierLoadingContext = new EntityIdentifierLoadingContextImpl(); } @@ -55,12 +51,7 @@ protected void runWithFailureHandler() throws InterruptedException { log.trace( "started" ); LoadingContext context = new LoadingContext(); try ( PojoMassIdentifierLoader loader = loadingStrategy.createIdentifierLoader( typeGroup.includedTypes(), context ) ) { - OptionalLong count = loader.totalCount(); - typeGroupMonitor.indexingStarted( count ); - if ( count.isPresent() ) { - getNotifier().reportAddedTotalCount( count.getAsLong() ); - } - + typeGroupMonitor.indexingStarted( massIndexingTypeGroupContext.withIdentifierLoader( loader ) ); do { loader.loadNext(); } @@ -104,7 +95,7 @@ private class LoadingContext implements PojoMassIdentifierLoadingContext { @Override public PojoMassLoadingContext parent() { - return massIndexingContext; + return massIndexingTypeGroupContext.massIndexingContext(); } @Override @@ -126,11 +117,12 @@ public void complete() { @Override public String tenantIdentifier() { - return tenantId; + return massIndexingTypeGroupContext.tenantIdentifier(); } } private static final class EntityIdentifierLoadingContextImpl implements MassIndexingEnvironment.EntityIdentifierLoadingContext { } + } diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingEntityLoadingRunnable.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingEntityLoadingRunnable.java index e0418118cb5..4880574c880 100644 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingEntityLoadingRunnable.java +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingEntityLoadingRunnable.java @@ -223,7 +223,7 @@ private void waitForIndexingEndAndReport() throws InterruptedException { } getNotifier().reportDocumentsAdded( successfulEntities ); - typeGroupMonitor.documentsAdded( successfulEntities ); + typeGroupMonitor.documentsIndexed( successfulEntities ); this.sessionContext = null; this.entities = null; diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingLoggingMonitor.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingLoggingMonitor.java index 0a2fe864288..afe8054a093 100644 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingLoggingMonitor.java +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingLoggingMonitor.java @@ -11,11 +11,14 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.BinaryOperator; +import java.util.stream.Collectors; import org.hibernate.search.mapper.pojo.logging.impl.Log; import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingType; import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor; import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorContext; +import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorCreateContext; import org.hibernate.search.util.common.logging.impl.LoggerFactory; /** @@ -32,6 +35,8 @@ public class PojoMassIndexingLoggingMonitor implements MassIndexingMonitor { private final LongAdder totalCounter = new LongAdder(); private volatile long startTime; private final int logAfterNumberOfDocuments; + private boolean countOnStart; + private boolean countOnBeforeType; private final AtomicLong typesToIndex = new AtomicLong(); private final AtomicLong groupsWithUnknownTotal = new AtomicLong(); @@ -50,13 +55,24 @@ public PojoMassIndexingLoggingMonitor() { * @param logAfterNumberOfDocuments log each time the specified number of documents has been added */ public PojoMassIndexingLoggingMonitor(int logAfterNumberOfDocuments) { + this( logAfterNumberOfDocuments, false, true ); + } + + public PojoMassIndexingLoggingMonitor(boolean countOnStart, boolean countOnBeforeType) { + this( 50, countOnStart, countOnBeforeType ); + } + + public PojoMassIndexingLoggingMonitor(int logAfterNumberOfDocuments, boolean countOnStart, + boolean countOnBeforeType) { this.logAfterNumberOfDocuments = logAfterNumberOfDocuments; + this.countOnStart = countOnStart; + this.countOnBeforeType = countOnBeforeType; } @Override - public MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonitorContext context) { + public MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonitorCreateContext context) { typesToIndex.addAndGet( context.includedTypes().size() ); - return new MassIndexingTypeGroupMonitorImpl(); + return new MassIndexingTypeGroupMonitorImpl( context ); } @Override @@ -96,13 +112,7 @@ public void documentsBuilt(long number) { @Override public void entitiesLoaded(long size) { - //not used - } - @Override - public void addToTotalCount(long count) { - totalCounter.add( count ); - log.indexingEntities( count ); } @Override @@ -185,26 +195,54 @@ public boolean isMoreUpToDateThan(StatusMessageInfo other) { private class MassIndexingTypeGroupMonitorImpl implements MassIndexingTypeGroupMonitor { - private boolean totalUnknown = false; + private final long numberOfTypes; + private final OptionalLong totalBefore; + private boolean totalUnknown = true; + + public MassIndexingTypeGroupMonitorImpl(MassIndexingTypeGroupMonitorCreateContext context) { + this.numberOfTypes = context.includedTypes().size(); + if ( countOnStart ) { + totalBefore = context.totalCount(); + if ( totalBefore.isPresent() ) { + totalUnknown = false; + long count = totalBefore.getAsLong(); + totalCounter.add( count ); + log.indexingEntitiesApprox( count, context.includedTypes().stream().map( MassIndexingType::entityName ) + .collect( Collectors.joining( ", ", "[ ", " ]" ) ) ); + } + } + else { + totalBefore = OptionalLong.empty(); + } + } @Override - public void documentsAdded(long increment) { + public void documentsIndexed(long increment) { if ( totalUnknown ) { totalCounter.add( increment ); } } @Override - public void indexingStarted(OptionalLong totalCount) { - typesToIndex.decrementAndGet(); - if ( totalCount.isEmpty() ) { - groupsWithUnknownTotal.incrementAndGet(); - totalUnknown = true; + public void indexingStarted(MassIndexingTypeGroupMonitorContext context) { + typesToIndex.addAndGet( -numberOfTypes ); + + if ( countOnBeforeType ) { + OptionalLong totalCount = context.totalCount(); + if ( totalCount.isEmpty() ) { + groupsWithUnknownTotal.incrementAndGet(); + } + else { + totalUnknown = false; + long actual = totalCount.getAsLong(); + totalCounter.add( actual - totalBefore.orElse( 0 ) ); + log.indexingEntities( actual ); + } } } @Override - public void indexingCompleted() { + public void indexingCompleted(MassIndexingTypeGroupMonitorContext context) { if ( totalUnknown ) { groupsWithUnknownTotal.decrementAndGet(); } diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingNotifier.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingNotifier.java index fd51711284a..0e64d304c64 100644 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingNotifier.java +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/impl/PojoMassIndexingNotifier.java @@ -21,7 +21,6 @@ import org.hibernate.search.mapper.pojo.massindexing.MassIndexingFailureHandler; import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor; import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor; -import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorContext; import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexingSessionContext; import org.hibernate.search.util.common.logging.impl.LoggerFactory; @@ -50,11 +49,7 @@ public PojoMassIndexingNotifier( .orElseGet( failureHandler::failureFloodingThreshold ); } - void reportAddedTotalCount(long totalCount) { - monitor.addToTotalCount( totalCount ); - } - - public MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonitorContext context) { + MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupContext context) { return monitor.typeGroupMonitor( context ); } diff --git a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/spi/PojoMassIndexer.java b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/spi/PojoMassIndexer.java index 725bf6fe679..8a46d8d1830 100644 --- a/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/spi/PojoMassIndexer.java +++ b/mapper/pojo-base/src/main/java/org/hibernate/search/mapper/pojo/massindexing/spi/PojoMassIndexer.java @@ -165,7 +165,8 @@ public interface PojoMassIndexer { *

* With fail-fast option enabled, the mass indexer will request cancelling all internal mass-indexing processes * right after the first error is reported to the {@link MassIndexingFailureHandler}. - * + *

+ * Defaults to {@code false}. * @param failFast Whether to enabled fail fast option for this mass indexer. * * @return {@code this} for method chaining