Skip to content

Commit

Permalink
add functionality to retry an InvalidArgumentException (awslabs#1270)
Browse files Browse the repository at this point in the history
* add functionality to retry an InvalidArgumentException with a new iterator

* include DEFAULT_MAX_RECORDS value in IllegalArgumentException messages
  • Loading branch information
vincentvilo-aws authored Mar 8, 2024
1 parent 63e0fe7 commit 1280325
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,10 @@ public KinesisClientLibConfiguration withShardSyncIntervalMillis(long shardSyncI
*/
public KinesisClientLibConfiguration withMaxRecords(int maxRecords) {
checkIsValuePositive("MaxRecords", (long) maxRecords);
if (maxRecords > DEFAULT_MAX_RECORDS) {
throw new IllegalArgumentException(
"maxRecords must be less than or equal to " + DEFAULT_MAX_RECORDS + " but current value is " + maxRecords);
}
this.maxRecords = maxRecords;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class PollingConfig implements RetrievalSpecificConfig {

public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30);

public static final int DEFAULT_MAX_RECORDS = 10000;

/**
* Configurable functional interface to override the existing DataFetcher.
*/
Expand Down Expand Up @@ -73,7 +75,7 @@ public PollingConfig(KinesisAsyncClient kinesisClient) {
* Default value: 10000
* </p>
*/
private int maxRecords = 10000;
private int maxRecords = DEFAULT_MAX_RECORDS;

/**
* @param streamName Name of Kinesis stream.
Expand Down Expand Up @@ -144,6 +146,14 @@ public PollingConfig idleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMil
return this;
}

public void maxRecords(int maxRecords) {
if (maxRecords > DEFAULT_MAX_RECORDS) {
throw new IllegalArgumentException(
"maxRecords must be less than or equal to " + DEFAULT_MAX_RECORDS + " but current value is " + maxRecords());
}
this.maxRecords = maxRecords;
}

/**
* The maximum time to wait for a future request from Kinesis to complete
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
Expand Down Expand Up @@ -102,7 +103,6 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final PublisherSession publisherSession;
private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
private boolean wasReset = false;

private Instant lastEventDeliveryTime = Instant.EPOCH;
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();

Expand Down Expand Up @@ -512,6 +512,9 @@ private void makeRetrievalAttempt() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId);
} catch (InvalidArgumentException e) {
log.info("{} : records threw InvalidArgumentException - iterator will be refreshed before retrying", streamAndShardId, e);
publisherSession.dataFetcher().restartIterator();
} catch (ExpiredIteratorException e) {
log.info("{} : records threw ExpiredIteratorException - restarting"
+ " after greatest seqNum passed to customer", streamAndShardId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ public void testInvalidStateMultiWithStreamName() {
config.validateState(true);
}

@Test(expected = IllegalArgumentException.class)
public void testInvalidRecordLimit() {
config.maxRecords(PollingConfig.DEFAULT_MAX_RECORDS + 1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.ShardObjectHelper;
Expand Down Expand Up @@ -451,6 +452,18 @@ public void testRetryableRetrievalExceptionContinues() {
assertEquals(records.processRecordsInput().millisBehindLatest(), response.millisBehindLatest());
}

@Test
public void testInvalidArgumentExceptionIsRetried() {
when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL))
.thenThrow(InvalidArgumentException.builder().build())
.thenReturn(getRecordsResponse);

getRecordsCache.start(sequenceNumber, initialPosition);
blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300);

verify(dataFetcher, times(1)).restartIterator();
}

@Test(timeout = 10000L)
public void testNoDeadlockOnFullQueue() {
//
Expand Down

0 comments on commit 1280325

Please sign in to comment.