Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/publish old image #25

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
8bba04c
Add config constants for role arn and define them in the config def, …
breecoffey-fetch May 12, 2022
5b9dae7
Pass in the assumeRoleArn field from the config file to use it for pr…
breecoffey-fetch May 12, 2022
8b1d686
Add unit tests for getCredentials with different combinations of inputs
breecoffey-fetch May 12, 2022
6ea540c
Update options with new config parameter
breecoffey-fetch May 12, 2022
1f0dc53
Merge pull request #1 from fetch-rewards/add-assumed-role-arn
ryanjclark Jun 10, 2022
2f53869
removed hardcoded table name
nrinaldi-fetch Jun 10, 2022
5817ebf
changed back to test
nrinaldi-fetch Jun 10, 2022
bf5f430
removed hardcode
nrinaldi-fetch Jun 10, 2022
f3dcc60
Merge pull request #2 from fetch-rewards/remove-table-hardcode
nrinaldi-fetch Jun 10, 2022
6fb0de4
updated test-cases and jar filename as per build
gurjit-sandhu Jun 10, 2022
cabd9f2
Merge pull request #3 from fetch-rewards/remove-table-hardcode
gurjit-sandhu Jun 13, 2022
4d8f24a
unMarshalling Dynamo json records
gurjit-sandhu Sep 11, 2022
90e852b
updated test cases
gurjit-sandhu Sep 12, 2022
635aa53
Merge pull request #4 from fetch-rewards/unMarshall-Dynamo-Documents
gurjit-sandhu Sep 12, 2022
5b04a64
updated test cases
gurjit-sandhu Sep 12, 2022
f6ed6e0
updating format to JSON before pushing records to kafka
gurjit-sandhu Sep 13, 2022
679b092
Merge pull request #5 from fetch-rewards/convert-dyanmo-documents-to-…
gurjit-sandhu Sep 13, 2022
8cc06aa
updating document to JSON and updating test cases
gurjit-sandhu Sep 13, 2022
2270874
Merge pull request #6 from fetch-rewards/convert-dyanmo-documents-to-…
gurjit-sandhu Sep 13, 2022
ca565ca
updated to send unmarshalled record with schema to kafka topic and up…
gurjit-sandhu Sep 14, 2022
e8d0240
Merge pull request #7 from fetch-rewards/convert-dyanmo-documents-to-…
gurjit-sandhu Sep 14, 2022
91f063b
Merge branch 'unMarshall-Dynamo-Documents' of https://github.com/fetc…
gurjit-sandhu Sep 14, 2022
b3686e2
unMarshalling dynamo documents before sending to kafka topic
gurjit-sandhu Sep 14, 2022
05ec398
Merge pull request #9 from fetch-rewards/unMarshall-Dynamo-Documents
gurjit-sandhu Sep 14, 2022
3e89caf
Merge remote-tracking branch 'upstream/master' into feature/init-sync
ryanjclark Feb 21, 2023
9dcbf42
add init sync skip with new status and initial position in stream
ryanjclark Mar 1, 2023
7665279
sourceInfo skipInitSync method
ryanjclark Mar 2, 2023
b1f839c
init sync flag to not restart if skipped
ryanjclark Mar 2, 2023
1bf58c4
changed offset time of init sync sourceInfo
ryanjclark Mar 3, 2023
3e63b73
remove comment
ryanjclark Mar 8, 2023
3f4c270
Merge pull request #11 from fetch-rewards/feature/init-sync
ryanjclark Mar 8, 2023
8b646ac
handle no offset init.sync.skip
ryanjclark Mar 27, 2023
fb3411f
Merge pull request #12 from fetch-rewards/bugfix/init-sync-skip-no-of…
ryanjclark Mar 27, 2023
de864e1
initial commit for testing
samcorzineatfetch Jun 15, 2023
9f65351
task is working as desired
samcorzineatfetch Jun 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream
* Java 8
* Gradlew 5.3.1
* Kafka Connect Framework >= 2.1.1
* Amazon Kinesis Client 1.9.1
* DynamoDB Streams Kinesis Adapter 1.5.2
* Amazon Kinesis Client 1.13.1
* DynamoDB Streams Kinesis Adapter 1.5.3

## Documentation
* [Getting started](docs/getting-started.md)
Expand All @@ -44,6 +44,7 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream
* However you will only encounter this issue by running lots of tasks on one machine with really high load.

* Synced(Source) DynamoDB table unit capacity must be large enough to ensure `INIT_SYNC` to be finished in around 16 hours. Otherwise there is a risk `INIT_SYNC` being restarted just as soon as it's finished because DynamoDB Streams store change events only for 24 hours.
* `INIT_SYNC` can be skipped with `init.sync.skip=true` configuration

* Required AWS roles:
```json
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ task userWrapper(type: Wrapper) {

description = "kafka-connect-dynamodb"

shadowJar.archiveName = "${project.name}.jar"

allprojects {
group "com.trustpilot.kafka.connect.dynamodb"
version = gitVersion()
Expand Down
2 changes: 2 additions & 0 deletions docs/details.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ This connector can sync multiple DynamoDB tables at the same time and it does so

`INIT_SYNC` is a process when all existing table data is scanned and pushed into Kafka destination topic. Usually this happens only once after source task for specific table is started for the first time. But it can be repeated in case of unexpected issues, e.g. if source connector was down for long period of time and it is possible that it has missed some of the change events from the table stream (DynamoDB streams store data for 24 hours only).

Using `init.sync.skip` will skip this process and the connector will only ever read from the LATEST position in the stream.

### 3. "SYNC"

Once `INIT_SYNC` is finished source task switches into DynamoDB Streams consumer state. There all changes that happen to the source table are represented in this stream and copied over to the Kafka's destination topic. Consumers of this topic can recreate full state of the source table at any given time.
Expand Down
6 changes: 6 additions & 0 deletions docs/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"aws.region": "eu-west-1",
"aws.access.key.id": "",
"aws.secret.key": "",
"aws.assume.role.arn": "",

"dynamodb.table.env.tag.key": "environment",
"dynamodb.table.env.tag.value": "dev",
Expand All @@ -35,9 +36,12 @@
"tasks.max": "1",

"init.sync.delay.period": 60,
"init.sync.skip": false,
"connect.dynamodb.rediscovery.period": "60000"
}
```
`aws.assume.role.arn` - ARN identifier of an IAM role that the KCL and Dynamo Clients can assume for cross account access

`dynamodb.table.env.tag.key` - tag key used to define environment. Useful if you have `staging` and `production` under same AWS account. Or if you want to use different Kafka Connect clusters to sync different tables.

`dynamodb.table.env.tag.value` - defines from which environment to ingest tables. For e.g. 'staging' or 'production'...
Expand All @@ -50,6 +54,8 @@

`init.sync.delay.period` - time interval in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Kafka Connect tasks to calm down after rebalance (Since multiple tasks rebalances can happen in quick succession and this would mean more duplicated data since `INIT_SYNC` process won't have time mark it's progress).

`init.sync.skip` - boolean to determine whether to start the connector reading the entire table or from the latest offset.

`connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured.

`dynamodb.service.endpoint` - AWS DynamoDB API Endpoint. Will use default AWS if not set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ public void start(Map<String, String> properties) {
AwsClients.buildAWSResourceGroupsTaggingAPIClient(config.getAwsRegion(),
config.getResourceTaggingServiceEndpoint(),
config.getAwsAccessKeyIdValue(),
config.getAwsSecretKeyValue());
config.getAwsSecretKeyValue(),
config.getAwsAssumeRoleArn());

AmazonDynamoDB dynamoDBClient = AwsClients.buildDynamoDbClient(config.getAwsRegion(),
config.getDynamoDBServiceEndpoint(),
config.getAwsAccessKeyIdValue(),
config.getAwsSecretKeyValue());
config.getAwsSecretKeyValue(),
config.getAwsAssumeRoleArn());

if (tablesProvider == null) {
if (config.getWhitelistTables() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
public static final String SRC_INIT_SYNC_DELAY_DISPLAY = "INIT_SYNC delay";
public static final int SRC_INIT_SYNC_DELAY_DEFAULT = 60;

public static final String SRC_INIT_SYNC_SKIP_CONFIG = "init.sync.skip";
public static final String SRC_INIT_SYNC_SKIP_DOC = "Define whether to skip INIT_SYNC of table.";
public static final String SRC_INIT_SYNC_SKIP_DISPLAY = "Skip INIT_SYNC";
public static final boolean SRC_INIT_SYNC_SKIP_DEFAULT = false;

public static final String AWS_REGION_CONFIG = "aws.region";
public static final String AWS_REGION_DOC = "Define AWS region.";
public static final String AWS_REGION_DISPLAY = "Region";
Expand Down Expand Up @@ -57,6 +62,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
public static final String SRC_KCL_TABLE_BILLING_MODE_DISPLAY = "KCL table billing mode";
public static final String SRC_KCL_TABLE_BILLING_MODE_DEFAULT = "PROVISIONED";

public static final String AWS_ASSUME_ROLE_ARN_CONFIG = "aws.assume.role.arn";
public static final String AWS_ASSUME_ROLE_ARN_DOC = "Define which role arn the KCL/Dynamo Client should assume.";
public static final String AWS_ASSUME_ROLE_ARN_DISPLAY = "Assume Role Arn";
public static final String AWS_ASSUME_ROLE_ARN_DEFAULT = null;

public static final String DST_TOPIC_PREFIX_CONFIG = "kafka.topic.prefix";
public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table.";
public static final String DST_TOPIC_PREFIX_DISPLAY = "Topic prefix";
Expand Down Expand Up @@ -181,6 +191,15 @@ public static ConfigDef baseConfigDef() {
ConfigDef.Width.MEDIUM,
SRC_KCL_TABLE_BILLING_MODE_DISPLAY)

.define(AWS_ASSUME_ROLE_ARN_CONFIG,
ConfigDef.Type.STRING,
AWS_ASSUME_ROLE_ARN_DEFAULT,
ConfigDef.Importance.LOW,
AWS_ASSUME_ROLE_ARN_DOC,
AWS_GROUP, 10,
ConfigDef.Width.LONG,
AWS_ASSUME_ROLE_ARN_DISPLAY)

.define(DST_TOPIC_PREFIX_CONFIG,
ConfigDef.Type.STRING,
DST_TOPIC_PREFIX_DEFAULT,
Expand All @@ -190,12 +209,21 @@ public static ConfigDef baseConfigDef() {
ConfigDef.Width.MEDIUM,
DST_TOPIC_PREFIX_DISPLAY)

.define(SRC_INIT_SYNC_SKIP_CONFIG,
ConfigDef.Type.BOOLEAN,
SRC_INIT_SYNC_SKIP_DEFAULT,
ConfigDef.Importance.LOW,
SRC_INIT_SYNC_SKIP_DOC,
CONNECTOR_GROUP, 2,
ConfigDef.Width.MEDIUM,
SRC_INIT_SYNC_SKIP_DISPLAY)

.define(SRC_INIT_SYNC_DELAY_CONFIG,
ConfigDef.Type.INT,
SRC_INIT_SYNC_DELAY_DEFAULT,
ConfigDef.Importance.LOW,
SRC_INIT_SYNC_DELAY_DOC,
CONNECTOR_GROUP, 2,
CONNECTOR_GROUP, 3,
ConfigDef.Width.MEDIUM,
SRC_INIT_SYNC_DELAY_DISPLAY)

Expand Down Expand Up @@ -253,6 +281,10 @@ public long getRediscoveryPeriod() {
return getLong(REDISCOVERY_PERIOD_CONFIG);
}

public boolean getInitSyncSkip() {
return (boolean)get(SRC_INIT_SYNC_SKIP_CONFIG);
}

public int getInitSyncDelay() {
return (int)get(SRC_INIT_SYNC_DELAY_CONFIG);
}
Expand All @@ -272,4 +304,8 @@ public List<String> getWhitelistTables() {
public BillingMode getKCLTableBillingMode() {
return BillingMode.fromValue(getString(SRC_KCL_TABLE_BILLING_MODE_CONFIG));
}

public String getAwsAssumeRoleArn() {
return getString(AWS_ASSUME_ROLE_ARN_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class DynamoDBSourceTask extends SourceTask {
private SourceInfo sourceInfo;
private TableDescription tableDesc;
private int initSyncDelay;
private boolean initSyncSkip;

@SuppressWarnings("unused")
//Used by Confluent platform to initialize connector
Expand Down Expand Up @@ -118,11 +119,13 @@ public void start(Map<String, String> configProperties) {
config.getAwsRegion(),
config.getDynamoDBServiceEndpoint(),
config.getAwsAccessKeyIdValue(),
config.getAwsSecretKeyValue());
config.getAwsSecretKeyValue(),
config.getAwsAssumeRoleArn());
}
tableDesc = client.describeTable(config.getTableName()).getTable();

initSyncDelay = config.getInitSyncDelay();
initSyncSkip = config.getInitSyncSkip();

LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName());
setStateFromOffset();
Expand All @@ -142,15 +145,23 @@ public void start(Map<String, String> configProperties) {
config.getAwsRegion(),
config.getDynamoDBServiceEndpoint(),
config.getAwsAccessKeyIdValue(),
config.getAwsSecretKeyValue());
config.getAwsSecretKeyValue(),
config.getAwsAssumeRoleArn());

if (kclWorker == null) {
kclWorker = new KclWorkerImpl(
AwsClients.getCredentials(config.getAwsAccessKeyIdValue(), config.getAwsSecretKeyValue()),
AwsClients.getCredentials(config.getAwsAccessKeyIdValue(), config.getAwsSecretKeyValue(), config.getAwsAssumeRoleArn()),
eventsQueue,
shardRegister);
}
kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getKCLTableBillingMode());
kclWorker.start(client,
dynamoDBStreamsClient,
tableDesc.getTableName(),
config.getTaskID(),
config.getDynamoDBServiceEndpoint(),
config.getInitSyncSkip(),
config.getKCLTableBillingMode()
);

shutdown = false;
}
Expand All @@ -160,10 +171,17 @@ private void setStateFromOffset() {
.offset(Collections.singletonMap("table_name", tableDesc.getTableName()));
if (offset != null) {
sourceInfo = SourceInfo.fromOffset(offset, clock);
if (initSyncSkip) {
sourceInfo.skipInitSync();
}
} else {
LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName());
sourceInfo = new SourceInfo(tableDesc.getTableName(), clock);
sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table
if (initSyncSkip) {
sourceInfo.skipInitSync();
} else {
sourceInfo.startInitSync();
}
}
}

Expand Down Expand Up @@ -194,6 +212,9 @@ public List<SourceRecord> poll() throws InterruptedException {
if (sourceInfo.initSyncStatus == InitSyncStatus.FINISHED) {
return sync();
}
if (sourceInfo.initSyncStatus == InitSyncStatus.SKIPPED) {
return sync();
}
throw new Exception("Invalid SourceInfo InitSyncStatus state: " + sourceInfo.initSyncStatus);
} catch (InterruptedException ex) {
LOGGER.error("Failed to handle incoming records. Records dropped!", ex);
Expand Down Expand Up @@ -236,6 +257,7 @@ private LinkedList<SourceRecord> initSync() throws Exception {
result.add(converter.toSourceRecord(sourceInfo,
Envelope.Operation.READ,
record,
null,
sourceInfo.lastInitSyncStart,
null,
null));
Expand All @@ -254,6 +276,7 @@ private LinkedList<SourceRecord> initSync() throws Exception {
result.add(converter.toSourceRecord(sourceInfo,
Envelope.Operation.READ,
lastRecord,
null,
sourceInfo.lastInitSyncStart,
null,
null));
Expand All @@ -277,6 +300,7 @@ private List<SourceRecord> sync() throws Exception {
LOGGER.debug("Waiting for records from eventsQueue for table: {}", tableDesc.getTableName());
KclRecordsWrapper dynamoDBRecords = eventsQueue.poll(500, TimeUnit.MILLISECONDS);
if (dynamoDBRecords == null) {
LOGGER.debug("null dynamoDBRecords");
return null; // returning thread control at regular intervals
}

Expand Down Expand Up @@ -310,12 +334,12 @@ private List<SourceRecord> sync() throws Exception {
}

// Received record which is behind "safe" zone. Indicating that "potentially" we lost some records.
// Need to resync...
// Need to resync if sync hasn't been skipped...
// This happens if:
// * connector was down for some time
// * connector is lagging
// * connector failed to finish init sync in acceptable time frame
if (recordIsInDangerZone(arrivalTimestamp)) {
if (recordIsInDangerZone(arrivalTimestamp) && sourceInfo.initSyncStatus != InitSyncStatus.SKIPPED) {
sourceInfo.startInitSync();

LOGGER.info(
Expand All @@ -342,9 +366,12 @@ private List<SourceRecord> sync() throws Exception {
attributes = dynamoDbRecord.getDynamodb().getKeys();
}

Map<String, AttributeValue> oldImage = dynamoDbRecord.getDynamodb().getOldImage();

SourceRecord sourceRecord = converter.toSourceRecord(sourceInfo,
op,
attributes,
oldImage,
arrivalTimestamp.toInstant(),
dynamoDBRecords.getShardId(),
record.getSequenceNumber());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public static final class FieldName {
* The {@code after} field is used to store the state of a record after an operation.
*/
public static final String DOCUMENT = "document";
/**
* The {@code oldDocument} field is used to store the state of a record before an operation.
*/
public static final String OLD_DOCUMENT = "old_document";
/**
* The {@code op} field is used to store the kind of operation on a record.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
public enum InitSyncStatus {
UNDEFINED,
RUNNING,
FINISHED
FINISHED,
SKIPPED
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ public void endInitSync() {
lastInitSyncEnd = Instant.now(clock);
}

public void skipInitSync() {
initSyncStatus = InitSyncStatus.SKIPPED;
lastInitSyncStart = Instant.ofEpochSecond(0);
lastInitSyncEnd = Instant.ofEpochSecond(0);;
exclusiveStartKey = null;
initSyncCount = 0L;
}

private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
.name(SchemaNameAdjuster
.defaultAdjuster()
Expand Down
Loading