Skip to content

Commit

Permalink
HSEARCH-4487 Simplify Jakarta Batch test utils
Browse files Browse the repository at this point in the history
  • Loading branch information
yrodiere committed Oct 2, 2023
1 parent b15cb10 commit da756ee
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@
*/
package org.hibernate.search.integrationtest.jakarta.batch.massindexing;

import static org.hibernate.search.integrationtest.jakarta.batch.util.JobTestUtil.JOB_TIMEOUT_MS;
import static org.hibernate.search.util.impl.integrationtest.mapper.orm.OrmUtils.with;
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;

import jakarta.batch.operations.JobOperator;
import jakarta.batch.runtime.JobExecution;
import jakarta.persistence.EntityManagerFactory;
import jakarta.persistence.Persistence;

Expand Down Expand Up @@ -49,12 +46,10 @@ public class EntityManagerFactoryRetrievalIT {

private static final String SESSION_FACTORY_NAME = "primary_session_factory";

protected JobOperator jobOperator;
protected EntityManagerFactory emf;

@Before
public void setup() {
jobOperator = JobTestUtil.getAndCheckRuntime();
List<Company> companies = new ArrayList<>();
List<Person> people = new ArrayList<>();
List<WhoAmI> whos = new ArrayList<>();
Expand Down Expand Up @@ -97,16 +92,13 @@ public void defaultNamespace() throws Exception {
List<Company> companies = JobTestUtil.findIndexedResults( emf, Company.class, "name", "Google" );
assertEquals( 0, companies.size() );

long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntity( Company.class )
.checkpointInterval( CHECKPOINT_INTERVAL )
.entityManagerFactoryReference( getPersistenceUnitName() )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );

companies = JobTestUtil.findIndexedResults( emf, Company.class, "name", "Google" );
assertEquals( INSTANCES_PER_DATA_TEMPLATE, companies.size() );
Expand All @@ -117,17 +109,14 @@ public void persistenceUnitNamespace() throws Exception {
List<Company> companies = JobTestUtil.findIndexedResults( emf, Company.class, "name", "Google" );
assertEquals( 0, companies.size() );

long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntity( Company.class )
.checkpointInterval( CHECKPOINT_INTERVAL )
.entityManagerFactoryNamespace( "persistence-unit-name" )
.entityManagerFactoryReference( getPersistenceUnitName() )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );

companies = JobTestUtil.findIndexedResults( emf, Company.class, "name", "Google" );
assertEquals( INSTANCES_PER_DATA_TEMPLATE, companies.size() );
Expand All @@ -138,17 +127,14 @@ public void sessionFactoryNamespace() throws Exception {
List<Company> companies = JobTestUtil.findIndexedResults( emf, Company.class, "name", "Google" );
assertEquals( 0, companies.size() );

long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntity( Company.class )
.checkpointInterval( CHECKPOINT_INTERVAL )
.entityManagerFactoryNamespace( "session-factory-name" )
.entityManagerFactoryReference( SESSION_FACTORY_NAME )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );

companies = JobTestUtil.findIndexedResults( emf, Company.class, "name", "Google" );
assertEquals( INSTANCES_PER_DATA_TEMPLATE, companies.size() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.hibernate.search.integrationtest.jakarta.batch.util.JobTestUtil.JOB_TIMEOUT_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import jakarta.batch.operations.JobOperator;
import jakarta.batch.runtime.BatchStatus;
import jakarta.batch.runtime.JobExecution;
import jakarta.batch.runtime.StepExecution;
import jakarta.persistence.EntityManagerFactory;
Expand All @@ -36,15 +32,13 @@
import org.hibernate.search.integrationtest.jakarta.batch.massindexing.entity.WhoAmI;
import org.hibernate.search.integrationtest.jakarta.batch.util.BackendConfigurations;
import org.hibernate.search.integrationtest.jakarta.batch.util.JobTestUtil;
import org.hibernate.search.jakarta.batch.core.logging.impl.Log;
import org.hibernate.search.jakarta.batch.core.massindexing.MassIndexingJob;
import org.hibernate.search.jakarta.batch.core.massindexing.step.impl.StepProgress;
import org.hibernate.search.mapper.orm.Search;
import org.hibernate.search.mapper.orm.cfg.HibernateOrmMapperSettings;
import org.hibernate.search.mapper.orm.session.SearchSession;
import org.hibernate.search.mapper.orm.work.SearchIndexingPlan;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;
import org.hibernate.search.util.impl.integrationtest.mapper.orm.OrmSetupHelper;
import org.hibernate.search.util.impl.integrationtest.mapper.orm.ReusableOrmSetupHolder;
import org.hibernate.search.util.impl.test.annotation.TestForIssue;
Expand All @@ -60,8 +54,6 @@
*/
public class MassIndexingJobIT {

private static final Log log = LoggerFactory.make( Log.class, MethodHandles.lookup() );

protected static final int INSTANCES_PER_DATA_TEMPLATE = 100;

// We have three data templates per entity type (see setup)
Expand All @@ -82,7 +74,6 @@ public class MassIndexingJobIT {
public MethodRule setupHolderMethodRule = setupHolder.methodRule();

private EntityManagerFactory emf;
private JobOperator jobOperator;

@ReusableOrmSetupHolder.Setup
public void setup(OrmSetupHelper.SetupContext setupContext, ReusableOrmSetupHolder.DataClearConfig dataClearConfig) {
Expand All @@ -95,7 +86,6 @@ public void setup(OrmSetupHelper.SetupContext setupContext, ReusableOrmSetupHold
@Before
public void initData() {
emf = setupHolder.entityManagerFactory();
jobOperator = JobTestUtil.getAndCheckRuntime();
List<Company> companies = new ArrayList<>();
List<Person> people = new ArrayList<>();
List<WhoAmI> whos = new ArrayList<>();
Expand Down Expand Up @@ -145,19 +135,15 @@ public void simple()
assertEquals( 0, people.size() );
assertEquals( 0, whos.size() );

long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobExecution execution = JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntities( Company.class, Person.class, WhoAmI.class )
.checkpointInterval( CHECKPOINT_INTERVAL )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );
assertCompletion( executionId );
assertProgress( executionId, Person.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( executionId, Company.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( executionId, WhoAmI.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( execution, Person.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( execution, Company.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( execution, WhoAmI.class, INSTANCE_PER_ENTITY_TYPE );

companies = JobTestUtil.findIndexedResults( emf, Company.class, "name", "Google" );
people = JobTestUtil.findIndexedResults( emf, Person.class, "firstName", "Linus" );
Expand All @@ -178,18 +164,14 @@ public void simple_defaultCheckpointInterval()
assertEquals( 0, people.size() );
assertEquals( 0, whos.size() );

long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobExecution execution = JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntities( Company.class, Person.class, WhoAmI.class )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );
assertCompletion( executionId );
assertProgress( executionId, Person.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( executionId, Company.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( executionId, WhoAmI.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( execution, Person.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( execution, Company.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( execution, WhoAmI.class, INSTANCE_PER_ENTITY_TYPE );

companies = JobTestUtil.findIndexedResults( emf, Company.class, "name", "Google" );
people = JobTestUtil.findIndexedResults( emf, Person.class, "firstName", "Linus" );
Expand All @@ -212,17 +194,13 @@ public void indexedEmbeddedCollection() throws InterruptedException {
assertEquals( 0, groupsContainingRedHat.size() );
assertEquals( 0, groupsContainingMicrosoft.size() );

long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobExecution execution = JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntities( CompanyGroup.class )
.checkpointInterval( CHECKPOINT_INTERVAL )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );
assertCompletion( executionId );
assertProgress( executionId, CompanyGroup.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( execution, CompanyGroup.class, INSTANCE_PER_ENTITY_TYPE );

groupsContainingGoogle = JobTestUtil.findIndexedResults( emf, CompanyGroup.class, "companies.name", "Google" );
groupsContainingRedHat = JobTestUtil.findIndexedResults( emf, CompanyGroup.class, "companies.name", "Red Hat" );
Expand Down Expand Up @@ -251,19 +229,15 @@ public void indexedEmbeddedCollection_idFetchSize_entityFetchSize_mysql() throws
assertEquals( 0, groupsContainingRedHat.size() );
assertEquals( 0, groupsContainingMicrosoft.size() );

long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobExecution execution = JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntities( CompanyGroup.class )
.checkpointInterval( CHECKPOINT_INTERVAL )
// For MySQL, this is the only way to get proper scrolling
.idFetchSize( Integer.MIN_VALUE )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );
assertCompletion( executionId );
assertProgress( executionId, CompanyGroup.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( execution, CompanyGroup.class, INSTANCE_PER_ENTITY_TYPE );

groupsContainingGoogle = JobTestUtil.findIndexedResults( emf, CompanyGroup.class, "companies.name", "Google" );
groupsContainingRedHat = JobTestUtil.findIndexedResults( emf, CompanyGroup.class, "companies.name", "Red Hat" );
Expand All @@ -285,16 +259,13 @@ public void purge() throws InterruptedException, IOException {
* Request a mass indexing with a filter matching nothing,
* which should effectively amount to a simple purge.
*/
long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntity( Company.class )
.purgeAllOnStart( true )
.reindexOnly( "name like :name", Map.of( "name", "NEVER_MATCH" ) )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );

assertEquals( 0, JobTestUtil.nbDocumentsInIndex( emf, Company.class ) );
}
Expand All @@ -311,16 +282,13 @@ public void noPurge() throws InterruptedException, IOException {
* Request a mass indexing with a filter matching nothing, and requesting no purge at all,
* which should effectively amount to a no-op.
*/
long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntity( Company.class )
.purgeAllOnStart( false )
.reindexOnly( "name like :name", Map.of( "name", "NEVER_MATCH" ) )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );

assertEquals( expectedCount, JobTestUtil.nbDocumentsInIndex( emf, Company.class ) );
}
Expand All @@ -335,16 +303,13 @@ public void reindexOnly()
assertEquals( 0, JobTestUtil.findIndexedResults( emf, Company.class, "name", "Red Hat" ).size() );
assertEquals( 0, JobTestUtil.findIndexedResults( emf, Company.class, "name", "Microsoft" ).size() );

long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntity( Company.class )
.checkpointInterval( CHECKPOINT_INTERVAL )
.reindexOnly( "name like 'Google%' or name like 'Red Hat%'", Map.of() )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );

assertEquals( INSTANCES_PER_DATA_TEMPLATE,
JobTestUtil.findIndexedResults( emf, Company.class, "name", "Google" ).size() );
Expand All @@ -363,17 +328,14 @@ public void reindexOnly_maxResults()

int maxResults = CHECKPOINT_INTERVAL + 1;

long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntity( Company.class )
.checkpointInterval( CHECKPOINT_INTERVAL )
.reindexOnly( "name like 'Google%' or name like 'Red Hat%'", Map.of() )
.maxResultsPerEntity( maxResults )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );

assertEquals( maxResults, JobTestUtil.nbDocumentsInIndex( emf, Company.class ) );
}
Expand All @@ -389,22 +351,18 @@ public void partitioned()
assertEquals( 0, people.size() );
assertEquals( 0, whos.size() );

long executionId = jobOperator.start(
MassIndexingJob.NAME,
JobExecution execution = JobTestUtil.startJobAndWaitForSuccessNoRetry(
MassIndexingJob.parameters()
.forEntities( Company.class, Person.class, WhoAmI.class )
.checkpointInterval( CHECKPOINT_INTERVAL )
.rowsPerPartition( INSTANCE_PER_ENTITY_TYPE - 1 )
.build()
);
JobExecution jobExecution = jobOperator.getJobExecution( executionId );
JobTestUtil.waitForTermination( jobOperator, jobExecution, JOB_TIMEOUT_MS );
assertCompletion( executionId );
assertProgress( executionId, Person.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( executionId, Company.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( executionId, WhoAmI.class, INSTANCE_PER_ENTITY_TYPE );

StepProgress progress = getMainStepProgress( executionId );
assertProgress( execution, Person.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( execution, Company.class, INSTANCE_PER_ENTITY_TYPE );
assertProgress( execution, WhoAmI.class, INSTANCE_PER_ENTITY_TYPE );

StepProgress progress = getMainStepProgress( execution );
Map<Integer, Long> partitionProgress = progress.getPartitionProgress();
assertThat( partitionProgress )
.as( "Entities processed per partition" )
Expand All @@ -429,29 +387,21 @@ public void partitioned()
assertEquals( INSTANCES_PER_DATA_TEMPLATE, whos.size() );
}

private void assertCompletion(long executionId) {
List<StepExecution> stepExecutions = jobOperator.getStepExecutions( executionId );
for ( StepExecution stepExecution : stepExecutions ) {
BatchStatus batchStatus = stepExecution.getBatchStatus();
log.infof( "step %s executed.", stepExecution.getStepName() );
assertEquals( BatchStatus.COMPLETED, batchStatus );
}
}

private void assertProgress(long executionId, Class<?> entityType, int progressValue) {
private void assertProgress(JobExecution execution, Class<?> entityType, int progressValue) {
/*
* We cannot check the metrics, which in JBatch are set to 0
* for partitioned steps (the metrics are handled separately for
* each partition).
* Thus we check our own object.
*/
StepProgress progress = getMainStepProgress( executionId );
StepProgress progress = getMainStepProgress( execution );
assertEquals( Long.valueOf( progressValue ),
progress.getEntityProgress().get( emf.getMetamodel().entity( entityType ).getName() ) );
}

private StepProgress getMainStepProgress(long executionId) {
List<StepExecution> stepExecutions = jobOperator.getStepExecutions( executionId );
private StepProgress getMainStepProgress(JobExecution execution) {
List<StepExecution> stepExecutions = JobTestUtil.getOperator()
.getStepExecutions( execution.getExecutionId() );
for ( StepExecution stepExecution : stepExecutions ) {
switch ( stepExecution.getStepName() ) {
case MAIN_STEP_NAME:
Expand Down
Loading

0 comments on commit da756ee

Please sign in to comment.