Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

6716 fixing error msg update code #6764

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityManager;
import jakarta.persistence.LockModeType;
import jakarta.persistence.Query;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
Expand Down Expand Up @@ -381,15 +380,10 @@ public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId);

Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed "
+ ",myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) "
+ "where myId = :chunkId and myErrorCount > :maxCount");
query.setParameter("chunkId", chunkId);
query.setParameter("failed", WorkChunkStatusEnum.FAILED);
query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT);
int failChangeCount = query.executeUpdate();
Validate.isTrue(changeCount > 0, "No changed chunk matching %s", chunkId);

int failChangeCount = myWorkChunkRepository.updateChunkForTooManyErrors(
WorkChunkStatusEnum.FAILED, chunkId, MAX_CHUNK_ERROR_COUNT, ERROR_MSG_MAX_LENGTH);

if (failChangeCount > 0) {
return WorkChunkStatusEnum.FAILED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,26 @@ int updateChunkStatusAndIncrementErrorCountForEndError(
@Param("em") String theErrorMessage,
@Param("status") WorkChunkStatusEnum theInProgress);

/**
* Updates the workchunk error count and error message for WorkChunks that have failed after multiple retries.
*
* @param theStatus - the new status of the workchunk
* @param theChunkId - the id of the workchunk to update
* @param theMaxErrorCount - maximum error count (# of errors allowed for retry)
* @param theMaxErrorSize - max error size (maximum number of characters)
* @return - the number of updated chunks (should be 1)
*/
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e "
+ "SET e.myStatus = :failed, "
+ "e.myErrorMessage = LEFT(CONCAT('Too many errors (', CAST(e.myErrorCount as string), '). Last err msg ', e.myErrorMessage), :maxErrorSize) "
+ "WHERE e.myId = :chunkId and e.myErrorCount > :maxCount")
int updateChunkForTooManyErrors(
@Param("failed") WorkChunkStatusEnum theStatus,
@Param("chunkId") String theChunkId,
@Param("maxCount") int theMaxErrorCount,
@Param("maxErrorSize") int theMaxErrorSize);

@Modifying
@Query(
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myStartTime = :st WHERE e.myId = :id AND e.myStatus IN :startStatuses")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -39,7 +40,7 @@ public class SubscriptionChannelFactoryTest {
@BeforeEach
public void before() {
when(myChannelNamer.getChannelName(any(), any())).thenReturn("CHANNEL_NAME");
mySvc = new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(myChannelNamer));
mySvc = new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(myChannelNamer, new RetryPolicyProvider()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider;
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
Expand Down Expand Up @@ -73,6 +74,9 @@ public void initSearchParamRegistry(IBaseResource theReadResource) {
@Configuration
public static class MyConfig {

// would normally be a bean; but this is a test
private RetryPolicyProvider myRetryPolicyProvider = new RetryPolicyProvider();

@Bean
public JpaStorageSettings jpaStorageSettings() {
return new JpaStorageSettings();
Expand All @@ -85,12 +89,12 @@ public SubscriptionSettings subscriptionSettings() {

@Bean
public IChannelFactory channelFactory(IChannelNamer theNamer) {
return new LinkedBlockingChannelFactory(theNamer);
return new LinkedBlockingChannelFactory(theNamer, myRetryPolicyProvider);
}

@Bean
public SubscriptionChannelFactory mySubscriptionChannelFactory(IChannelNamer theChannelNamer) {
return new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(theChannelNamer));
return new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(theChannelNamer, myRetryPolicyProvider));
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider;
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -43,6 +44,8 @@ public class SubscriptionTestConfig {
private FhirContext myFhirContext;
@Autowired
private IChannelNamer myChannelNamer;
@Autowired
private RetryPolicyProvider myRetryPolicyProvider;

@Primary
@Bean(name = "myJpaValidationSupportChain")
Expand All @@ -52,7 +55,7 @@ public IValidationSupport validationSupportChainR4() {

@Bean
public IChannelFactory subscribableChannelFactory() {
return new LinkedBlockingChannelFactory(myChannelNamer);
return new LinkedBlockingChannelFactory(myChannelNamer, myRetryPolicyProvider);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package ca.uhn.fhir.jpa.batch2;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
Expand All @@ -25,15 +21,19 @@
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.jpa.test.config.Batch2FastSchedulerConfig;
import ca.uhn.fhir.jpa.test.config.TestR4Config;
import ca.uhn.fhir.jpa.util.RandomTextUtils;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.test.utilities.UnregisterScheduledProcessor;
Expand All @@ -51,9 +51,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Sort;
import org.springframework.messaging.MessageHandler;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.NoBackOffPolicy;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.testcontainers.shaded.org.awaitility.Awaitility;
Expand All @@ -73,19 +80,71 @@

import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME;
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;


@ContextConfiguration(classes = {
Batch2FastSchedulerConfig.class
Batch2FastSchedulerConfig.class,
Batch2CoordinatorIT.RPConfig.class
})
@TestPropertySource(properties = {
// These tests require scheduling to work
UnregisterScheduledProcessor.SCHEDULING_DISABLED_EQUALS_FALSE
})
public class Batch2CoordinatorIT extends BaseJpaR4Test {

/***
* Our internal configuration of Retry Mechanism is
* with exponential backoff, and infinite retries.
*
* This isn't ideal for tests; so we will override
* the retry mechanism for tests that require it to
* make them run faster and more 'predictably'
*/
public static class RetryProviderOverride extends RetryPolicyProvider {

private RetryPolicy myRetryPolicy;

private BackOffPolicy myBackOffPolicy;

public void setPolicies(RetryPolicy theRetryPolicy, BackOffPolicy theBackOffPolicy) {
myRetryPolicy = theRetryPolicy;
myBackOffPolicy = theBackOffPolicy;
}

@Override
protected RetryPolicy retryPolicy() {
if (myRetryPolicy != null) {
return myRetryPolicy;
}
return super.retryPolicy();
}

@Override
protected BackOffPolicy backOffPolicy() {
if (myBackOffPolicy != null) {
return myBackOffPolicy;
}
return super.backOffPolicy();
}
}

@Configuration
public static class RPConfig {
@Primary
@Bean
public RetryPolicyProvider retryPolicyProvider() {
return new RetryProviderOverride();
}
}

private static final Logger ourLog = LoggerFactory.getLogger(Batch2CoordinatorIT.class);

public static final int TEST_JOB_VERSION = 1;
Expand All @@ -106,6 +165,9 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
@Autowired
IJobPersistence myJobPersistence;

@Autowired
private RetryPolicyProvider myRetryPolicyProvider;

@RegisterExtension
LogbackTestExtension myLogbackTestExtension = new LogbackTestExtension();

Expand All @@ -132,6 +194,11 @@ public void before() throws Exception {
};
myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings());
myStorageSettings.setJobFastTrackingEnabled(true);

// reset
if (myRetryPolicyProvider instanceof RetryProviderOverride rp) {
rp.setPolicies(null, null);
}
}

@AfterEach
Expand Down Expand Up @@ -589,6 +656,84 @@ private void complete(
assertEquals(1.0, jobInstance.getProgress());
}

@Test
public void failingWorkChunks_withLargeErrorMsgs_shouldNotErrorOutTheJob() {
// setup
assertTrue(myRetryPolicyProvider instanceof RetryProviderOverride);

String jobId = getMethodNameForJobId();
AtomicInteger counter = new AtomicInteger();

// we want an error message larger than can be contained in the db
String errorMsg = RandomTextUtils.newSecureRandomAlphaNumericString(ERROR_MSG_MAX_LENGTH + 100);

// we want 1 more error than the allowed maximum
// otherwise we won't be updating the error chunk to have
// "Too many errors" error
int errorCount = MAX_CHUNK_ERROR_COUNT + 1;

MaxAttemptsRetryPolicy retryPolicy = new MaxAttemptsRetryPolicy();
retryPolicy.setMaxAttempts(errorCount);
RetryProviderOverride overrideRetryProvider = (RetryProviderOverride) myRetryPolicyProvider;
overrideRetryProvider.setPolicies(retryPolicy, new NoBackOffPolicy());

// create a job that fails and throws a large error
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> first = (step, sink) -> {
counter.getAndIncrement();
throw new RuntimeException(errorMsg);
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> last = (step, sink) -> {
// we don't care; we'll never get here
return RunOutcome.SUCCESS;
};

JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(jobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"First step",
FirstStepOutput.class,
first
)
.addLastStep(
LAST_STEP_ID,
"Final step",
last
)
.completionHandler(myCompletionHandler)
.build();
myJobDefinitionRegistry.addJobDefinition(jd);

// test
JobInstanceStartRequest request = buildRequest(jobId);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
String instanceId = startResponse.getInstanceId();

// waiting for the multitude of failures
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
JobInstance instance = myJobCoordinator.getInstance(instanceId);
ourLog.info("Attempt " + counter.get() + " for "
+ instance.getInstanceId() + ". Status: " + instance.getStatus());
return counter.get() > errorCount - 1;
});

// verify
Iterator<WorkChunk> iterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, true);
List<WorkChunk> listOfChunks = new ArrayList<>();
iterator.forEachRemaining(listOfChunks::add);
assertEquals(1, listOfChunks.size());
WorkChunk workChunk = listOfChunks.get(0);
assertEquals(WorkChunkStatusEnum.FAILED, workChunk.getStatus());
// should contain some of the original error msg, but not all
assertTrue(workChunk.getErrorMessage().contains(errorMsg.substring(0, 100)));
assertTrue(workChunk.getErrorMessage().startsWith("Too many errors"));
}

@Test
public void testJobWithLongPollingStep() throws InterruptedException {
// create job definition
Expand Down Expand Up @@ -745,7 +890,7 @@ public void testUnknownException_KeepsInProgress_CanCancelManually() throws Inte
callLatch(myFirstStepLatch, step);
throw new RuntimeException("Expected Test Exception");
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();

String jobDefId = getMethodNameForJobId();
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobDefId, firstStep, lastStep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.DiagnosticReport;
Expand Down Expand Up @@ -79,7 +80,7 @@ public void testDeleteExpunge() {
startRequest.setJobDefinitionId(DeleteExpungeAppCtx.JOB_DELETE_EXPUNGE);

// execute
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
myBatch2JobHelper.awaitJobCompletion(startResponse);

// validate
Expand Down
Loading
Loading