From 8bba04cbb6650520b3c33ad66b71c5d3058353c8 Mon Sep 17 00:00:00 2001 From: Bree Coffey Date: Thu, 12 May 2022 16:02:51 -0500 Subject: [PATCH 01/23] Add config constants for role arn and define them in the config def, add a getter for the new field --- .../DynamoDBSourceConnectorConfig.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java index 078d6fb..86299be 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java @@ -57,6 +57,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig { public static final String SRC_KCL_TABLE_BILLING_MODE_DISPLAY = "KCL table billing mode"; public static final String SRC_KCL_TABLE_BILLING_MODE_DEFAULT = "PROVISIONED"; + public static final String AWS_ASSUME_ROLE_ARN_CONFIG = "aws.assume.role.arn"; + public static final String AWS_ASSUME_ROLE_ARN_DOC = "Define which role arn the KCL/Dynamo Client should assume."; + public static final String AWS_ASSUME_ROLE_ARN_DISPLAY = "Assume Role Arn"; + public static final String AWS_ASSUME_ROLE_ARN_DEFAULT = null; + public static final String DST_TOPIC_PREFIX_CONFIG = "kafka.topic.prefix"; public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. 
End will be the name of a table."; public static final String DST_TOPIC_PREFIX_DISPLAY = "Topic prefix"; @@ -181,6 +186,15 @@ public static ConfigDef baseConfigDef() { ConfigDef.Width.MEDIUM, SRC_KCL_TABLE_BILLING_MODE_DISPLAY) + .define(AWS_ASSUME_ROLE_ARN_CONFIG, + ConfigDef.Type.STRING, + AWS_ASSUME_ROLE_ARN_DEFAULT, + ConfigDef.Importance.LOW, + AWS_ASSUME_ROLE_ARN_DOC, + AWS_GROUP, 10, + ConfigDef.Width.LONG, + AWS_ASSUME_ROLE_ARN_DISPLAY) + .define(DST_TOPIC_PREFIX_CONFIG, ConfigDef.Type.STRING, DST_TOPIC_PREFIX_DEFAULT, @@ -272,4 +286,8 @@ public List getWhitelistTables() { public BillingMode getKCLTableBillingMode() { return BillingMode.fromValue(getString(SRC_KCL_TABLE_BILLING_MODE_CONFIG)); } + + public String getAwsAssumeRoleArn() { + return getString(AWS_ASSUME_ROLE_ARN_CONFIG); + } } From 5b9dae7e6449b3804db4bd31b994d76eef8000d9 Mon Sep 17 00:00:00 2001 From: Bree Coffey Date: Thu, 12 May 2022 16:11:43 -0500 Subject: [PATCH 02/23] Pass in the assumeRoleArn field from the config file to use it for providing credentials for the session --- .../dynamodb/DynamoDBSourceConnector.java | 6 ++-- .../dynamodb/DynamoDBSourceTask.java | 8 +++-- .../connector/dynamodb/aws/AwsClients.java | 36 +++++++++++++------ .../dynamodb/KafkaConnectITBase.java | 4 ++- 4 files changed, 37 insertions(+), 17 deletions(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java index 0e95906..c864881 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java @@ -57,12 +57,14 @@ public void start(Map properties) { AwsClients.buildAWSResourceGroupsTaggingAPIClient(config.getAwsRegion(), config.getResourceTaggingServiceEndpoint(), config.getAwsAccessKeyIdValue(), - config.getAwsSecretKeyValue()); + config.getAwsSecretKeyValue(), + 
config.getAwsAssumeRoleArn()); AmazonDynamoDB dynamoDBClient = AwsClients.buildDynamoDbClient(config.getAwsRegion(), config.getDynamoDBServiceEndpoint(), config.getAwsAccessKeyIdValue(), - config.getAwsSecretKeyValue()); + config.getAwsSecretKeyValue(), + config.getAwsAssumeRoleArn()); if (tablesProvider == null) { if (config.getWhitelistTables() != null) { diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 1868cc3..0f53fdc 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -118,7 +118,8 @@ public void start(Map configProperties) { config.getAwsRegion(), config.getDynamoDBServiceEndpoint(), config.getAwsAccessKeyIdValue(), - config.getAwsSecretKeyValue()); + config.getAwsSecretKeyValue(), + config.getAwsAssumeRoleArn()); } tableDesc = client.describeTable(config.getTableName()).getTable(); @@ -142,11 +143,12 @@ public void start(Map configProperties) { config.getAwsRegion(), config.getDynamoDBServiceEndpoint(), config.getAwsAccessKeyIdValue(), - config.getAwsSecretKeyValue()); + config.getAwsSecretKeyValue(), + config.getAwsAssumeRoleArn()); if (kclWorker == null) { kclWorker = new KclWorkerImpl( - AwsClients.getCredentials(config.getAwsAccessKeyIdValue(), config.getAwsSecretKeyValue()), + AwsClients.getCredentials(config.getAwsAccessKeyIdValue(), config.getAwsSecretKeyValue(), config.getAwsAssumeRoleArn()), eventsQueue, shardRegister); } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java index 175f6f0..d6cbdfc 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java @@ -5,6 +5,7 @@ import 
com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; @@ -22,44 +23,56 @@ public class AwsClients { public static AmazonDynamoDB buildDynamoDbClient(String awsRegion, String serviceEndpoint, String awsAccessKeyID, - String awsSecretKey) { - + String awsSecretKey, + String awsAssumeRoleArn) { return (AmazonDynamoDB) configureBuilder( AmazonDynamoDBClientBuilder.standard(), awsRegion, serviceEndpoint, awsAccessKeyID, - awsSecretKey) + awsSecretKey, + awsAssumeRoleArn) .build(); } public static AWSResourceGroupsTaggingAPI buildAWSResourceGroupsTaggingAPIClient(String awsRegion, String serviceEndpoint, String awsAccessKeyID, - String awsSecretKey) { + String awsSecretKey, + String awsAssumeRoleArn) { return (AWSResourceGroupsTaggingAPI) configureBuilder( AWSResourceGroupsTaggingAPIClientBuilder.standard(), awsRegion, serviceEndpoint, awsAccessKeyID, - awsSecretKey) + awsSecretKey, + awsAssumeRoleArn) .build(); } public static AmazonDynamoDBStreams buildDynamoDbStreamsClient(String awsRegion, String serviceEndpoint, String awsAccessKeyID, - String awsSecretKey) { + String awsSecretKey, + String awsAssumeRoleArn) { return (AmazonDynamoDBStreams) configureBuilder( AmazonDynamoDBStreamsClientBuilder.standard(), awsRegion, serviceEndpoint, awsAccessKeyID, - awsSecretKey) + awsSecretKey, + awsAssumeRoleArn) .build(); } - public static AWSCredentialsProvider getCredentials(String awsAccessKeyID, String awsSecretKey) { - if (awsAccessKeyID == null || awsSecretKey == null) { + public static AWSCredentialsProvider getCredentials(String awsAccessKeyID, + String awsSecretKey, + String awsAssumeRoleArn) { + if (awsAssumeRoleArn != null 
) { + LOGGER.debug("Using STSAssumeRoleSessionCredentialsProvider"); + AWSCredentialsProvider awsCredentialsProviderChain = DefaultAWSCredentialsProviderChain.getInstance(); + return new STSAssumeRoleSessionCredentialsProvider(awsCredentialsProviderChain, + awsAssumeRoleArn, "kafkaconnect"); + } else if (awsAccessKeyID == null || awsSecretKey == null) { LOGGER.debug("Using DefaultAWSCredentialsProviderChain"); return DefaultAWSCredentialsProviderChain.getInstance(); @@ -75,9 +88,10 @@ private static AwsClientBuilder configureBuilder(AwsClientBuilder builder, String awsRegion, String serviceEndpoint, String awsAccessKeyID, - String awsSecretKey) { + String awsSecretKey, + String awsAssumeRoleArn) { - builder.withCredentials(getCredentials(awsAccessKeyID, awsSecretKey)) + builder.withCredentials(getCredentials(awsAccessKeyID, awsSecretKey, awsAssumeRoleArn)) .withClientConfiguration(new ClientConfiguration().withThrottledRetries(true)); if(serviceEndpoint != null && !serviceEndpoint.isEmpty()) { diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java b/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java index cc0a713..f163528 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java @@ -48,6 +48,7 @@ public class KafkaConnectITBase { protected static final String AWS_REGION_CONFIG = "eu-west-3"; protected static final String AWS_ACCESS_KEY_ID_CONFIG = "ABCD"; protected static final String AWS_SECRET_KEY_CONFIG = "1234"; + protected static final String AWS_ASSUME_ROLE_ARN_CONFIG = null; protected static final String SRC_DYNAMODB_TABLE_INGESTION_TAG_KEY_CONFIG = "datalake-ingest"; private static Network network; @@ -187,7 +188,8 @@ private AmazonDynamoDB getDynamoDBClient() { AWS_REGION_CONFIG, dynamodb.getEndpoint(), AWS_ACCESS_KEY_ID_CONFIG, - AWS_SECRET_KEY_CONFIG + 
AWS_SECRET_KEY_CONFIG, + AWS_ASSUME_ROLE_ARN_CONFIG ); } From 8b1d686730c7d38b91777f30f75922387d9986c1 Mon Sep 17 00:00:00 2001 From: Bree Coffey Date: Thu, 12 May 2022 16:50:39 -0500 Subject: [PATCH 03/23] Add unit tests for getCredentials with different combinations of inputs --- .../dynamodb/aws/AwsClientsTests.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 source/src/test/java/com/trustpilot/connector/dynamodb/aws/AwsClientsTests.java diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/aws/AwsClientsTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/aws/AwsClientsTests.java new file mode 100644 index 0000000..8dcd9b9 --- /dev/null +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/aws/AwsClientsTests.java @@ -0,0 +1,53 @@ +package com.trustpilot.connector.dynamodb.aws; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class AwsClientsTests { + + @Test + public void stsAssumeRoleProviderReturned() { + String testRoleArn = "arn:aws:iam::111111111111:role/unit-test"; + AWSCredentialsProvider provider = AwsClients.getCredentials( + null, + null, + testRoleArn + ); + + DefaultAWSCredentialsProviderChain testChain = Mockito.mock(DefaultAWSCredentialsProviderChain.class); + STSAssumeRoleSessionCredentialsProvider expectedProvider = new STSAssumeRoleSessionCredentialsProvider( + testChain.getInstance(), + testRoleArn, + "kafkaconnect" + ); + assertEquals(provider.getClass(), expectedProvider.getClass()); + } + + @Test + public void defaultProviderReturned() { + AWSCredentialsProvider provider = AwsClients.getCredentials( + null, + null, + null + ); + + 
assertEquals(provider.getClass(), DefaultAWSCredentialsProviderChain.class); + } + + @Test + public void staticCredentialsReturned() { + AWSCredentialsProvider provider = AwsClients.getCredentials( + "unit-test", + "unit-test", + null + ); + + assertEquals(provider.getClass(), AWSStaticCredentialsProvider.class); + } +} From 6ea540c08df3f6712b93f3163eef53e7bd557202 Mon Sep 17 00:00:00 2001 From: Bree Coffey Date: Thu, 12 May 2022 16:52:50 -0500 Subject: [PATCH 04/23] Update options with new config parameter --- docs/options.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/options.md b/docs/options.md index 423a1c7..a0c6e4e 100644 --- a/docs/options.md +++ b/docs/options.md @@ -20,6 +20,7 @@ "aws.region": "eu-west-1", "aws.access.key.id": "", "aws.secret.key": "", + "aws.assume.role.arn": "", "dynamodb.table.env.tag.key": "environment", "dynamodb.table.env.tag.value": "dev", @@ -38,6 +39,8 @@ "connect.dynamodb.rediscovery.period": "60000" } ``` +`aws.assume.role.arn` - ARN identifier of an IAM role that the KCL and Dynamo Clients can assume for cross account access + `dynamodb.table.env.tag.key` - tag key used to define environment. Useful if you have `staging` and `production` under same AWS account. Or if you want to use different Kafka Connect clusters to sync different tables. `dynamodb.table.env.tag.value` - defines from which environment to ingest tables. For e.g. 'staging' or 'production'... 
From 2f53869d3d7e57bac045640239a3587ca7e77407 Mon Sep 17 00:00:00 2001 From: Nicholas Rinaldi Date: Fri, 10 Jun 2022 11:37:08 -0400 Subject: [PATCH 05/23] removed hardcoded table name --- .../trustpilot/connector/dynamodb/utils/RecordConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index dd9137c..c485951 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -45,7 +45,7 @@ public class RecordConverter { public RecordConverter(TableDescription tableDesc, String topicNamePrefix) { this.tableDesc = tableDesc; - this.topic_name = topicNamePrefix + tableDesc.getTableName(); + this.topic_name = topicNamePrefix; valueSchema = SchemaBuilder.struct() .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) From 5817ebf8887af8f951ed246b39d7732684dff024 Mon Sep 17 00:00:00 2001 From: Nicholas Rinaldi Date: Fri, 10 Jun 2022 11:58:10 -0400 Subject: [PATCH 06/23] changed back to test --- .../trustpilot/connector/dynamodb/utils/RecordConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index c485951..dd9137c 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -45,7 +45,7 @@ public class RecordConverter { public RecordConverter(TableDescription tableDesc, String topicNamePrefix) { this.tableDesc = tableDesc; - this.topic_name = topicNamePrefix; + this.topic_name = topicNamePrefix + 
tableDesc.getTableName(); valueSchema = SchemaBuilder.struct() .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) From bf5f430d78195ae9dd7df93c534e365a4b8dc473 Mon Sep 17 00:00:00 2001 From: Nicholas Rinaldi Date: Fri, 10 Jun 2022 11:58:50 -0400 Subject: [PATCH 07/23] removed hardcode --- .../trustpilot/connector/dynamodb/utils/RecordConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index dd9137c..c485951 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -45,7 +45,7 @@ public class RecordConverter { public RecordConverter(TableDescription tableDesc, String topicNamePrefix) { this.tableDesc = tableDesc; - this.topic_name = topicNamePrefix + tableDesc.getTableName(); + this.topic_name = topicNamePrefix; valueSchema = SchemaBuilder.struct() .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) From 6fb0de42893a99afe5589d72b3aa04094f4a9b7c Mon Sep 17 00:00:00 2001 From: gurjit-sandhu Date: Fri, 10 Jun 2022 16:59:53 -0500 Subject: [PATCH 08/23] updated test-cases and jar filename as per build updated test-cases and jar filename as per build --- build.gradle | 2 ++ .../connector/dynamodb/utils/RecordConverterTests.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index e1c3448..deb9f54 100644 --- a/build.gradle +++ b/build.gradle @@ -20,6 +20,8 @@ task userWrapper(type: Wrapper) { description = "kafka-connect-dynamodb" +shadowJar.archiveName = "${project.name}.jar" + allprojects { group "com.trustpilot.kafka.connect.dynamodb" version = gitVersion() diff --git 
a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index 3dd2ab9..4cac03f 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ -88,7 +88,7 @@ public void correctTopicNameIsConstructed() throws Exception { ); // Assert - assertEquals("TestTopicPrefix-TestTable1", record.topic()); + assertEquals("TestTopicPrefix-", record.topic()); } @Test From 4d8f24afc1e85c398dbc111975ffbe21f2169af9 Mon Sep 17 00:00:00 2001 From: gurjit-sandhu Date: Sat, 10 Sep 2022 23:08:17 -0500 Subject: [PATCH 09/23] unMarshalling Dynamo json records --- .../connector/dynamodb/utils/RecordConverter.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index c485951..788c6f1 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -3,6 +3,7 @@ import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.amazonaws.services.dynamodbv2.document.ItemUtils; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; @@ -74,6 +75,9 @@ public SourceRecord toSourceRecord( LinkedHashMap::new )); + // getUnmarshallItems from Dynamo Document + Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); + // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. 
// This info will only be used to update `shardRegister` and won't be used to reset state after restart Map offsets = SourceInfo.toOffset(sourceInfo); @@ -101,7 +105,7 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) - .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(sanitisedAttributes)) + .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); From 90e852b35623a85b19dba3ee6bfcf15070d33a3e Mon Sep 17 00:00:00 2001 From: gurjit-sandhu Date: Sun, 11 Sep 2022 19:22:40 -0500 Subject: [PATCH 10/23] updated test cases --- .../connector/dynamodb/DynamoDBSourceTaskTests.java | 6 +++--- .../connector/dynamodb/utils/RecordConverterTests.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index c132506..6d88497 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -278,7 +278,7 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx assertEquals(1, response.size()); assertEquals("r", ((Struct) response.get(0).value()).getString("op")); - assertEquals("{\"col2\":{\"s\":\"val1\"},\"col3\":{\"n\":\"1\"},\"col1\":{\"s\":\"key1\"}}", ((Struct) response.get(0).value()).getString("document")); + assertEquals(({"col2":"val1","col3":1,"col1":"key1"}), ((Struct) response.get(0).value()).getString("document")); assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus); assertEquals(exclusiveStartKey, 
task.getSourceInfo().exclusiveStartKey); } @@ -560,8 +560,8 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException { // Assert assertEquals(3, response.size()); - assertEquals("{\"col2\":{\"s\":\"val1\"},\"col3\":{\"n\":\"1\"},\"col1\":{\"s\":\"key1\"}}", ((Struct) response.get(0).value()).getString("document")); - assertEquals("{\"col1\":{\"s\":\"key2\"}}", ((Struct) response.get(1).value()).getString("document")); + assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document")); + assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document")); assertNull(response.get(2).value()); // tombstone } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index 4cac03f..a87cc38 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ -199,7 +199,7 @@ public void recordAttributesAreAddedToValueData() throws Exception { ); // Assert - assertEquals("{\"testKV1\":{\"s\":\"testKV1Value\"},\"testKV2\":{\"s\":\"2\"},\"testV2\":{\"s\":\"testStringValue\"},\"testV1\":{\"n\":\"1\"}}", + assertEquals("{\"testKV1\":\"testKV1Value\",\"testKV2\":\"2\",\"testV2\":\"testStringValue\",\"testV1\":1}", ((Struct) record.value()).getString("document")); } @@ -271,7 +271,7 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar "testSequenceNumberID1" ); - String expected = "{\"test1234\":{\"s\":\"testKV1Value\"},\"_starts_with_underscore\":{\"n\":\"1\"},\"startswithnumber\":{\"s\":\"2\"},\"test\":{\"s\":\"testStringValue\"}}"; + String expected = "{\"test-1234\":\"testKV1Value\",\"_starts_with_underscore\":1,\"1-starts-with-number\":\"2\",\"test!@£$%^\":\"testStringValue\"}"; // 
Assert assertEquals(expected, From 5b04a64f2b02902abc9f7021bdde22193c81942a Mon Sep 17 00:00:00 2001 From: gurjit-sandhu Date: Mon, 12 Sep 2022 18:10:36 -0500 Subject: [PATCH 11/23] updated test cases --- .../trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 6d88497..54623cb 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -278,7 +278,7 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx assertEquals(1, response.size()); assertEquals("r", ((Struct) response.get(0).value()).getString("op")); - assertEquals(({"col2":"val1","col3":1,"col1":"key1"}), ((Struct) response.get(0).value()).getString("document")); + assertEquals(("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}"), ((Struct) response.get(0).value()).getString("document")); assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus); assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey); } From f6ed6e026bc1cc09777d388add9aa8555dc1272c Mon Sep 17 00:00:00 2001 From: gurjit-sandhu Date: Mon, 12 Sep 2022 19:10:52 -0500 Subject: [PATCH 12/23] updating format to JSON before pushing records to kafka updating format to JSON before pushing records to kafka and updated test cases --- .../dynamodb/utils/RecordConverter.java | 22 ++++++++++++-- .../dynamodb/DynamoDBSourceTaskTests.java | 29 +++++++++++++++++-- .../dynamodb/utils/RecordConverterTests.java | 28 +++++++++++++----- 3 files changed, 66 insertions(+), 13 deletions(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java 
b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index 788c6f1..68c0274 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -21,6 +21,12 @@ import java.util.Map; import java.util.stream.Collectors; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; + import static java.util.stream.Collectors.toList; /** @@ -76,7 +82,19 @@ public SourceRecord toSourceRecord( )); // getUnmarshallItems from Dynamo Document - Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); + //Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); + + //JSON conversion + String outputJsonString = null; + try { + String jsonString = ItemUtils.toItem(attributes).toJSON(); + JsonObject jsonObject = new JsonParser().parse(jsonString).getAsJsonObject(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + outputJsonString = gson.toJson(jsonObject); + } catch (JsonParseException e) { + e.printStackTrace(); + throw new Exception("Error Occured in JSON Parsing " + e.getMessage(), e); + } // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. 
// This info will only be used to update `shardRegister` and won't be used to reset state after restart @@ -105,7 +123,7 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) - .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems)) + .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(outputJsonString)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 6d88497..d3ff394 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -23,6 +23,9 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -272,13 +275,21 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx task.start(configs); List response = task.poll(); + String expected = "\"{\\n \\\"col2\\\": \\\"val1\\\",\\n \\\"col3\\\": 1,\\n \\\"col1\\\": \\\"key1\\\"\\n}\""; + String actual = (((Struct) response.get(0).value()).getString("document")); + + // Converting both expected and actual to JSON string + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + expected = gson.toJson(expected); + actual = gson.toJson(actual); + // Assert assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), task.getSourceInfo().lastInitSyncStart); assertEquals(1, task.getSourceInfo().initSyncCount); assertEquals(1, response.size()); assertEquals("r", 
((Struct) response.get(0).value()).getString("op")); - assertEquals(({"col2":"val1","col3":1,"col1":"key1"}), ((Struct) response.get(0).value()).getString("document")); + assertEquals(expected , actual); assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus); assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey); } @@ -557,11 +568,23 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException { // Act task.start(configs); List response = task.poll(); + + String expected = "\"{\\n \\\"col2\\\": \\\"val1\\\",\\n \\\"col3\\\": 1,\\n \\\"col1\\\": \\\"key1\\\"\\n}\""; + String expected_document_key = "\"{\\n \\\"col1\\\": \\\"key2\\\"\\n}\""; + String actual = (((Struct) response.get(0).value()).getString("document")); + String actual_document_key = ((Struct) response.get(1).value()).getString("document"); + + // Converting both expected and actual to JSON string + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + expected = gson.toJson(expected); + expected_document_key = gson.toJson(expected_document_key); + actual = gson.toJson(actual); + actual_document_key = gson.toJson(actual_document_key); // Assert assertEquals(3, response.size()); - assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document")); - assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document")); + assertEquals(expected, actual); + assertEquals(expected_document_key, actual_document_key); assertNull(response.get(2).value()); // tombstone } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index a87cc38..ffd2caf 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ 
-1,6 +1,3 @@ - - - package com.trustpilot.connector.dynamodb.utils; import com.amazonaws.services.dynamodbv2.model.AttributeValue; @@ -22,6 +19,9 @@ import java.util.List; import java.util.Map; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + import static org.junit.jupiter.api.Assertions.assertEquals; @@ -198,9 +198,16 @@ public void recordAttributesAreAddedToValueData() throws Exception { "testSequenceNumberID1" ); + String expected = "\"{\\n \\\"testKV1\\\": \\\"testKV1Value\\\",\\n \\\"testKV2\\\": \\\"2\\\",\\n \\\"testV2\\\": \\\"testStringValue\\\",\\n \\\"testV1\\\": 1\\n}\""; + String actual = ((Struct) record.value()).getString("document"); + + // Converting both expected and actual to JSON string + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + expected = gson.toJson(expected); + actual = gson.toJson(actual); + // Assert - assertEquals("{\"testKV1\":\"testKV1Value\",\"testKV2\":\"2\",\"testV2\":\"testStringValue\",\"testV1\":1}", - ((Struct) record.value()).getString("document")); + assertEquals(expected, actual); } @Test @@ -271,11 +278,16 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar "testSequenceNumberID1" ); - String expected = "{\"test-1234\":\"testKV1Value\",\"_starts_with_underscore\":1,\"1-starts-with-number\":\"2\",\"test!@£$%^\":\"testStringValue\"}"; + String expected = "\"{\\n \\\"test-1234\\\": \\\"testKV1Value\\\",\\n \\\"_starts_with_underscore\\\": 1,\\n \\\"1-starts-with-number\\\": \\\"2\\\",\\n \\\"test!@£$%^\\\": \\\"testStringValue\\\"\\n}\""; + String actual = ((Struct) record.value()).getString("document"); + + // Converting both expected and actual to JSON string + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + expected = gson.toJson(expected); + actual = gson.toJson(actual); // Assert - assertEquals(expected, - ((Struct) record.value()).getString("document")); + assertEquals(expected, actual); } @Test From 
8cc06aa721ed13128fd5d8c980aa17edc8541f07 Mon Sep 17 00:00:00 2001 From: gurjit-sandhu Date: Tue, 13 Sep 2022 14:54:00 -0500 Subject: [PATCH 13/23] updating document to JSON and updating test cases --- .../dynamodb/utils/RecordConverter.java | 21 +------ .../dynamodb/DynamoDBSourceTaskTests.java | 40 +++++-------- .../dynamodb/utils/RecordConverterTests.java | 56 ++++++++----------- 3 files changed, 41 insertions(+), 76 deletions(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index 68c0274..781e62c 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -21,12 +21,6 @@ import java.util.Map; import java.util.stream.Collectors; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonParser; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; - import static java.util.stream.Collectors.toList; /** @@ -84,17 +78,8 @@ public SourceRecord toSourceRecord( // getUnmarshallItems from Dynamo Document //Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); - //JSON conversion - String outputJsonString = null; - try { - String jsonString = ItemUtils.toItem(attributes).toJSON(); - JsonObject jsonObject = new JsonParser().parse(jsonString).getAsJsonObject(); - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - outputJsonString = gson.toJson(jsonObject); - } catch (JsonParseException e) { - e.printStackTrace(); - throw new Exception("Error Occured in JSON Parsing " + e.getMessage(), e); - } + //JSON conversion + String outputJsonString = ItemUtils.toItem(attributes).toJSON(); // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. 
// This info will only be used to update `shardRegister` and won't be used to reset state after restart @@ -123,7 +108,7 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) - .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(outputJsonString)) + .put(Envelope.FieldName.DOCUMENT, outputJsonString) // objectMapper.writeValueAsString(outputJsonString)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index d3ff394..b199e64 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -23,8 +23,8 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; +import com.google.gson.JsonObject; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -275,21 +275,16 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx task.start(configs); List response = task.poll(); - String expected = "\"{\\n \\\"col2\\\": \\\"val1\\\",\\n \\\"col3\\\": 1,\\n \\\"col1\\\": \\\"key1\\\"\\n}\""; - String actual = (((Struct) response.get(0).value()).getString("document")); - - // Converting both expected and actual to JSON string - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - expected = gson.toJson(expected); - actual = gson.toJson(actual); - // Assert assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), task.getSourceInfo().lastInitSyncStart); assertEquals(1, 
task.getSourceInfo().initSyncCount); + String expected = "{col2:val1,col3:1,col1:key1}"; + JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); + assertEquals(1, response.size()); assertEquals("r", ((Struct) response.get(0).value()).getString("op")); - assertEquals(expected , actual); + assertEquals(expectedJson.toString(), ((Struct) response.get(0).value()).getString("document")); assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus); assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey); } @@ -568,23 +563,16 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException { // Act task.start(configs); List response = task.poll(); - - String expected = "\"{\\n \\\"col2\\\": \\\"val1\\\",\\n \\\"col3\\\": 1,\\n \\\"col1\\\": \\\"key1\\\"\\n}\""; - String expected_document_key = "\"{\\n \\\"col1\\\": \\\"key2\\\"\\n}\""; - String actual = (((Struct) response.get(0).value()).getString("document")); - String actual_document_key = ((Struct) response.get(1).value()).getString("document"); - - // Converting both expected and actual to JSON string - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - expected = gson.toJson(expected); - expected_document_key = gson.toJson(expected_document_key); - actual = gson.toJson(actual); - actual_document_key = gson.toJson(actual_document_key); + + String expected = "{col2:val1,col3:1,col1:key1}"; + String expectedKey = "{col1:key2}"; + JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); + JsonObject expectedKeyJson = new JsonParser().parse(expectedKey).getAsJsonObject(); // Assert assertEquals(3, response.size()); - assertEquals(expected, actual); - assertEquals(expected_document_key, actual_document_key); + assertEquals(expectedJson.toString(), ((Struct) response.get(0).value()).getString("document")); + assertEquals(expectedKeyJson.toString(), ((Struct) response.get(1).value()).getString("document")); 
assertNull(response.get(2).value()); // tombstone } @@ -898,4 +886,4 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } -} +} \ No newline at end of file diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index ffd2caf..ee01a30 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ -19,8 +19,8 @@ import java.util.List; import java.util.Map; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; +import com.google.gson.JsonObject; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -75,7 +75,7 @@ private SourceInfo getSourceInfo(String table) { @Test public void correctTopicNameIsConstructed() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -88,13 +88,13 @@ public void correctTopicNameIsConstructed() throws Exception { ); // Assert - assertEquals("TestTopicPrefix-", record.topic()); + assertEquals("TestTopicPrefix", record.topic()); } @Test public void sourceInfoIsPutToOffset() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -113,7 +113,7 @@ public void sourceInfoIsPutToOffset() throws Exception { @Test public void shardIdAndSequenceNumberIsPutToOffset() throws 
Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -136,7 +136,7 @@ public void singleItemKeyIsAddedToRecord() throws Exception { List keySchema = new LinkedList<>(); keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("testKV1")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -161,7 +161,7 @@ public void multiItemKeyIsAddedToRecord() throws Exception { keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("testKV1")); keySchema.add(new KeySchemaElement().withKeyType("N").withAttributeName("testKV2")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -186,7 +186,7 @@ public void multiItemKeyIsAddedToRecord() throws Exception { @Test public void recordAttributesAreAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -198,16 +198,12 @@ public void recordAttributesAreAddedToValueData() throws Exception { "testSequenceNumberID1" ); - String expected = "\"{\\n \\\"testKV1\\\": \\\"testKV1Value\\\",\\n \\\"testKV2\\\": \\\"2\\\",\\n \\\"testV2\\\": \\\"testStringValue\\\",\\n \\\"testV1\\\": 1\\n}\""; - String actual = ((Struct) 
record.value()).getString("document"); - - // Converting both expected and actual to JSON string - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - expected = gson.toJson(expected); - actual = gson.toJson(actual); + String expected = "{testKV1:testKV1Value,testKV2:'2',testV2:testStringValue,testV1:1}"; + JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); // Assert - assertEquals(expected, actual); + assertEquals(expectedJson.toString(), + ((Struct) record.value()).getString("document")); } @Test @@ -216,7 +212,7 @@ public void singleItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throw List keySchema = new LinkedList<>(); keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -241,7 +237,7 @@ public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234")); keySchema.add(new KeySchemaElement().withKeyType("N").withAttributeName("1-starts-with-number")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -266,7 +262,7 @@ public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws @Test public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidCharacters() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act 
SourceRecord record = converter.toSourceRecord( @@ -278,22 +274,18 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar "testSequenceNumberID1" ); - String expected = "\"{\\n \\\"test-1234\\\": \\\"testKV1Value\\\",\\n \\\"_starts_with_underscore\\\": 1,\\n \\\"1-starts-with-number\\\": \\\"2\\\",\\n \\\"test!@£$%^\\\": \\\"testStringValue\\\"\\n}\""; - String actual = ((Struct) record.value()).getString("document"); - - // Converting both expected and actual to JSON string - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - expected = gson.toJson(expected); - actual = gson.toJson(actual); + String expected = "{test-1234:testKV1Value,_starts_with_underscore:1,1-starts-with-number:'2',test!@£$%^:testStringValue}"; + JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); // Assert - assertEquals(expected, actual); + assertEquals(expectedJson.toString(), + ((Struct) record.value()).getString("document")); } @Test public void sourceInfoIsAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -315,7 +307,7 @@ public void sourceInfoIsAddedToValueData() throws Exception { @Test public void operationIsAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -334,7 +326,7 @@ public void operationIsAddedToValueData() throws Exception { @Test public void arrivalTimestampIsAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + 
RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -350,4 +342,4 @@ public void arrivalTimestampIsAddedToValueData() throws Exception { assertEquals(978393600000L, ((Struct) record.value()).getInt64("ts_ms")); } -} +} \ No newline at end of file From ca565cae943224a3fbbf936315624543443f8e80 Mon Sep 17 00:00:00 2001 From: gurjit-sandhu Date: Wed, 14 Sep 2022 10:11:30 -0500 Subject: [PATCH 14/23] updated to send unmarshalled record with schema to kafka topic and updated test cases --- .../dynamodb/utils/RecordConverter.java | 88 ++++++++++++++++--- .../dynamodb/DynamoDBSourceTaskTests.java | 66 +++++++++++--- .../dynamodb/utils/RecordConverterTests.java | 65 +++++++++++--- 3 files changed, 184 insertions(+), 35 deletions(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index 781e62c..d7e7284 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -19,6 +19,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; @@ -40,22 +41,14 @@ public class RecordConverter { private final TableDescription tableDesc; private final String topic_name; private Schema keySchema; - private final Schema valueSchema; + private Schema valueSchema; private List keys; public RecordConverter(TableDescription tableDesc, String topicNamePrefix) { this.tableDesc = tableDesc; - this.topic_name = topicNamePrefix; + this.topic_name = topicNamePrefix; - valueSchema = SchemaBuilder.struct() - .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) - 
.field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA) - .field(Envelope.FieldName.DOCUMENT, DynamoDbJson.schema()) - .field(Envelope.FieldName.SOURCE, SourceInfo.structSchema()) - .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA) - .field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA) - .build(); } public SourceRecord toSourceRecord( @@ -79,7 +72,17 @@ public SourceRecord toSourceRecord( //Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); //JSON conversion - String outputJsonString = ItemUtils.toItem(attributes).toJSON(); + //String outputJsonString = ItemUtils.toItem(attributes).toJSON(); + Struct dynamoAttributes = getAttributeValueStruct(sanitisedAttributes); + + valueSchema = SchemaBuilder.struct() + .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) + .field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA) + .field(Envelope.FieldName.DOCUMENT, getAttributeValueSchema(sanitisedAttributes)) + .field(Envelope.FieldName.SOURCE, SourceInfo.structSchema()) + .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA) + .field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA) + .build(); // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. 
// This info will only be used to update `shardRegister` and won't be used to reset state after restart @@ -108,7 +111,7 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) - .put(Envelope.FieldName.DOCUMENT, outputJsonString) // objectMapper.writeValueAsString(outputJsonString)) + .put(Envelope.FieldName.DOCUMENT, dynamoAttributes) // objectMapper.writeValueAsString(outputJsonString)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); @@ -145,4 +148,65 @@ private String sanitiseAttributeName(final String attributeName) { return sanitisedAttributeName; } + + public static Struct getAttributeValueStruct(Map attributes) { + final Struct attributeValueStruct = new Struct(getAttributeValueSchema(attributes)); + + // Mapping dynamo db attributes to schema registry types (dynamo db attributes are documented at below link) + //https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-dynamodb/src/main/java/com/amazonaws/services/dynamodbv2/model/AttributeValue.java + + for (Map.Entry attribute : attributes.entrySet()) { + final String attributeName = attribute.getKey(); + final AttributeValue attributeValue = attribute.getValue(); + if (attributeValue.getS() != null) { + attributeValueStruct.put(attributeName, attributeValue.getS()); + } else if (attributeValue.getN() != null) { + attributeValueStruct.put(attributeName, attributeValue.getN()); + } else if (attributeValue.getB() != null) { + attributeValueStruct.put(attributeName, attributeValue.getB()); + } else if (attributeValue.getSS() != null) { + attributeValueStruct.put(attributeName, attributeValue.getSS()); + } else if (attributeValue.getNS() != null) { + attributeValueStruct.put(attributeName, attributeValue.getNS()); + } else if (attributeValue.getBS() != null) { + 
attributeValueStruct.put(attributeName, attributeValue.getBS()); + } else if (attributeValue.getNULL() != null) { + attributeValueStruct.put(attributeName, attributeValue.getNULL()); + } else if (attributeValue.getBOOL() != null) { + attributeValueStruct.put(attributeName, attributeValue.getBOOL()); + } + } + return attributeValueStruct; + } + + public static Schema getAttributeValueSchema(Map attributes) { + SchemaBuilder RECORD_ATTRIBUTES_SCHEMA = SchemaBuilder.struct().name("DynamoDB.AttributeValue"); + + // Mapping dynamo db attributes to schema registry types (dynamo db attributes are documented at below link) + //https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-dynamodb/src/main/java/com/amazonaws/services/dynamodbv2/model/AttributeValue.java + + for (Map.Entry attribute : attributes.entrySet()) { + final String attributeName = attribute.getKey(); + final AttributeValue attributeValue = attribute.getValue(); + if (attributeValue.getS() != null) { + RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.STRING_SCHEMA); + } else if (attributeValue.getN() != null) { + RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.STRING_SCHEMA); + } else if (attributeValue.getB() != null) { + RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BYTES_SCHEMA); + } else if (attributeValue.getSS() != null) { + RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.STRING_SCHEMA)); + } else if (attributeValue.getNS() != null) { + RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.STRING_SCHEMA)); + } else if (attributeValue.getBS() != null) { + RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.BYTES_SCHEMA)); + } else if (attributeValue.getNULL() != null) { + RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BOOLEAN_SCHEMA); + } else if (attributeValue.getBOOL() != null) { + RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BOOLEAN_SCHEMA); + } + } + return RECORD_ATTRIBUTES_SCHEMA.build(); + } + } 
diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index b199e64..b263f34 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -23,8 +23,9 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import com.google.gson.JsonParser; -import com.google.gson.JsonObject; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -279,12 +280,22 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), task.getSourceInfo().lastInitSyncStart); assertEquals(1, task.getSourceInfo().initSyncCount); - String expected = "{col2:val1,col3:1,col1:key1}"; - JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); + final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue") + .field("col2", Schema.STRING_SCHEMA) + .field("col3", Schema.STRING_SCHEMA) + .field("col1", Schema.STRING_SCHEMA) + .build(); + + final Struct expectedDocument = new Struct(expectedDocumentSchema) + .put("col2","val1") + .put("col3","1") + .put("col1","key1"); + + Struct actualDocument = ((Struct) response.get(0).value()).getStruct("document") ; assertEquals(1, response.size()); - assertEquals("r", ((Struct) response.get(0).value()).getString("op")); - assertEquals(expectedJson.toString(), ((Struct) response.get(0).value()).getString("document")); + assertEquals("r", ((Struct) response.get(0).value()).getString("op")); + compareStructs(expectedDocument, actualDocument); assertEquals(InitSyncStatus.RUNNING, 
task.getSourceInfo().initSyncStatus); assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey); } @@ -564,15 +575,30 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException { task.start(configs); List response = task.poll(); - String expected = "{col2:val1,col3:1,col1:key1}"; - String expectedKey = "{col1:key2}"; - JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); - JsonObject expectedKeyJson = new JsonParser().parse(expectedKey).getAsJsonObject(); + final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue") + .field("col2", Schema.STRING_SCHEMA) + .field("col3", Schema.STRING_SCHEMA) + .field("col1", Schema.STRING_SCHEMA) + .build(); + final Struct expectedDocument = new Struct(expectedDocumentSchema) + .put("col2","val1") + .put("col3","1") + .put("col1","key1"); + + final Schema expectedDocumentColSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue") + .field("col1", Schema.STRING_SCHEMA) + .build(); + + final Struct expectedDocColValue = new Struct(expectedDocumentColSchema) + .put("col1","key2"); + + Struct actualDocument = ((Struct) response.get(0).value()).getStruct("document") ; + Struct actualDocumentCol = ((Struct) response.get(1).value()).getStruct("document"); // Assert - assertEquals(3, response.size()); - assertEquals(expectedJson.toString(), ((Struct) response.get(0).value()).getString("document")); - assertEquals(expectedKeyJson.toString(), ((Struct) response.get(1).value()).getString("document")); + assertEquals(3, response.size()); + compareStructs(expectedDocument, actualDocument); + compareStructs(expectedDocColValue, actualDocumentCol); assertNull(response.get(2).value()); // tombstone } @@ -886,4 +912,18 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } + public void compareStructs(Struct expectedStruct , Struct actualStruct) 
{ + + // comparing schema for both struct + if (!Objects.equals(expectedStruct.schema(), actualStruct.schema())) { + fail("Schema expected " + expectedStruct.schema().fields() + " but actual " + actualStruct.schema().fields()); + } + + // comparing all fields for both struct + for (Field expectedFieldName : expectedStruct.schema().fields()) { + Field actualFieldName = actualStruct.schema().field(expectedFieldName.name()); + assertEquals(expectedStruct.get(expectedFieldName), actualStruct.get(actualFieldName)); + } + + } } \ No newline at end of file diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index ee01a30..0d8b829 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ -18,11 +18,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; -import com.google.gson.JsonParser; -import com.google.gson.JsonObject; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; @SuppressWarnings("SameParameterValue") @@ -198,12 +201,27 @@ public void recordAttributesAreAddedToValueData() throws Exception { "testSequenceNumberID1" ); - String expected = "{testKV1:testKV1Value,testKV2:'2',testV2:testStringValue,testV1:1}"; - JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); + //String expected = "{testKV1:testKV1Value,testKV2:'2',testV2:testStringValue,testV1:1}"; + //JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); + + final Schema expectedDocumentSchema = 
SchemaBuilder.struct().name("DynamoDB.AttributeValue") + .field("testKV1", Schema.STRING_SCHEMA) + .field("testKV2", Schema.STRING_SCHEMA) + .field("testV2", Schema.STRING_SCHEMA) + .field("testV1", Schema.STRING_SCHEMA) + .build(); + + final Struct expectedDocument = new Struct(expectedDocumentSchema) + .put("testKV1","testKV1Value") + .put("testKV2","2") + .put("testV2","testStringValue") + .put("testV1","1"); + + Struct actualDocument = ((Struct) record.value()).getStruct("document") ; // Assert - assertEquals(expectedJson.toString(), - ((Struct) record.value()).getString("document")); + //assertEquals(expectedJson.toString(), ((Struct) record.value()).getString("document")); + compareStructs(expectedDocument, actualDocument); } @Test @@ -274,12 +292,25 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar "testSequenceNumberID1" ); - String expected = "{test-1234:testKV1Value,_starts_with_underscore:1,1-starts-with-number:'2',test!@£$%^:testStringValue}"; - JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); + //String expected = "{test-1234:testKV1Value,_starts_with_underscore:1,1-starts-with-number:'2',test!@£$%^:testStringValue}"; + //JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); + + final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue") + .field("test1234", Schema.STRING_SCHEMA) + .field("_starts_with_underscore", Schema.STRING_SCHEMA) + .field("startswithnumber", Schema.STRING_SCHEMA) + .field("test", Schema.STRING_SCHEMA) + .build(); + final Struct expectedDocument = new Struct(expectedDocumentSchema) + .put("test1234","testKV1Value") + .put("_starts_with_underscore","1") + .put("startswithnumber","2") + .put("test","testStringValue"); + + Struct actualDocument = ((Struct) record.value()).getStruct("document") ; // Assert - assertEquals(expectedJson.toString(), - ((Struct) record.value()).getString("document")); + 
compareStructs(expectedDocument, actualDocument); } @Test @@ -342,4 +373,18 @@ public void arrivalTimestampIsAddedToValueData() throws Exception { assertEquals(978393600000L, ((Struct) record.value()).getInt64("ts_ms")); } + public void compareStructs(Struct expectedStruct , Struct actualStruct) { + + // comparing schema for both struct + if (!Objects.equals(expectedStruct.schema(), actualStruct.schema())) { + fail("Schema expected " + expectedStruct.schema().fields() + " but actual " + actualStruct.schema().fields()); + } + + // comparing all fields for both struct + for (Field expectedFieldName : expectedStruct.schema().fields()) { + Field actualFieldName = actualStruct.schema().field(expectedFieldName.name()); + assertEquals(expectedStruct.get(expectedFieldName), actualStruct.get(actualFieldName)); + } + + } } \ No newline at end of file From b3686e21f56b108165cb4c19fea495f25d429bc6 Mon Sep 17 00:00:00 2001 From: gurjit-sandhu Date: Wed, 14 Sep 2022 16:46:57 -0500 Subject: [PATCH 15/23] unMarshalling dynamo documents before sending to kafka topic --- .../dynamodb/utils/RecordConverter.java | 91 +++--------------- .../dynamodb/DynamoDBSourceTaskTests.java | 42 +-------- .../dynamodb/utils/RecordConverterTests.java | 93 +++++-------------- 3 files changed, 37 insertions(+), 189 deletions(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index d7e7284..788c6f1 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -19,7 +19,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.HashMap; import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; @@ -41,14 +40,22 @@ public class RecordConverter { private final TableDescription tableDesc; 
private final String topic_name; private Schema keySchema; - private Schema valueSchema; + private final Schema valueSchema; private List keys; public RecordConverter(TableDescription tableDesc, String topicNamePrefix) { this.tableDesc = tableDesc; - this.topic_name = topicNamePrefix; + this.topic_name = topicNamePrefix; + valueSchema = SchemaBuilder.struct() + .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) + .field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA) + .field(Envelope.FieldName.DOCUMENT, DynamoDbJson.schema()) + .field(Envelope.FieldName.SOURCE, SourceInfo.structSchema()) + .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA) + .field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA) + .build(); } public SourceRecord toSourceRecord( @@ -69,20 +76,7 @@ public SourceRecord toSourceRecord( )); // getUnmarshallItems from Dynamo Document - //Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); - - //JSON conversion - //String outputJsonString = ItemUtils.toItem(attributes).toJSON(); - Struct dynamoAttributes = getAttributeValueStruct(sanitisedAttributes); - - valueSchema = SchemaBuilder.struct() - .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) - .field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA) - .field(Envelope.FieldName.DOCUMENT, getAttributeValueSchema(sanitisedAttributes)) - .field(Envelope.FieldName.SOURCE, SourceInfo.structSchema()) - .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA) - .field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA) - .build(); + Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. 
// This info will only be used to update `shardRegister` and won't be used to reset state after restart @@ -111,7 +105,7 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) - .put(Envelope.FieldName.DOCUMENT, dynamoAttributes) // objectMapper.writeValueAsString(outputJsonString)) + .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); @@ -148,65 +142,4 @@ private String sanitiseAttributeName(final String attributeName) { return sanitisedAttributeName; } - - public static Struct getAttributeValueStruct(Map attributes) { - final Struct attributeValueStruct = new Struct(getAttributeValueSchema(attributes)); - - // Mapping dynamo db attributes to schema registry types (dynamo db attributes are documented at below link) - //https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-dynamodb/src/main/java/com/amazonaws/services/dynamodbv2/model/AttributeValue.java - - for (Map.Entry attribute : attributes.entrySet()) { - final String attributeName = attribute.getKey(); - final AttributeValue attributeValue = attribute.getValue(); - if (attributeValue.getS() != null) { - attributeValueStruct.put(attributeName, attributeValue.getS()); - } else if (attributeValue.getN() != null) { - attributeValueStruct.put(attributeName, attributeValue.getN()); - } else if (attributeValue.getB() != null) { - attributeValueStruct.put(attributeName, attributeValue.getB()); - } else if (attributeValue.getSS() != null) { - attributeValueStruct.put(attributeName, attributeValue.getSS()); - } else if (attributeValue.getNS() != null) { - attributeValueStruct.put(attributeName, attributeValue.getNS()); - } else if (attributeValue.getBS() != null) { - attributeValueStruct.put(attributeName, 
attributeValue.getBS()); - } else if (attributeValue.getNULL() != null) { - attributeValueStruct.put(attributeName, attributeValue.getNULL()); - } else if (attributeValue.getBOOL() != null) { - attributeValueStruct.put(attributeName, attributeValue.getBOOL()); - } - } - return attributeValueStruct; - } - - public static Schema getAttributeValueSchema(Map attributes) { - SchemaBuilder RECORD_ATTRIBUTES_SCHEMA = SchemaBuilder.struct().name("DynamoDB.AttributeValue"); - - // Mapping dynamo db attributes to schema registry types (dynamo db attributes are documented at below link) - //https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-dynamodb/src/main/java/com/amazonaws/services/dynamodbv2/model/AttributeValue.java - - for (Map.Entry attribute : attributes.entrySet()) { - final String attributeName = attribute.getKey(); - final AttributeValue attributeValue = attribute.getValue(); - if (attributeValue.getS() != null) { - RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.STRING_SCHEMA); - } else if (attributeValue.getN() != null) { - RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.STRING_SCHEMA); - } else if (attributeValue.getB() != null) { - RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BYTES_SCHEMA); - } else if (attributeValue.getSS() != null) { - RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.STRING_SCHEMA)); - } else if (attributeValue.getNS() != null) { - RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.STRING_SCHEMA)); - } else if (attributeValue.getBS() != null) { - RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.BYTES_SCHEMA)); - } else if (attributeValue.getNULL() != null) { - RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BOOLEAN_SCHEMA); - } else if (attributeValue.getBOOL() != null) { - RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BOOLEAN_SCHEMA); - } - } - return RECORD_ATTRIBUTES_SCHEMA.build(); - } - } diff --git 
a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 6ed6236..54623cb 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -23,10 +23,6 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; - import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -280,19 +276,6 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), task.getSourceInfo().lastInitSyncStart); assertEquals(1, task.getSourceInfo().initSyncCount); - final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue") - .field("col2", Schema.STRING_SCHEMA) - .field("col3", Schema.STRING_SCHEMA) - .field("col1", Schema.STRING_SCHEMA) - .build(); - - final Struct expectedDocument = new Struct(expectedDocumentSchema) - .put("col2","val1") - .put("col3","1") - .put("col1","key1"); - - Struct actualDocument = ((Struct) response.get(0).value()).getStruct("document") ; - assertEquals(1, response.size()); assertEquals("r", ((Struct) response.get(0).value()).getString("op")); assertEquals(("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}"), ((Struct) response.get(0).value()).getString("document")); @@ -575,30 +558,10 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException { task.start(configs); List response = task.poll(); - final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue") - .field("col2", Schema.STRING_SCHEMA) - .field("col3", Schema.STRING_SCHEMA) - .field("col1", Schema.STRING_SCHEMA) - 
.build(); - final Struct expectedDocument = new Struct(expectedDocumentSchema) - .put("col2","val1") - .put("col3","1") - .put("col1","key1"); - - final Schema expectedDocumentColSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue") - .field("col1", Schema.STRING_SCHEMA) - .build(); - - final Struct expectedDocColValue = new Struct(expectedDocumentColSchema) - .put("col1","key2"); - - Struct actualDocument = ((Struct) response.get(0).value()).getStruct("document") ; - Struct actualDocumentCol = ((Struct) response.get(1).value()).getStruct("document"); - // Assert assertEquals(3, response.size()); - compareStructs(expectedDocument, actualDocument); - compareStructs(expectedDocColValue, actualDocumentCol); + assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document")); + assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document")); assertNull(response.get(2).value()); // tombstone } @@ -911,4 +874,5 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep // Assert assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } + } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index 0d8b829..a87cc38 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ -1,3 +1,6 @@ + + + package com.trustpilot.connector.dynamodb.utils; import com.amazonaws.services.dynamodbv2.model.AttributeValue; @@ -18,14 +21,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Objects; - -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; 
import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.*; @SuppressWarnings("SameParameterValue") @@ -78,7 +75,7 @@ private SourceInfo getSourceInfo(String table) { @Test public void correctTopicNameIsConstructed() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -91,13 +88,13 @@ public void correctTopicNameIsConstructed() throws Exception { ); // Assert - assertEquals("TestTopicPrefix", record.topic()); + assertEquals("TestTopicPrefix-", record.topic()); } @Test public void sourceInfoIsPutToOffset() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -116,7 +113,7 @@ public void sourceInfoIsPutToOffset() throws Exception { @Test public void shardIdAndSequenceNumberIsPutToOffset() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -139,7 +136,7 @@ public void singleItemKeyIsAddedToRecord() throws Exception { List keySchema = new LinkedList<>(); keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("testKV1")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -164,7 +161,7 @@ public void 
multiItemKeyIsAddedToRecord() throws Exception { keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("testKV1")); keySchema.add(new KeySchemaElement().withKeyType("N").withAttributeName("testKV2")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -189,7 +186,7 @@ public void multiItemKeyIsAddedToRecord() throws Exception { @Test public void recordAttributesAreAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -201,27 +198,9 @@ public void recordAttributesAreAddedToValueData() throws Exception { "testSequenceNumberID1" ); - //String expected = "{testKV1:testKV1Value,testKV2:'2',testV2:testStringValue,testV1:1}"; - //JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); - - final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue") - .field("testKV1", Schema.STRING_SCHEMA) - .field("testKV2", Schema.STRING_SCHEMA) - .field("testV2", Schema.STRING_SCHEMA) - .field("testV1", Schema.STRING_SCHEMA) - .build(); - - final Struct expectedDocument = new Struct(expectedDocumentSchema) - .put("testKV1","testKV1Value") - .put("testKV2","2") - .put("testV2","testStringValue") - .put("testV1","1"); - - Struct actualDocument = ((Struct) record.value()).getStruct("document") ; - // Assert - //assertEquals(expectedJson.toString(), ((Struct) record.value()).getString("document")); - compareStructs(expectedDocument, actualDocument); + assertEquals("{\"testKV1\":\"testKV1Value\",\"testKV2\":\"2\",\"testV2\":\"testStringValue\",\"testV1\":1}", + 
((Struct) record.value()).getString("document")); } @Test @@ -230,7 +209,7 @@ public void singleItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throw List keySchema = new LinkedList<>(); keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -255,7 +234,7 @@ public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234")); keySchema.add(new KeySchemaElement().withKeyType("N").withAttributeName("1-starts-with-number")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -280,7 +259,7 @@ public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws @Test public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidCharacters() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -292,31 +271,17 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar "testSequenceNumberID1" ); - //String expected = "{test-1234:testKV1Value,_starts_with_underscore:1,1-starts-with-number:'2',test!@£$%^:testStringValue}"; - //JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); + String expected = 
"{\"test-1234\":\"testKV1Value\",\"_starts_with_underscore\":1,\"1-starts-with-number\":\"2\",\"test!@£$%^\":\"testStringValue\"}"; - final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue") - .field("test1234", Schema.STRING_SCHEMA) - .field("_starts_with_underscore", Schema.STRING_SCHEMA) - .field("startswithnumber", Schema.STRING_SCHEMA) - .field("test", Schema.STRING_SCHEMA) - .build(); - - final Struct expectedDocument = new Struct(expectedDocumentSchema) - .put("test1234","testKV1Value") - .put("_starts_with_underscore","1") - .put("startswithnumber","2") - .put("test","testStringValue"); - - Struct actualDocument = ((Struct) record.value()).getStruct("document") ; // Assert - compareStructs(expectedDocument, actualDocument); + assertEquals(expected, + ((Struct) record.value()).getString("document")); } @Test public void sourceInfoIsAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -338,7 +303,7 @@ public void sourceInfoIsAddedToValueData() throws Exception { @Test public void operationIsAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); // Act SourceRecord record = converter.toSourceRecord( @@ -357,7 +322,7 @@ public void operationIsAddedToValueData() throws Exception { @Test public void arrivalTimestampIsAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); // Act SourceRecord record = 
converter.toSourceRecord( @@ -373,18 +338,4 @@ public void arrivalTimestampIsAddedToValueData() throws Exception { assertEquals(978393600000L, ((Struct) record.value()).getInt64("ts_ms")); } - public void compareStructs(Struct expectedStruct , Struct actualStruct) { - - // comparing schema for both struct - if (!Objects.equals(expectedStruct.schema(), actualStruct.schema())) { - fail("Schema expected " + expectedStruct.schema().fields() + " but actual " + actualStruct.schema().fields()); - } - - // comparing all fields for both struct - for (Field expectedFieldName : expectedStruct.schema().fields()) { - Field actualFieldName = actualStruct.schema().field(expectedFieldName.name()); - assertEquals(expectedStruct.get(expectedFieldName), actualStruct.get(actualFieldName)); - } - - } -} \ No newline at end of file +} From 9dcbf4211bb78f4c53cce0ad711fb735e08e5dbd Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Tue, 28 Feb 2023 18:04:35 -0600 Subject: [PATCH 16/23] add init sync skip with new status and initial position in stream --- README.md | 5 +- docs/details.md | 2 + docs/options.md | 3 + .../DynamoDBSourceConnectorConfig.java | 20 +++- .../dynamodb/DynamoDBSourceTask.java | 19 +++- .../connector/dynamodb/InitSyncStatus.java | 3 +- .../connector/dynamodb/kcl/KclWorker.java | 1 + .../connector/dynamodb/kcl/KclWorkerImpl.java | 13 ++- .../dynamodb/DynamoDBSourceTaskTests.java | 107 +++++++++++++++++- .../dynamodb/kcl/KclWorkerImplTests.java | 3 +- 10 files changed, 164 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 5fe3307..fe0ef6b 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,8 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream * Java 8 * Gradlew 5.3.1 * Kafka Connect Framework >= 2.1.1 -* Amazon Kinesis Client 1.9.1 -* DynamoDB Streams Kinesis Adapter 1.5.2 +* Amazon Kinesis Client 1.13.1 +* DynamoDB Streams Kinesis Adapter 1.5.3 ## Documentation * [Getting started](docs/getting-started.md) @@ -44,6 
+44,7 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream * However you will only encounter this issue by running lots of tasks on one machine with really high load. * Synced(Source) DynamoDB table unit capacity must be large enough to ensure `INIT_SYNC` to be finished in around 16 hours. Otherwise there is a risk `INIT_SYNC` being restarted just as soon as it's finished because DynamoDB Streams store change events only for 24 hours. + * `INIT_SYNC` can be skipped with `init.sync.skip=true` configuration * Required AWS roles: ```json diff --git a/docs/details.md b/docs/details.md index c95aafb..8d95c97 100644 --- a/docs/details.md +++ b/docs/details.md @@ -13,6 +13,8 @@ This connector can sync multiple DynamoDB tables at the same time and it does so `INIT_SYNC` is a process when all existing table data is scanned and pushed into Kafka destination topic. Usually this happens only once after source task for specific table is started for the first time. But it can be repeated in case of unexpected issues, e.g. if source connector was down for long period of time and it is possible that it has missed some of the change events from the table stream (DynamoDB streams store data for 24 hours only). +Using `init.sync.skip` will skip this process and the connector will only ever read from the LATEST position in the stream. + ### 3. "SYNC" Once `INIT_SYNC` is finished source task switches into DynamoDB Streams consumer state. There all changes that happen to the source table are represented in this stream and copied over to the Kafka's destination topic. Consumers of this topic can recreate full state of the source table at any given time. 
diff --git a/docs/options.md b/docs/options.md index a0c6e4e..4bacf77 100644 --- a/docs/options.md +++ b/docs/options.md @@ -36,6 +36,7 @@ "tasks.max": "1", "init.sync.delay.period": 60, + "init.sync.skip": false, "connect.dynamodb.rediscovery.period": "60000" } ``` @@ -53,6 +54,8 @@ `init.sync.delay.period` - time interval in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Kafka Connect tasks to calm down after rebalance (Since multiple tasks rebalances can happen in quick succession and this would mean more duplicated data since `INIT_SYNC` process won't have time mark it's progress). +`init.sync.skip` - boolean to determine whether to start the connector reading the entire table or from the latest offset. + `connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured. `dynamodb.service.endpoint` - AWS DynamoDB API Endpoint. Will use default AWS if not set. 
diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java index 86299be..f077930 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java @@ -17,6 +17,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig { public static final String SRC_INIT_SYNC_DELAY_DISPLAY = "INIT_SYNC delay"; public static final int SRC_INIT_SYNC_DELAY_DEFAULT = 60; + public static final String SRC_INIT_SYNC_SKIP_CONFIG = "init.sync.skip"; + public static final String SRC_INIT_SYNC_SKIP_DOC = "Define whether to skip INIT_SYNC of table."; + public static final String SRC_INIT_SYNC_SKIP_DISPLAY = "Skip INIT_SYNC"; + public static final boolean SRC_INIT_SYNC_SKIP_DEFAULT = false; + public static final String AWS_REGION_CONFIG = "aws.region"; public static final String AWS_REGION_DOC = "Define AWS region."; public static final String AWS_REGION_DISPLAY = "Region"; @@ -204,12 +209,21 @@ public static ConfigDef baseConfigDef() { ConfigDef.Width.MEDIUM, DST_TOPIC_PREFIX_DISPLAY) + .define(SRC_INIT_SYNC_SKIP_CONFIG, + ConfigDef.Type.BOOLEAN, + SRC_INIT_SYNC_SKIP_DEFAULT, + ConfigDef.Importance.LOW, + SRC_INIT_SYNC_SKIP_DOC, + CONNECTOR_GROUP, 2, + ConfigDef.Width.MEDIUM, + SRC_INIT_SYNC_SKIP_DISPLAY) + .define(SRC_INIT_SYNC_DELAY_CONFIG, ConfigDef.Type.INT, SRC_INIT_SYNC_DELAY_DEFAULT, ConfigDef.Importance.LOW, SRC_INIT_SYNC_DELAY_DOC, - CONNECTOR_GROUP, 2, + CONNECTOR_GROUP, 3, ConfigDef.Width.MEDIUM, SRC_INIT_SYNC_DELAY_DISPLAY) @@ -267,6 +281,10 @@ public long getRediscoveryPeriod() { return getLong(REDISCOVERY_PERIOD_CONFIG); } + public boolean getInitSyncSkip() { + return (boolean)get(SRC_INIT_SYNC_SKIP_CONFIG); + } + public int getInitSyncDelay() { return (int)get(SRC_INIT_SYNC_DELAY_CONFIG); } diff 
--git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 0f53fdc..98729cb 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -152,7 +152,14 @@ public void start(Map configProperties) { eventsQueue, shardRegister); } - kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getKCLTableBillingMode()); + kclWorker.start(client, + dynamoDBStreamsClient, + tableDesc.getTableName(), + config.getTaskID(), + config.getDynamoDBServiceEndpoint(), + config.getInitSyncSkip(), + config.getKCLTableBillingMode() + ); shutdown = false; } @@ -196,6 +203,9 @@ public List poll() throws InterruptedException { if (sourceInfo.initSyncStatus == InitSyncStatus.FINISHED) { return sync(); } + if (sourceInfo.initSyncStatus == InitSyncStatus.SKIPPED) { + return sync(); + } throw new Exception("Invalid SourceInfo InitSyncStatus state: " + sourceInfo.initSyncStatus); } catch (InterruptedException ex) { LOGGER.error("Failed to handle incoming records. Records dropped!", ex); @@ -213,6 +223,8 @@ public List poll() throws InterruptedException { * {@link SourceInfo}. */ private LinkedList initSync() throws Exception { + // TODO: remove log + LOGGER.info("init sync running"); if (sourceInfo.lastInitSyncStart.compareTo(Instant.now(clock).minus(Duration.ofHours(19))) <= 0) { LOGGER.error("Current INIT_SYNC took over 19 hours. Restarting INIT_SYNC! 
{}", sourceInfo); sourceInfo.startInitSync(); @@ -279,6 +291,7 @@ private List sync() throws Exception { LOGGER.debug("Waiting for records from eventsQueue for table: {}", tableDesc.getTableName()); KclRecordsWrapper dynamoDBRecords = eventsQueue.poll(500, TimeUnit.MILLISECONDS); if (dynamoDBRecords == null) { + LOGGER.debug("null dynamoDBRecords"); return null; // returning thread control at regular intervals } @@ -312,12 +325,12 @@ private List sync() throws Exception { } // Received record which is behind "safe" zone. Indicating that "potentially" we lost some records. - // Need to resync... + // Need to resync if sync hasn't been skipped... // This happens if: // * connector was down for some time // * connector is lagging // * connector failed to finish init sync in acceptable time frame - if (recordIsInDangerZone(arrivalTimestamp)) { + if (recordIsInDangerZone(arrivalTimestamp) && sourceInfo.initSyncStatus != InitSyncStatus.SKIPPED) { sourceInfo.startInitSync(); LOGGER.info( diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java b/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java index 2ab048c..be6791a 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java @@ -3,5 +3,6 @@ public enum InitSyncStatus { UNDEFINED, RUNNING, - FINISHED + FINISHED, + SKIPPED } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java index 27557bb..98026ba 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java @@ -10,6 +10,7 @@ void start(AmazonDynamoDB dynamoDBClient, String tableName, String taskid, String endpoint, + Boolean isSkipSync, BillingMode kclTablebillingMode); void stop(); diff --git 
a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java index f6bd839..71dcfe4 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java @@ -30,7 +30,6 @@ public class KclWorkerImpl implements KclWorker { private final AWSCredentialsProvider awsCredentialsProvider; private final ArrayBlockingQueue eventsQueue; private final ConcurrentHashMap recordProcessorsRegister; - private volatile Thread thread; private volatile Worker worker; @@ -49,13 +48,14 @@ public void start(AmazonDynamoDB dynamoDBClient, String tableName, String taskid, String endpoint, + Boolean isSkipSync, BillingMode kclTableBillingMode) { IRecordProcessorFactory recordProcessorFactory = new KclRecordProcessorFactory(tableName, eventsQueue, recordProcessorsRegister); KinesisClientLibConfiguration clientLibConfiguration = getClientLibConfiguration(tableName, taskid, - dynamoDBClient, endpoint, kclTableBillingMode); + dynamoDBClient, endpoint, isSkipSync, kclTableBillingMode); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient); @@ -123,8 +123,15 @@ KinesisClientLibConfiguration getClientLibConfiguration(String tableName, String taskid, AmazonDynamoDB dynamoDBClient, String endpoint, + Boolean isSkipSync, BillingMode kclTableBillingMode) { + InitialPositionInStream initialPosition; + if (isSkipSync) { + initialPosition = InitialPositionInStream.LATEST; + } else { + initialPosition = InitialPositionInStream.TRIM_HORIZON; + } String streamArn = dynamoDBClient.describeTable( new DescribeTableRequest() .withTableName(tableName)).getTable().getLatestStreamArn(); @@ -141,7 +148,7 @@ KinesisClientLibConfiguration getClientLibConfiguration(String tableName, // worker will use checkpoint tableName if available, otherwise it is safer 
// to start at beginning of the stream - .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) + .withInitialPositionInStream(initialPosition) // we want the maximum batch size to avoid network transfer latency overhead .withMaxRecords(Constants.STREAMS_RECORDS_LIMIT) diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 54623cb..e179d9d 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -15,6 +15,8 @@ import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Clock; import java.time.Duration; @@ -29,7 +31,8 @@ @SuppressWarnings("ConstantConditions") public class DynamoDBSourceTaskTests { private final static String tableName = "testTable1"; - + // TODO: remove logger + private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBSourceTaskTests.class); private HashMap configs; @BeforeEach @@ -208,6 +211,7 @@ public void kclWorkerIsStartedOnStart() throws InterruptedException { eq(tableName), eq("testTask1"), eq(null), + eq(false), eq(BillingMode.PROVISIONED) ); } @@ -243,6 +247,7 @@ public void ifTaskIsStoppedPollDoesNothing() throws InterruptedException { @Test public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedException { + LOGGER.debug("start onInitSyncRunningPollReturnsScannedItemsBatch"); // Arrange HashMap offset = new HashMap<>(); offset.put("table_name", tableName); @@ -875,4 +880,104 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } + @Test + public void skippedInitSyncOnSyncPollReturnsReceivedRecords() throws 
InterruptedException { + // Arrange + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-01T00:00:00.00Z").toEpochMilli()); + + KclRecordsWrapper dynamoDBRecords = new KclRecordsWrapper("testShardId1", new LinkedList<>()); + + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(tableName); + tableDescription.setKeySchema(Collections.singleton(new KeySchemaElement("col1", "S"))); + + Map row = new HashMap<>(); + row.put("col1", new AttributeValue("key1")); + row.put("col2", new AttributeValue("val1")); + row.put("col3", new AttributeValue().withN("1")); + List> initSyncRecords = Collections.singletonList(row); + + Map exclusiveStartKey = Collections.singletonMap("fake", new AttributeValue("key")); + + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key1")), + row, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000001", + "INSERT")); + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key2")), + null, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000002", + "REMOVE")); + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key1")), + row, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000003", + "INSERT")); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .withTableDescription(tableDescription) + .withInitSyncRecords(initSyncRecords, exclusiveStartKey) + .withSyncRecords(Collections.singletonList(dynamoDBRecords)) + .buildTask(); + + // Act + task.start(configs); + List response = task.poll(); + + // Assert + assertEquals(4, response.size()); + assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document")); + 
assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document")); + assertNull(response.get(2).value()); // tombstone + } + @Test + public void onStartInitSyncSkipIsNotDelayed() throws InterruptedException { + // Arrange + configs.put("init.sync.delay.period", "2"); + + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-01T00:00:00.00Z").toEpochMilli()); + + KclRecordsWrapper dynamoDBRecords = new KclRecordsWrapper("testShardId1", new LinkedList<>()); + + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(tableName); + tableDescription.setKeySchema(Collections.singleton(new KeySchemaElement("col1", "S"))); + + Map row = new HashMap<>(); + row.put("col1", new AttributeValue("key1")); + List> initSyncRecords = Collections.singletonList(row); + + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key1")), + row, Instant.parse("2001-01-01T01:01:00.00Z"), + "1000000001", + "INSERT")); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .withClock(Clock.fixed(Instant.parse("2001-01-01T01:00:00.00Z"), ZoneId.of("UTC"))) + .withTableDescription(tableDescription) + .withInitSyncRecords(initSyncRecords, null) + .withSyncRecords(Collections.singletonList(dynamoDBRecords)) + .buildTask(); + + // Act + task.start(configs); + Instant start = Instant.now(); + List response = task.poll(); + Instant stop = Instant.now(); + + // Assert + assertTrue(Duration.between(start, stop).getSeconds() == 0); + assertEquals(0, task.getSourceInfo().initSyncCount); + assertEquals(1, response.size()); + } } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java index 12b7bdd..e2336b2 100644 --- 
a/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java @@ -35,6 +35,7 @@ void initializationRegistersNewShardToRegistry() { String tableName = "testTableName1"; String taskId = "task1"; String serviceEndpoint = "http://localhost:8000"; + Boolean isSyncSkip = false; BillingMode kclTableBillingMode = BillingMode.PROVISIONED; AmazonDynamoDB dynamoDBClient = Mockito.mock(AmazonDynamoDB.class); @@ -43,7 +44,7 @@ void initializationRegistersNewShardToRegistry() { when(dynamoDBClient.describeTable(ArgumentMatchers.any())).thenReturn(result); // Act - KinesisClientLibConfiguration clientLibConfiguration = kclWorker.getClientLibConfiguration(tableName, taskId, dynamoDBClient, serviceEndpoint, kclTableBillingMode); + KinesisClientLibConfiguration clientLibConfiguration = kclWorker.getClientLibConfiguration(tableName, taskId, dynamoDBClient, serviceEndpoint, isSyncSkip, kclTableBillingMode); // Assert assertEquals("datalake-KCL-testTableName1", clientLibConfiguration.getApplicationName()); From 7665279292a799b63dc30ad22592f97a43d4ae9f Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Wed, 1 Mar 2023 19:03:25 -0600 Subject: [PATCH 17/23] sourceInfo skipInitSync method --- .../dynamodb/DynamoDBSourceTask.java | 7 +++-- .../connector/dynamodb/SourceInfo.java | 8 ++++++ .../dynamodb/DynamoDBSourceTaskTests.java | 27 +++++++++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 98729cb..7e9a05a 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -89,6 +89,7 @@ public class DynamoDBSourceTask extends SourceTask { private SourceInfo sourceInfo; private 
TableDescription tableDesc; private int initSyncDelay; + private boolean initSyncSkip; @SuppressWarnings("unused") //Used by Confluent platform to initialize connector @@ -124,6 +125,7 @@ public void start(Map configProperties) { tableDesc = client.describeTable(config.getTableName()).getTable(); initSyncDelay = config.getInitSyncDelay(); + initSyncSkip = config.getInitSyncSkip(); LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName()); setStateFromOffset(); @@ -169,6 +171,9 @@ private void setStateFromOffset() { .offset(Collections.singletonMap("table_name", tableDesc.getTableName())); if (offset != null) { sourceInfo = SourceInfo.fromOffset(offset, clock); + if (initSyncSkip) { + sourceInfo.skipInitSync(); + } } else { LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName()); sourceInfo = new SourceInfo(tableDesc.getTableName(), clock); @@ -223,8 +228,6 @@ public List poll() throws InterruptedException { * {@link SourceInfo}. */ private LinkedList initSync() throws Exception { - // TODO: remove log - LOGGER.info("init sync running"); if (sourceInfo.lastInitSyncStart.compareTo(Instant.now(clock).minus(Duration.ofHours(19))) <= 0) { LOGGER.error("Current INIT_SYNC took over 19 hours. Restarting INIT_SYNC! 
{}", sourceInfo); sourceInfo.startInitSync(); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java b/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java index c0f04d4..51c3162 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java @@ -71,6 +71,14 @@ public void endInitSync() { lastInitSyncEnd = Instant.now(clock); } + public void skipInitSync() { + initSyncStatus = InitSyncStatus.SKIPPED; + lastInitSyncStart = Instant.ofEpochSecond(0); + lastInitSyncEnd = Instant.ofEpochSecond(0);; + exclusiveStartKey = null; + initSyncCount = 0L; + } + private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct() .name(SchemaNameAdjuster .defaultAdjuster() diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index e179d9d..8b24372 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -880,9 +880,35 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } + @Test + public void sourceInfoOfSkippedInitSyncIsLoadedFromOffsetOnStart() throws InterruptedException { + configs.put("init.sync.skip", "true"); + // Arrange + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-02T00:00:00.00Z").toEpochMilli()); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .buildTask(); + + // Act + task.start(configs); + + // Assert + SourceInfo sourceInfo = task.getSourceInfo(); + assertEquals(tableName, sourceInfo.tableName); + 
assertEquals(InitSyncStatus.SKIPPED, sourceInfo.initSyncStatus); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncStart); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncEnd); + } + @Test public void skippedInitSyncOnSyncPollReturnsReceivedRecords() throws InterruptedException { // Arrange + configs.put("init.sync.skip", "true"); + HashMap offset = new HashMap<>(); offset.put("table_name", tableName); offset.put("init_sync_state", "SKIPPED"); @@ -938,6 +964,7 @@ public void skippedInitSyncOnSyncPollReturnsReceivedRecords() throws Interrupted @Test public void onStartInitSyncSkipIsNotDelayed() throws InterruptedException { // Arrange + configs.put("init.sync.skip", "true"); configs.put("init.sync.delay.period", "2"); HashMap offset = new HashMap<>(); From b1f839c0ca076af23fc68b48bb3b6464427594a7 Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Thu, 2 Mar 2023 13:10:40 -0600 Subject: [PATCH 18/23] init sync flag to not restart if skipped --- .../dynamodb/DynamoDBSourceTaskTests.java | 50 +++++++++++++++++-- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 8b24372..41fcdee 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -31,8 +31,6 @@ @SuppressWarnings("ConstantConditions") public class DynamoDBSourceTaskTests { private final static String tableName = "testTable1"; - // TODO: remove logger - private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBSourceTaskTests.class); private HashMap configs; @BeforeEach @@ -247,7 +245,6 @@ public void ifTaskIsStoppedPollDoesNothing() throws InterruptedException { @Test public void onInitSyncRunningPollReturnsScannedItemsBatch() throws 
InterruptedException { - LOGGER.debug("start onInitSyncRunningPollReturnsScannedItemsBatch"); // Arrange HashMap offset = new HashMap<>(); offset.put("table_name", tableName); @@ -1007,4 +1004,51 @@ public void onStartInitSyncSkipIsNotDelayed() throws InterruptedException { assertEquals(0, task.getSourceInfo().initSyncCount); assertEquals(1, response.size()); } + + @Test + public void onSyncPollInitSyncSkipReturnsNullAndDoesNotStartInitSyncIfAnyOneRecordEventArrivedTooLate() throws InterruptedException { + // Arrange + configs.put("init.sync.skip", "true"); + + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-01T00:00:00.00Z").toEpochMilli()); + + KclRecordsWrapper dynamoDBRecords = new KclRecordsWrapper("testShardId1", new LinkedList<>()); + + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(tableName); + tableDescription.setKeySchema(Collections.singleton(new KeySchemaElement("col1", "S"))); + + Map row = new HashMap<>(); + row.put("col1", new AttributeValue("key1")); + row.put("col2", new AttributeValue("val1")); + row.put("col3", new AttributeValue().withN("1")); + List> initSyncRecords = Collections.singletonList(row); + + Map exclusiveStartKey = Collections.singletonMap("fake", new AttributeValue("key")); + + dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), + row, Instant.parse("2001-01-03T15:00:00.00Z"), "s1", "INSERT")); + dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), + row, Instant.parse("2001-01-03T00:00:00.00Z"), "s2", "INSERT")); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .withClock(Clock.fixed(Instant.parse("2001-01-03T20:00:00.00Z"), ZoneId.of("UTC"))) + .withTableDescription(tableDescription) + 
.withInitSyncRecords(initSyncRecords, exclusiveStartKey) + .withSyncRecords(Collections.singletonList(dynamoDBRecords)) + .buildTask(); + + // Act + task.start(configs); + List response = task.poll(); + + // Assert + assertEquals(2, response.size()); + assertEquals(0, task.getSourceInfo().initSyncCount); + assertEquals(InitSyncStatus.SKIPPED, task.getSourceInfo().initSyncStatus); + } } From 1bf58c45d961cd623cf64786e32546db94060e96 Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Fri, 3 Mar 2023 13:49:52 -0600 Subject: [PATCH 19/23] changed offset time of init sync sourceInfo --- .../trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 41fcdee..3e067f5 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -884,7 +884,7 @@ public void sourceInfoOfSkippedInitSyncIsLoadedFromOffsetOnStart() throws Interr HashMap offset = new HashMap<>(); offset.put("table_name", tableName); offset.put("init_sync_state", "SKIPPED"); - offset.put("init_sync_start", Instant.parse("2001-01-02T00:00:00.00Z").toEpochMilli()); + offset.put("init_sync_start", Instant.parse("1970-01-01T00:00:00Z").toEpochMilli()); DynamoDBSourceTask task = new SourceTaskBuilder() .withOffset(offset) From 3e63b7364293554faa1cad7ad0e0bf058f6eeee8 Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Tue, 7 Mar 2023 18:55:37 -0600 Subject: [PATCH 20/23] remove comment --- .../com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java | 2 +- .../trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git 
a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 7e9a05a..10a0c0e 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -177,7 +177,7 @@ private void setStateFromOffset() { } else { LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName()); sourceInfo = new SourceInfo(tableDesc.getTableName(), clock); - sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table + sourceInfo.startInitSync(); } } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 3e067f5..ea01058 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -15,8 +15,6 @@ import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.time.Clock; import java.time.Duration; From 8b646ac328cca1fff7895322cdbbdfa07bf2f1dd Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Mon, 27 Mar 2023 06:42:32 -0500 Subject: [PATCH 21/23] handle no offset init.sync.skip --- .../dynamodb/DynamoDBSourceTask.java | 6 +++++- .../dynamodb/DynamoDBSourceTaskTests.java | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 10a0c0e..1a33b9f 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ 
b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -177,7 +177,11 @@ private void setStateFromOffset() { } else { LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName()); sourceInfo = new SourceInfo(tableDesc.getTableName(), clock); - sourceInfo.startInitSync(); + if (initSyncSkip) { + sourceInfo.skipInitSync(); + } else { + sourceInfo.startInitSync(); + } } } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index ea01058..3c2efbe 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -875,6 +875,25 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } + @Test + public void initSyncIsSkippedWithNoOffsetOnStart() throws InterruptedException { + configs.put("init.sync.skip", "true"); + // Arrange + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(null) + .buildTask(); + + // Act + task.start(configs); + + // Assert + SourceInfo sourceInfo = task.getSourceInfo(); + assertEquals(tableName, sourceInfo.tableName); + assertEquals(InitSyncStatus.SKIPPED, sourceInfo.initSyncStatus); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncStart); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncEnd); + } + @Test public void sourceInfoOfSkippedInitSyncIsLoadedFromOffsetOnStart() throws InterruptedException { configs.put("init.sync.skip", "true"); From de864e1a93a18f42c640d62b80515fc0d2585b2e Mon Sep 17 00:00:00 2001 From: Sam Corzine Date: Thu, 15 Jun 2023 17:05:56 -0500 Subject: [PATCH 22/23] initial commit for testing --- .../connector/dynamodb/DynamoDBSourceTask.java | 5 +++++ 
.../com/trustpilot/connector/dynamodb/Envelope.java | 4 ++++ .../connector/dynamodb/utils/RecordConverter.java | 3 +++ .../dynamodb/utils/RecordConverterTests.java | 12 ++++++++++++ 4 files changed, 24 insertions(+) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 1a33b9f..c934bb0 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -257,6 +257,7 @@ private LinkedList initSync() throws Exception { result.add(converter.toSourceRecord(sourceInfo, Envelope.Operation.READ, record, + null, sourceInfo.lastInitSyncStart, null, null)); @@ -275,6 +276,7 @@ private LinkedList initSync() throws Exception { result.add(converter.toSourceRecord(sourceInfo, Envelope.Operation.READ, lastRecord, + null, sourceInfo.lastInitSyncStart, null, null)); @@ -364,9 +366,12 @@ private List sync() throws Exception { attributes = dynamoDbRecord.getDynamodb().getKeys(); } + Map oldImage = dynamoDbRecord.getDynamodb().getOldImage(); + SourceRecord sourceRecord = converter.toSourceRecord(sourceInfo, op, attributes, + oldImage, arrivalTimestamp.toInstant(), dynamoDBRecords.getShardId(), record.getSequenceNumber()); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java b/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java index a803ff1..81cccb1 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java @@ -56,6 +56,10 @@ public static final class FieldName { * The {@code after} field is used to store the state of a record after an operation. */ public static final String DOCUMENT = "document"; + /** + * The {@code oldDocument} field is used to store the state of a record before an operation. 
+ */ + public static final String OLD_DOCUMENT = "oldDocument"; /** * The {@code op} field is used to store the kind of operation on a record. */ diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index 788c6f1..e6218b2 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -62,6 +62,7 @@ public SourceRecord toSourceRecord( SourceInfo sourceInfo, Envelope.Operation op, Map attributes, + Map oldImage, Instant arrivalTimestamp, String shardId, String sequenceNumber) throws Exception { @@ -77,6 +78,7 @@ public SourceRecord toSourceRecord( // getUnmarshallItems from Dynamo Document Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); + Map unMarshalledOldItems = ItemUtils.toSimpleMapValue(oldImage); // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. 
// This info will only be used to update `shardRegister` and won't be used to reset state after restart @@ -106,6 +108,7 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems)) + .put(Envelope.FieldName.OLD_DOCUMENT, objectMapper.writeValueAsString(unMarshalledOldItems)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index a87cc38..8ecfc85 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ -82,6 +82,7 @@ public void correctTopicNameIsConstructed() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -101,6 +102,7 @@ public void sourceInfoIsPutToOffset() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -120,6 +122,7 @@ public void shardIdAndSequenceNumberIsPutToOffset() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -143,6 +146,7 @@ public void singleItemKeyIsAddedToRecord() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), 
Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -168,6 +172,7 @@ public void multiItemKeyIsAddedToRecord() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -193,6 +198,7 @@ public void recordAttributesAreAddedToValueData() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -216,6 +222,7 @@ public void singleItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throw getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributesWithInvalidAvroCharacters(), + getAttributesWithInvalidAvroCharacters(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -241,6 +248,7 @@ public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributesWithInvalidAvroCharacters(), + getAttributesWithInvalidAvroCharacters(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -266,6 +274,7 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributesWithInvalidAvroCharacters(), + getAttributesWithInvalidAvroCharacters(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -288,6 +297,7 @@ public void sourceInfoIsAddedToValueData() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -310,6 +320,7 @@ public void operationIsAddedToValueData() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), 
Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -329,6 +340,7 @@ public void arrivalTimestampIsAddedToValueData() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" From 9f6535147c0705c5fe6caa08cb1619da0acc4163 Mon Sep 17 00:00:00 2001 From: Sam Corzine Date: Fri, 16 Jun 2023 10:36:02 -0500 Subject: [PATCH 23/23] task is working as desired --- .../main/java/com/trustpilot/connector/dynamodb/Envelope.java | 2 +- .../trustpilot/connector/dynamodb/utils/RecordConverter.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java b/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java index 81cccb1..92bd884 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java @@ -59,7 +59,7 @@ public static final class FieldName { /** * The {@code oldDocument} field is used to store the state of a record before an operation. */ - public static final String OLD_DOCUMENT = "oldDocument"; + public static final String OLD_DOCUMENT = "old_document"; /** * The {@code op} field is used to store the kind of operation on a record. 
*/ diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index e6218b2..d5285ff 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -52,6 +52,7 @@ public RecordConverter(TableDescription tableDesc, String topicNamePrefix) { .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) .field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA) .field(Envelope.FieldName.DOCUMENT, DynamoDbJson.schema()) + .field(Envelope.FieldName.OLD_DOCUMENT, DynamoDbJson.schema()) .field(Envelope.FieldName.SOURCE, SourceInfo.structSchema()) .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA) .field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA)