diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index b057f087412e..51bd2caa45c3 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -56,7 +56,6 @@ import jakarta.annotation.Nullable; import jakarta.persistence.EntityManager; import jakarta.persistence.LockModeType; -import jakarta.persistence.Query; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; @@ -381,15 +380,10 @@ public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) { return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> { int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError( chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED); - Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId); - - Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed " - + ",myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) " - + "where myId = :chunkId and myErrorCount > :maxCount"); - query.setParameter("chunkId", chunkId); - query.setParameter("failed", WorkChunkStatusEnum.FAILED); - query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT); - int failChangeCount = query.executeUpdate(); + Validate.isTrue(changeCount > 0, "No changed chunk matching %s", chunkId); + + int failChangeCount = myWorkChunkRepository.updateChunkForTooManyErrors( + WorkChunkStatusEnum.FAILED, chunkId, MAX_CHUNK_ERROR_COUNT, ERROR_MSG_MAX_LENGTH); if (failChangeCount > 0) { return WorkChunkStatusEnum.FAILED; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java index c75f82139b69..fedb57c02484 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java @@ -113,6 +113,26 @@ int updateChunkStatusAndIncrementErrorCountForEndError( @Param("em") String theErrorMessage, @Param("status") WorkChunkStatusEnum theInProgress); + /** + * Updates the workchunk error count and error message for WorkChunks that have failed after multiple retries. + * + * @param theStatus - the new status of the workchunk + * @param theChunkId - the id of the workchunk to update + * @param theMaxErrorCount - maximum error count (# of errors allowed for retry) + * @param theMaxErrorSize - max error size (maximum number of characters) + * @return - the number of updated chunks (should be 1) + */ + @Modifying + @Query("UPDATE Batch2WorkChunkEntity e " + + "SET e.myStatus = :failed, " + + "e.myErrorMessage = LEFT(CONCAT('Too many errors (', CAST(e.myErrorCount as string), '). Last err msg ', e.myErrorMessage), :maxErrorSize) " + + "WHERE e.myId = :chunkId and e.myErrorCount > :maxCount") + int updateChunkForTooManyErrors( + @Param("failed") WorkChunkStatusEnum theStatus, + @Param("chunkId") String theChunkId, + @Param("maxCount") int theMaxErrorCount, + @Param("maxErrorSize") int theMaxErrorSize); + @Modifying @Query( "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myStartTime = :st WHERE e.myId = :id AND e.myStatus IN :startStatuses") diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactoryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactoryTest.java index 33d5a8d4bd46..053f392ed02a 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactoryTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactoryTest.java @@ -2,6 +2,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory; +import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -39,7 +40,7 @@ public class SubscriptionChannelFactoryTest { @BeforeEach public void before() { when(myChannelNamer.getChannelName(any(), any())).thenReturn("CHANNEL_NAME"); - mySvc = new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(myChannelNamer)); + mySvc = new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(myChannelNamer, new RetryPolicyProvider())); } /** diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java index 5629a8e29929..ed4faf9853a0 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java @@ -8,6 +8,7 @@ import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory; +import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider; import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig; @@ -73,6 +74,9 @@ public void initSearchParamRegistry(IBaseResource theReadResource) { @Configuration public static class MyConfig { + // would normally be a bean; but this is a test + private RetryPolicyProvider myRetryPolicyProvider = new RetryPolicyProvider(); + @Bean public JpaStorageSettings jpaStorageSettings() { return new JpaStorageSettings(); @@ -85,12 +89,12 @@ public SubscriptionSettings subscriptionSettings() { @Bean public IChannelFactory channelFactory(IChannelNamer theNamer) { - return new LinkedBlockingChannelFactory(theNamer); + return new LinkedBlockingChannelFactory(theNamer, myRetryPolicyProvider); } @Bean public SubscriptionChannelFactory mySubscriptionChannelFactory(IChannelNamer theChannelNamer) { - return new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(theChannelNamer)); + return new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(theChannelNamer, myRetryPolicyProvider)); } @Bean diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestConfig.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestConfig.java index 5a565162f8fa..2fdafa190d9c 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestConfig.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestConfig.java @@ -25,6 +25,7 @@ import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory; +import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider; import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,6 +44,8 @@ public class SubscriptionTestConfig { private FhirContext myFhirContext; @Autowired private IChannelNamer myChannelNamer; + @Autowired + private RetryPolicyProvider myRetryPolicyProvider; @Primary @Bean(name = "myJpaValidationSupportChain") @@ -52,7 +55,7 @@ public IValidationSupport validationSupportChainR4() { @Bean public IChannelFactory subscribableChannelFactory() { - return new LinkedBlockingChannelFactory(myChannelNamer); + return new LinkedBlockingChannelFactory(myChannelNamer, myRetryPolicyProvider); } @Bean diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java index c2ccbf7e5a0a..7e80d1e491eb 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java @@ -1,9 +1,5 @@ package ca.uhn.fhir.jpa.batch2; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertNotNull; import ca.uhn.fhir.batch2.api.ChunkExecutionDetails; import ca.uhn.fhir.batch2.api.IJobCompletionHandler; import ca.uhn.fhir.batch2.api.IJobCoordinator; @@ -25,15 +21,19 @@ import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; import ca.uhn.fhir.batch2.model.StatusEnum; +import ca.uhn.fhir.batch2.model.WorkChunk; +import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel; +import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider; import ca.uhn.fhir.jpa.test.BaseJpaR4Test; import ca.uhn.fhir.jpa.test.Batch2JobHelper; import ca.uhn.fhir.jpa.test.config.Batch2FastSchedulerConfig; import ca.uhn.fhir.jpa.test.config.TestR4Config; +import ca.uhn.fhir.jpa.util.RandomTextUtils; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.test.utilities.UnregisterScheduledProcessor; @@ -51,9 +51,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.data.domain.Page; import org.springframework.data.domain.Sort; import org.springframework.messaging.MessageHandler; +import org.springframework.retry.RetryPolicy; +import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.backoff.NoBackOffPolicy; +import org.springframework.retry.policy.MaxAttemptsRetryPolicy; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.TestPropertySource; import org.testcontainers.shaded.org.awaitility.Awaitility; @@ -73,19 +80,71 @@ import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME; import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT; +import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.fail; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @ContextConfiguration(classes = { - Batch2FastSchedulerConfig.class + Batch2FastSchedulerConfig.class, + Batch2CoordinatorIT.RPConfig.class }) @TestPropertySource(properties = { // These tests require scheduling to work UnregisterScheduledProcessor.SCHEDULING_DISABLED_EQUALS_FALSE }) public class Batch2CoordinatorIT extends BaseJpaR4Test { + + /*** + * Our internal configuration of Retry Mechanism is + * with exponential backoff, and infinite retries. + * + * This isn't ideal for tests; so we will override + * the retry mechanism for tests that require it to + * make them run faster and more 'predictably' + */ + public static class RetryProviderOverride extends RetryPolicyProvider { + + private RetryPolicy myRetryPolicy; + + private BackOffPolicy myBackOffPolicy; + + public void setPolicies(RetryPolicy theRetryPolicy, BackOffPolicy theBackOffPolicy) { + myRetryPolicy = theRetryPolicy; + myBackOffPolicy = theBackOffPolicy; + } + + @Override + protected RetryPolicy retryPolicy() { + if (myRetryPolicy != null) { + return myRetryPolicy; + } + return super.retryPolicy(); + } + + @Override + protected BackOffPolicy backOffPolicy() { + if (myBackOffPolicy != null) { + return myBackOffPolicy; + } + return super.backOffPolicy(); + } + } + + @Configuration + public static class RPConfig { + @Primary + @Bean + public RetryPolicyProvider retryPolicyProvider() { + return new RetryProviderOverride(); + } + } + private static final Logger ourLog = LoggerFactory.getLogger(Batch2CoordinatorIT.class); public static final int TEST_JOB_VERSION = 1; @@ -106,6 +165,9 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test { @Autowired IJobPersistence myJobPersistence; + @Autowired + private RetryPolicyProvider myRetryPolicyProvider; + @RegisterExtension LogbackTestExtension myLogbackTestExtension = new LogbackTestExtension(); @@ -132,6 +194,11 @@ public void before() throws Exception { }; myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings()); myStorageSettings.setJobFastTrackingEnabled(true); + + // reset + if (myRetryPolicyProvider instanceof RetryProviderOverride rp) { + rp.setPolicies(null, null); + } } @AfterEach @@ -589,6 +656,84 @@ private void complete( assertEquals(1.0, jobInstance.getProgress()); } + @Test + public void failingWorkChunks_withLargeErrorMsgs_shouldNotErrorOutTheJob() { + // setup + assertTrue(myRetryPolicyProvider instanceof RetryProviderOverride); + + String jobId = getMethodNameForJobId(); + AtomicInteger counter = new AtomicInteger(); + + // we want an error message larger than can be contained in the db + String errorMsg = RandomTextUtils.newSecureRandomAlphaNumericString(ERROR_MSG_MAX_LENGTH + 100); + + // we want 1 more error than the allowed maximum + // otherwise we won't be updating the error chunk to have + // "Too many errors" error + int errorCount = MAX_CHUNK_ERROR_COUNT + 1; + + MaxAttemptsRetryPolicy retryPolicy = new MaxAttemptsRetryPolicy(); + retryPolicy.setMaxAttempts(errorCount); + RetryProviderOverride overrideRetryProvider = (RetryProviderOverride) myRetryPolicyProvider; + overrideRetryProvider.setPolicies(retryPolicy, new NoBackOffPolicy()); + + // create a job that fails and throws a large error + IJobStepWorker first = (step, sink) -> { + counter.getAndIncrement(); + throw new RuntimeException(errorMsg); + }; + IJobStepWorker last = (step, sink) -> { + // we don't care; we'll never get here + return RunOutcome.SUCCESS; + }; + + JobDefinition jd = JobDefinition.newBuilder() + .setJobDefinitionId(jobId) + .setJobDescription("test job") + .setJobDefinitionVersion(TEST_JOB_VERSION) + .setParametersType(TestJobParameters.class) + .gatedExecution() + .addFirstStep( + FIRST_STEP_ID, + "First step", + FirstStepOutput.class, + first + ) + .addLastStep( + LAST_STEP_ID, + "Final step", + last + ) + .completionHandler(myCompletionHandler) + .build(); + myJobDefinitionRegistry.addJobDefinition(jd); + + // test + JobInstanceStartRequest request = buildRequest(jobId); + Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request); + String instanceId = startResponse.getInstanceId(); + + // waiting for the multitude of failures + await().until(() -> { + myJobMaintenanceService.runMaintenancePass(); + JobInstance instance = myJobCoordinator.getInstance(instanceId); + ourLog.info("Attempt " + counter.get() + " for " + + instance.getInstanceId() + ". Status: " + instance.getStatus()); + return counter.get() > errorCount - 1; + }); + + // verify + Iterator iterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, true); + List listOfChunks = new ArrayList<>(); + iterator.forEachRemaining(listOfChunks::add); + assertEquals(1, listOfChunks.size()); + WorkChunk workChunk = listOfChunks.get(0); + assertEquals(WorkChunkStatusEnum.FAILED, workChunk.getStatus()); + // should contain some of the original error msg, but not all + assertTrue(workChunk.getErrorMessage().contains(errorMsg.substring(0, 100))); + assertTrue(workChunk.getErrorMessage().startsWith("Too many errors")); + } + @Test public void testJobWithLongPollingStep() throws InterruptedException { // create job definition @@ -745,7 +890,7 @@ public void testUnknownException_KeepsInProgress_CanCancelManually() throws Inte callLatch(myFirstStepLatch, step); throw new RuntimeException("Expected Test Exception"); }; - IJobStepWorker lastStep = (step, sink) -> fail(); + IJobStepWorker lastStep = (step, sink) -> fail(); String jobDefId = getMethodNameForJobId(); JobDefinition definition = buildGatedJobDefinition(jobDefId, firstStep, lastStep); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeJobTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeJobTest.java index 7f7ae0736487..34482eb988f2 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeJobTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeJobTest.java @@ -10,6 +10,7 @@ import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.test.BaseJpaR4Test; import ca.uhn.fhir.jpa.test.Batch2JobHelper; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.DiagnosticReport; @@ -79,7 +80,7 @@ public void testDeleteExpunge() { startRequest.setJobDefinitionId(DeleteExpungeAppCtx.JOB_DELETE_EXPUNGE); // execute - Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest); + Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest); myBatch2JobHelper.awaitJobCompletion(startResponse); // validate diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/config/SubscriptionChannelConfig.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/config/SubscriptionChannelConfig.java index c63b5b1e4f12..f5372247344f 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/config/SubscriptionChannelConfig.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/config/SubscriptionChannelConfig.java @@ -21,20 +21,34 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory; +import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider; import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class SubscriptionChannelConfig { + /** + * We are autowiring this because we need to override retry policy + * in some tests + */ + @Autowired + private RetryPolicyProvider myRetryPolicyProvider; + /** * Create a @Primary @Bean if you need a different implementation */ @Bean public IChannelFactory queueChannelFactory(IChannelNamer theChannelNamer) { - return new LinkedBlockingChannelFactory(theChannelNamer); + return new LinkedBlockingChannelFactory(theChannelNamer, myRetryPolicyProvider); + } + + @Bean + public RetryPolicyProvider retryPolicyProvider() { + return new RetryPolicyProvider(); } @Bean diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannel.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannel.java index 856ed30560c7..89f4ab9c3a55 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannel.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannel.java @@ -37,10 +37,17 @@ public class LinkedBlockingChannel extends ExecutorSubscribableChannel implement private final String myName; private final Supplier myQueueSizeSupplier; - public LinkedBlockingChannel(String theName, Executor theExecutor, Supplier theQueueSizeSupplier) { + private final RetryPolicyProvider myRetryPolicyProvider; + + public LinkedBlockingChannel( + String theName, + Executor theExecutor, + Supplier theQueueSizeSupplier, + RetryPolicyProvider theRetryPolicyProvider) { super(theExecutor); myName = theName; myQueueSizeSupplier = theQueueSizeSupplier; + myRetryPolicyProvider = theRetryPolicyProvider; } public int getQueueSizeForUnitTest() { @@ -65,7 +72,7 @@ public boolean hasSubscription(@Nonnull MessageHandler handler) { @Override public boolean subscribe(@Nonnull MessageHandler theHandler) { - return super.subscribe(new RetryingMessageHandlerWrapper(theHandler, getName())); + return super.subscribe(new RetryingMessageHandlerWrapper(theHandler, getName(), myRetryPolicyProvider)); } @Override @@ -86,7 +93,7 @@ public void destroy() { /** * Creates a synchronous channel, mostly intended for testing */ - public static LinkedBlockingChannel newSynchronous(String theName) { - return new LinkedBlockingChannel(theName, null, () -> 0); + public static LinkedBlockingChannel newSynchronous(String theName, RetryPolicyProvider theRetryPolicyProvider) { + return new LinkedBlockingChannel(theName, null, () -> 0, theRetryPolicyProvider); } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactory.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactory.java index d94f70dbf603..e04adf38f2c6 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactory.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactory.java @@ -41,8 +41,11 @@ public class LinkedBlockingChannelFactory implements IChannelFactory { private final IChannelNamer myChannelNamer; private final Map myChannels = Collections.synchronizedMap(new HashMap<>()); - public LinkedBlockingChannelFactory(IChannelNamer theChannelNamer) { + private RetryPolicyProvider myRetryPolicyProvider; + + public LinkedBlockingChannelFactory(IChannelNamer theChannelNamer, RetryPolicyProvider theRetryPolicyProvider) { myChannelNamer = theChannelNamer; + myRetryPolicyProvider = theRetryPolicyProvider; } @Override @@ -80,7 +83,8 @@ private LinkedBlockingChannel buildLinkedBlockingChannel(int theConcurrentConsum threadNamePrefix, SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE); - return new LinkedBlockingChannel(theChannelName, threadPoolExecutor, threadPoolExecutor::getQueueSize); + return new LinkedBlockingChannel( + theChannelName, threadPoolExecutor, threadPoolExecutor::getQueueSize, myRetryPolicyProvider); } @PreDestroy diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/RetryPolicyProvider.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/RetryPolicyProvider.java new file mode 100644 index 000000000000..472b630784a0 --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/RetryPolicyProvider.java @@ -0,0 +1,33 @@ +package ca.uhn.fhir.jpa.subscription.channel.impl; + +import org.apache.commons.lang3.time.DateUtils; +import org.springframework.retry.RetryPolicy; +import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.policy.TimeoutRetryPolicy; +import org.springframework.retry.support.RetryTemplate; + +public class RetryPolicyProvider { + + public RetryTemplate getRetryTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setBackOffPolicy(backOffPolicy()); + retryTemplate.setRetryPolicy(retryPolicy()); + retryTemplate.setThrowLastExceptionOnExhausted(true); + + return retryTemplate; + } + + protected RetryPolicy retryPolicy() { + TimeoutRetryPolicy retryPolicy = new TimeoutRetryPolicy(); + retryPolicy.setTimeout(DateUtils.MILLIS_PER_MINUTE); + return retryPolicy; + } + + protected BackOffPolicy backOffPolicy() { + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(1000); + backOffPolicy.setMultiplier(1.1d); + return backOffPolicy; + } +} diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/RetryingMessageHandlerWrapper.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/RetryingMessageHandlerWrapper.java index 46fd0c3c85eb..24c191698ec0 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/RetryingMessageHandlerWrapper.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/RetryingMessageHandlerWrapper.java @@ -22,7 +22,6 @@ import ca.uhn.fhir.util.BaseUnrecoverableRuntimeException; import jakarta.annotation.Nonnull; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.commons.lang3.time.DateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; @@ -41,23 +40,18 @@ class RetryingMessageHandlerWrapper implements MessageHandler { private final MessageHandler myWrap; private final String myChannelName; - RetryingMessageHandlerWrapper(MessageHandler theWrap, String theChannelName) { + private RetryPolicyProvider myRetryPolicyProvider; + + RetryingMessageHandlerWrapper( + MessageHandler theWrap, String theChannelName, RetryPolicyProvider theRetryPolicyProvider) { myWrap = theWrap; myChannelName = theChannelName; + myRetryPolicyProvider = theRetryPolicyProvider; } @Override public void handleMessage(@Nonnull Message theMessage) throws MessagingException { - RetryTemplate retryTemplate = new RetryTemplate(); - final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); - backOffPolicy.setInitialInterval(1000); - backOffPolicy.setMultiplier(1.1d); - retryTemplate.setBackOffPolicy(backOffPolicy); - - final TimeoutRetryPolicy retryPolicy = new TimeoutRetryPolicy(); - retryPolicy.setTimeout(DateUtils.MILLIS_PER_MINUTE); - retryTemplate.setRetryPolicy(retryPolicy); - retryTemplate.setThrowLastExceptionOnExhausted(true); + RetryTemplate retryTemplate = myRetryPolicyProvider.getRetryTemplate(); RetryListener retryListener = new RetryListener() { @Override public void onError( diff --git a/hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactoryTest.java b/hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactoryTest.java index 8968125ef42e..6f8050173149 100644 --- a/hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactoryTest.java +++ b/hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactoryTest.java @@ -28,7 +28,7 @@ class LinkedBlockingChannelFactoryTest { private static final String TEST_CHANNEL_NAME = "test-channel-name"; private static final String TEST_PAYLOAD = "payload"; - LinkedBlockingChannelFactory myChannelFactory = new LinkedBlockingChannelFactory((name, settings) -> name); + LinkedBlockingChannelFactory myChannelFactory = new LinkedBlockingChannelFactory((name, settings) -> name, new RetryPolicyProvider()); private List myReceivedPayloads; private PointcutLatch[] myHandlerCanProceedLatch = { new PointcutLatch("first delivery"),