Skip to content

Commit

Permalink
Add config to enable PITR (awslabs#1365)
Browse files Browse the repository at this point in the history
* Add config to enable PITR
  • Loading branch information
lucienlu-aws authored Jul 15, 2024
1 parent 5878ba8 commit 1c0c41c
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ public class KinesisClientLibConfiguration {
private AwsCredentialsProvider cloudWatchCredentialsProvider;
private long failoverTimeMillis;
private boolean enablePriorityLeaseAssignment;
private boolean leaseTableDeletionProtectionEnabled;
private boolean leaseTablePitrEnabled;
private String workerIdentifier;
private long shardSyncIntervalMillis;
private int maxRecords;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ public void setWorkerId(String workerId) {
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private Boolean enablePriorityLeaseAssignment;

@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private Boolean leaseTableDeletionProtectionEnabled;

@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private Boolean leaseTablePitrEnabled;

@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long shardSyncIntervalMillis;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -99,6 +100,50 @@ public void testSetEnablePriorityLeaseAssignment() {
assertThat(resolvedConfiguration.leaseManagementConfig.enablePriorityLeaseAssignment(), equalTo(false));
}

@Test
public void testSetLeaseTableDeletionProtectionEnabledToTrue() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setLeaseTableDeletionProtectionEnabled(true);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);

assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled());
}

@Test
public void testSetLeaseTablePitrEnabledToTrue() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setLeaseTablePitrEnabled(true);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);

assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTablePitrEnabled());
}

@Test
public void testSetLeaseTableDeletionProtectionEnabledToFalse() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setLeaseTableDeletionProtectionEnabled(false);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);

assertFalse(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled());
}

@Test
public void testSetLeaseTablePitrEnabledToFalse() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setLeaseTablePitrEnabled(false);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);

assertFalse(resolvedConfiguration.leaseManagementConfig.leaseTablePitrEnabled());
}

@Test
public void testDefaultRetrievalConfig() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class LeaseManagementConfig {
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS =
Duration.ofMinutes(30).toMillis();
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
public static final boolean DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED = false;
public static final boolean DEFAULT_LEASE_TABLE_PITR_ENABLED = false;
public static final boolean DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT = true;
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;

Expand Down Expand Up @@ -208,11 +210,20 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;

/**
* Whether to enabled deletion protection on the DynamoDB lease table created by KCL.
* Whether to enable deletion protection on the DynamoDB lease table created by KCL. This does not update
* already existing tables.
*
* <p>Default value: false
*/
private boolean leaseTableDeletionProtectionEnabled = false;
private boolean leaseTableDeletionProtectionEnabled = DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED;

/**
* Whether to enable PITR (point in time recovery) on the DynamoDB lease table created by KCL. If true, this can
* update existing table's PITR.
*
* <p>Default value: false
*/
private boolean leaseTablePitrEnabled = DEFAULT_LEASE_TABLE_PITR_ENABLED;

/**
* The list of tags to be applied to the DynamoDB table created for lease management.
Expand Down Expand Up @@ -424,6 +435,7 @@ public LeaseManagementFactory leaseManagementFactory(
dynamoDbRequestTimeout(),
billingMode(),
leaseTableDeletionProtectionEnabled(),
leaseTablePitrEnabled(),
tags(),
leaseSerializer,
customShardDetectorProvider(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean leaseTableDeletionProtectionEnabled;
private final boolean leaseTablePitrEnabled;
private final Collection<Tag> tags;
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
Expand Down Expand Up @@ -707,7 +708,7 @@ private DynamoDBLeaseManagementFactory(
tableCreatorCallback,
dynamoDbRequestTimeout,
billingMode,
false,
LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED,
DefaultSdkAutoConstructList.getInstance(),
leaseSerializer);
}
Expand Down Expand Up @@ -945,6 +946,76 @@ public DynamoDBLeaseManagementFactory(
* @param isMultiStreamMode
* @param leaseCleanupConfig
*/
@Deprecated
public DynamoDBLeaseManagementFactory(
final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient,
final String tableName,
final String workerIdentifier,
final ExecutorService executorService,
final long failoverTimeMillis,
final boolean enablePriorityLeaseAssignment,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion,
final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis,
final boolean consistentReads,
final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts,
final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds,
final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer,
final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout,
BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
Collection<Tag> tags,
LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider,
boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
this(
kinesisClient,
dynamoDBClient,
tableName,
workerIdentifier,
executorService,
failoverTimeMillis,
enablePriorityLeaseAssignment,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
maxLeaseRenewalThreads,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
consistentReads,
listShardsBackoffTimeMillis,
maxListShardsRetryAttempts,
maxCacheMissesBeforeReload,
listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus,
initialLeaseTableReadCapacity,
initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer,
tableCreatorCallback,
dynamoDbRequestTimeout,
billingMode,
leaseTableDeletionProtectionEnabled,
LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED,
tags,
leaseSerializer,
customShardDetectorProvider,
isMultiStreamMode,
leaseCleanupConfig);
}

public DynamoDBLeaseManagementFactory(
final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient,
Expand Down Expand Up @@ -973,6 +1044,7 @@ public DynamoDBLeaseManagementFactory(
Duration dynamoDbRequestTimeout,
BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final boolean leaseTablePitrEnabled,
Collection<Tag> tags,
LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider,
Expand Down Expand Up @@ -1005,6 +1077,7 @@ public DynamoDBLeaseManagementFactory(
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
Expand Down Expand Up @@ -1091,6 +1164,7 @@ public DynamoDBLeaseRefresher createLeaseRefresher() {
dynamoDbRequestTimeout,
billingMode,
leaseTableDeletionProtectionEnabled,
leaseTablePitrEnabled,
tags);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean leaseTableDeletionProtectionEnabled;
private final boolean leaseTablePitrEnabled;
private final Collection<Tag> tags;

private boolean newTableCreated = false;
Expand Down Expand Up @@ -159,7 +161,7 @@ public DynamoDBLeaseRefresher(
tableCreatorCallback,
dynamoDbRequestTimeout,
BillingMode.PAY_PER_REQUEST,
false);
LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED);
}

/**
Expand Down Expand Up @@ -207,6 +209,41 @@ public DynamoDBLeaseRefresher(
* @param leaseTableDeletionProtectionEnabled
* @param tags
*/
@Deprecated
public DynamoDBLeaseRefresher(
final String table,
final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer,
final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout,
final BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final Collection<Tag> tags) {
this(
table,
dynamoDBClient,
serializer,
consistentReads,
tableCreatorCallback,
dynamoDbRequestTimeout,
billingMode,
leaseTableDeletionProtectionEnabled,
LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED,
tags);
}

/**
* Constructor.
* @param table
* @param dynamoDBClient
* @param serializer
* @param consistentReads
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param leaseTableDeletionProtectionEnabled
*/
public DynamoDBLeaseRefresher(
final String table,
final DynamoDbAsyncClient dynamoDBClient,
Expand All @@ -216,6 +253,7 @@ public DynamoDBLeaseRefresher(
Duration dynamoDbRequestTimeout,
final BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final boolean leaseTablePitrEnabled,
final Collection<Tag> tags) {
this.table = table;
this.dynamoDBClient = dynamoDBClient;
Expand All @@ -225,6 +263,7 @@ public DynamoDBLeaseRefresher(
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.tags = tags;
}

Expand Down Expand Up @@ -252,7 +291,33 @@ public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @No
public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException {
final CreateTableRequest request = createTableRequestBuilder().build();

return createTableIfNotExists(request);
boolean tableExists = createTableIfNotExists(request);

if (leaseTablePitrEnabled) {
enablePitr();
log.info("Enabled PITR on table {}", table);
}

return tableExists;
}

private void enablePitr() throws DependencyException {
final UpdateContinuousBackupsRequest request = UpdateContinuousBackupsRequest.builder()
.tableName(table)
.pointInTimeRecoverySpecification(builder -> builder.pointInTimeRecoveryEnabled(true))
.build();

final AWSExceptionManager exceptionManager = createExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(ProvisionedThroughputExceededException.class, t -> t);

try {
FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateContinuousBackups(request), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException | DynamoDbException | TimeoutException e) {
throw new DependencyException(e);
}
}

private boolean createTableIfNotExists(CreateTableRequest request)
Expand Down
Loading

0 comments on commit 1c0c41c

Please sign in to comment.