From c35e7a59a6864292a18752260eeac3100f2e1b33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raimondas=20Tij=C5=ABnaitis?= Date: Thu, 19 Nov 2020 22:25:09 +0200 Subject: [PATCH] Table whitelist config parameter (#7) integration tests - updated kinesis adapter - service endpoints config - fixed credential config types updated kinesis-adapter use gson only integration test task test switch from init sync table whitelist config parameter kcl.table.billing.mode config parameter add aws-java-sdk-sts dependency https://github.com/trustpilot/kafka-connect-dynamodb/issues/5 params cleaning cleanup docs --- README.md | 8 +- build.gradle | 34 ++- docs/details.md | 4 +- docs/options.md | 12 +- source/build.gradle | 3 +- .../dynamodb/DynamoDBSourceConnector.java | 29 +- .../DynamoDBSourceConnectorConfig.java | 97 ++++++- .../dynamodb/DynamoDBSourceTask.java | 42 +-- .../connector/dynamodb/aws/AwsClients.java | 63 +++-- .../dynamodb/aws/ConfigTablesProvider.java | 41 +++ .../dynamodb/aws/DynamoDBTablesProvider.java | 18 +- .../dynamodb/aws/TablesProvider.java | 1 + .../dynamodb/aws/TablesProviderBase.java | 29 ++ .../dynamodb/kcl/KclNoopCloudWatch.java | 68 +++++ .../connector/dynamodb/kcl/KclWorker.java | 13 +- .../connector/dynamodb/kcl/KclWorkerImpl.java | 52 ++-- .../dynamodb/DynamoDBSourceConnectorIT.java | 108 ++++++++ .../dynamodb/DynamoDBSourceTaskTests.java | 74 +++--- .../dynamodb/KafkaConnectITBase.java | 250 ++++++++++++++++++ .../dynamodb/kcl/KclWorkerImplTests.java | 8 +- .../testcontainers/ConnectContainer.java | 68 +++++ .../testcontainers/DynamoDBContainer.java | 25 ++ .../MockServerContainerExt.java | 14 + .../SchemaRegistryContainer.java | 26 ++ 24 files changed, 953 insertions(+), 134 deletions(-) create mode 100644 source/src/main/java/com/trustpilot/connector/dynamodb/aws/ConfigTablesProvider.java create mode 100644 source/src/main/java/com/trustpilot/connector/dynamodb/aws/TablesProviderBase.java create mode 100644 source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorIT.java create mode 100644 source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java create mode 100644 source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/ConnectContainer.java create mode 100644 source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/DynamoDBContainer.java create mode 100644 source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/MockServerContainerExt.java create mode 100644 source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/SchemaRegistryContainer.java diff --git a/README.md b/README.md index 6a6ed06..5fe3307 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ A [Kafka Connector](http://kafka.apache.org/documentation.html#connect) which im ## Notable features * `autodiscovery` - monitors and automatically discovers DynamoDB tables to start/stop syncing from (based on AWS TAG's) * `initial sync` - automatically detects and if needed performs initial(existing) data replication before tracking changes from the DynamoDB table stream - +* `local debugging` - use of test containers to test full connector life-cycle ## Alternatives Prior our development we found only one existing implementation by [shikhar](https://github.com/shikhar/kafka-connect-dynamodb), but it seems to be missing major features (initial sync, handling shard changes) and is no longer supported. @@ -22,7 +22,7 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream * Gradlew 5.3.1 * Kafka Connect Framework >= 2.1.1 * Amazon Kinesis Client 1.9.1 -* DynamoDB Streams Kinesis Adapter 1.4.0 +* DynamoDB Streams Kinesis Adapter 1.5.2 ## Documentation * [Getting started](docs/getting-started.md) @@ -97,6 +97,9 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream # build & run unit tests ./gradlew +# run integration tests +./gradlew integrationTests + # build final jar ./gradlew shadowJar ``` @@ -128,7 +131,6 @@ Releases are done by creating new release(aka tag) via Github user interface. On ## Roadmap (TODO: move to issues?) -* Add Confluent stack as docker-compose.yml for easier local debugging * Use multithreaded DynamoDB table scan for faster `INIT SYNC` diff --git a/build.gradle b/build.gradle index 8524812..bf1a4bf 100644 --- a/build.gradle +++ b/build.gradle @@ -45,8 +45,19 @@ allprojects { dependencies { - testImplementation('org.junit.jupiter:junit-jupiter:5.4.1') - testCompile ("org.junit.jupiter:junit-jupiter-params:5.3.2") + def junitJupiterVersion = '5.6.2' + testImplementation "org.junit.jupiter:junit-jupiter:$junitJupiterVersion" + testCompile "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion" + testCompile "org.junit.jupiter:junit-jupiter-params:$junitJupiterVersion" + testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion" + implementation 'io.rest-assured:rest-assured:4.3.1' + testCompile "org.testcontainers:testcontainers:1.14.3" + testCompile "org.testcontainers:junit-jupiter:1.14.3" + testCompile "org.testcontainers:kafka:1.15.0-rc2" + testCompile "org.testcontainers:mockserver:1.15.0-rc2" + testCompile "org.mock-server:mockserver-client-java:5.11.1" + testCompile "com.google.code.gson:gson:2.8.6" + testCompile group: 'org.mockito', name: 'mockito-junit-jupiter', version: '2.26.0' compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.2' compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.2' @@ -54,6 +65,7 @@ allprojects { } test { + exclude '**/**IT**' useJUnitPlatform() testLogging { outputs.upToDateWhen {false} @@ -66,8 +78,26 @@ allprojects { } } } + + task integrationTests(type: Test) { + dependsOn shadowJar + useJUnitPlatform() + include '**/**IT**' + testLogging { + outputs.upToDateWhen {false} + events = ["passed", "failed", "skipped"] + showStandardStreams = true + afterSuite { desc, result -> + if (!desc.parent) { // will match the outermost suite + println "Results: ${result.resultType} (${result.testCount} tests, ${result.successfulTestCount} successes, ${result.failedTestCount} failures, ${result.skippedTestCount} skipped)" + } + } + } + } } + + dependencies { compile project(':source') } diff --git a/docs/details.md b/docs/details.md index 664b45e..c95aafb 100644 --- a/docs/details.md +++ b/docs/details.md @@ -8,7 +8,7 @@ This connector can sync multiple DynamoDB tables at the same time and it does so * environment TAG key and value set * DynamoDB streams enabled (in `new_image` or `new_and_old_image` mode) - +> Note: if `dynamodb.table.whitelist` parameter is set, then auto-discovery will not be executed and replication will be issued for explicitly defined tables. ### 2. "INIT_SYNC" `INIT_SYNC` is a process when all existing table data is scanned and pushed into Kafka destination topic. Usually this happens only once after source task for specific table is started for the first time. But it can be repeated in case of unexpected issues, e.g. if source connector was down for long period of time and it is possible that it has missed some of the change events from the table stream (DynamoDB streams store data for 24 hours only). @@ -40,7 +40,7 @@ Since we are using two different frameworks/libraries together there are two dif ### `DISCOVERY` state and task configuration -Connector uses AWS resource group API to receive a list of DynamoDB tables which have ingestion TAG defined. Then it iterates over this list and checks if environment TAG is matched and streams are actually enabled. Connect task is started for each table which meats all requirements. +If `dynamodb.table.whitelist` parameter is not defined connector uses AWS resource group API to receive a list of DynamoDB tables which have ingestion TAG defined. Then it iterates over this list and checks if environment TAG is matched and streams are actually enabled. Connect task is started for each table which meats all requirements. `discovery` phase is executed on start and every 60 seconds(default config value) after initial start. diff --git a/docs/options.md b/docs/options.md index 66ea66c..423a1c7 100644 --- a/docs/options.md +++ b/docs/options.md @@ -24,7 +24,13 @@ "dynamodb.table.env.tag.key": "environment", "dynamodb.table.env.tag.value": "dev", "dynamodb.table.ingestion.tag.key": "datalake-ingest", + "dynamodb.table.whitelist": "", + "dynamodb.service.endpoint": "", + "kcl.table.billing.mode": "PROVISIONED", + + "resource.tagging.service.endpoint": "", + "kafka.topic.prefix": "dynamodb-", "tasks.max": "1", @@ -44,8 +50,12 @@ `init.sync.delay.period` - time interval in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Kafka Connect tasks to calm down after rebalance (Since multiple tasks rebalances can happen in quick succession and this would mean more duplicated data since `INIT_SYNC` process won't have time mark it's progress). - `connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured. +`connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured. +`dynamodb.service.endpoint` - AWS DynamoDB API Endpoint. Will use default AWS if not set. +`resource.tagging.service.endpoint` - AWS Resource Group Tag API Endpoint. Will use default AWS if not set. +`kcl.table.billing.mode` - Define billing mode for internal table created by the KCL library. Default is provisioned. +`dynamodb.table.whitelist` - Define whitelist of dynamodb table names. This overrides table auto-discovery by ingestion tag. diff --git a/source/build.gradle b/source/build.gradle index 126c50e..d8faae6 100644 --- a/source/build.gradle +++ b/source/build.gradle @@ -6,5 +6,6 @@ dependencies { compile group: 'org.apache.kafka', name: 'connect-api', version: "${rootProject.ext.kafkaConnectApiVersion}" compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.9.1' - compile group: 'com.amazonaws', name: 'dynamodb-streams-kinesis-adapter', version: '1.4.0' + compile group: 'com.amazonaws', name: 'dynamodb-streams-kinesis-adapter', version: '1.5.2' + compile group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.11.877' } \ No newline at end of file diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java index f7ad5b6..0e95906 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java @@ -2,8 +2,9 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.resourcegroupstaggingapi.AWSResourceGroupsTaggingAPI; -import com.trustpilot.connector.dynamodb.aws.DynamoDBTablesProvider; import com.trustpilot.connector.dynamodb.aws.AwsClients; +import com.trustpilot.connector.dynamodb.aws.ConfigTablesProvider; +import com.trustpilot.connector.dynamodb.aws.DynamoDBTablesProvider; import com.trustpilot.connector.dynamodb.aws.TablesProvider; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectorContext; @@ -54,20 +55,26 @@ public void start(Map properties) { AWSResourceGroupsTaggingAPI groupsTaggingAPIClient = AwsClients.buildAWSResourceGroupsTaggingAPIClient(config.getAwsRegion(), - config.getAwsAccessKeyId(), - config.getAwsSecretKey()); + config.getResourceTaggingServiceEndpoint(), + config.getAwsAccessKeyIdValue(), + config.getAwsSecretKeyValue()); AmazonDynamoDB dynamoDBClient = AwsClients.buildDynamoDbClient(config.getAwsRegion(), - config.getAwsAccessKeyId(), - config.getAwsSecretKey()); + config.getDynamoDBServiceEndpoint(), + config.getAwsAccessKeyIdValue(), + config.getAwsSecretKeyValue()); if (tablesProvider == null) { - tablesProvider = new DynamoDBTablesProvider( - groupsTaggingAPIClient, - dynamoDBClient, - config.getSrcDynamoDBIngestionTagKey(), - config.getSrcDynamoDBEnvTagKey(), - config.getSrcDynamoDBEnvTagValue()); + if (config.getWhitelistTables() != null) { + tablesProvider = new ConfigTablesProvider(dynamoDBClient, config); + } else { + tablesProvider = new DynamoDBTablesProvider( + groupsTaggingAPIClient, + dynamoDBClient, + config.getSrcDynamoDBIngestionTagKey(), + config.getSrcDynamoDBEnvTagKey(), + config.getSrcDynamoDBEnvTagValue()); + } } startBackgroundReconfigurationTasks(this.context, config.getRediscoveryPeriod()); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java index 3f91bd2..078d6fb 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java @@ -1,8 +1,11 @@ package com.trustpilot.connector.dynamodb; +import com.amazonaws.services.dynamodbv2.model.BillingMode; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.types.Password; +import java.util.List; import java.util.Map; public class DynamoDBSourceConnectorConfig extends AbstractConfig { @@ -22,12 +25,12 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig { public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id"; public static final String AWS_ACCESS_KEY_ID_DOC = "Explicit AWS access key ID. Leave empty to utilize the default credential provider chain."; public static final String AWS_ACCESS_KEY_ID_DISPLAY = "Access key id"; - public static final Object AWS_ACCESS_KEY_ID_DEFAULT = null; + public static final Password AWS_ACCESS_KEY_ID_DEFAULT = null; public static final String AWS_SECRET_KEY_CONFIG = "aws.secret.key"; public static final String AWS_SECRET_KEY_DOC = "Explicit AWS secret access key. Leave empty to utilize the default credential provider chain."; public static final String AWS_SECRET_KEY_DISPLAY = "Secret key"; - public static final Object AWS_SECRET_KEY_DEFAULT = null; + public static final Password AWS_SECRET_KEY_DEFAULT = null; public static final String SRC_DYNAMODB_TABLE_INGESTION_TAG_KEY_CONFIG = "dynamodb.table.ingestion.tag.key"; public static final String SRC_DYNAMODB_TABLE_INGESTION_TAG_KEY_DOC = "Define DynamoDB table tag name. Only tables with this tag key will be ingested."; @@ -44,6 +47,16 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig { public static final String SRC_DYNAMODB_TABLE_ENV_TAG_VALUE_DISPLAY = "Environment"; public static final String SRC_DYNAMODB_TABLE_ENV_TAG_VALUE_DEFAULT = "dev"; + public static final String SRC_DYNAMODB_TABLE_WHITELIST_CONFIG = "dynamodb.table.whitelist"; + public static final String SRC_DYNAMODB_TABLE_WHITELIST_DOC = "Define whitelist of dynamodb table names. This overrides table auto-discovery by ingestion tag."; + public static final String SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY = "Tables whitelist"; + public static final String SRC_DYNAMODB_TABLE_WHITELIST_DEFAULT = null; + + public static final String SRC_KCL_TABLE_BILLING_MODE_CONFIG = "kcl.table.billing.mode"; + public static final String SRC_KCL_TABLE_BILLING_MODE_DOC = "Define billing mode for internal table created by the KCL library. Default is provisioned."; + public static final String SRC_KCL_TABLE_BILLING_MODE_DISPLAY = "KCL table billing mode"; + public static final String SRC_KCL_TABLE_BILLING_MODE_DEFAULT = "PROVISIONED"; + public static final String DST_TOPIC_PREFIX_CONFIG = "kafka.topic.prefix"; public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table."; public static final String DST_TOPIC_PREFIX_DISPLAY = "Topic prefix"; @@ -55,6 +68,16 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig { public static final String REDISCOVERY_PERIOD_DISPLAY = "Rediscovery period"; public static final long REDISCOVERY_PERIOD_DEFAULT = 1 * 60 * 1000; // 1 minute + public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_CONFIG = "resource.tagging.service.endpoint"; + public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_DOC = "AWS Resource Group Tag API Endpoint. Will use default AWS if not set."; + public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_DISPLAY = "AWS Resource Group Tag API Endpoint"; + public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_DEFAULT = null; + + public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG = "dynamodb.service.endpoint"; + public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_DOC = "AWS DynamoDB API Endpoint. Will use default AWS if not set."; + public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_DISPLAY = "AWS DynamoDB API Endpoint"; + public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_DEFAULT = null; + static final ConfigDef config = baseConfigDef(); public DynamoDBSourceConnectorConfig(Map props) { @@ -122,6 +145,42 @@ public static ConfigDef baseConfigDef() { ConfigDef.Width.MEDIUM, SRC_DYNAMODB_TABLE_ENV_TAG_VALUE_DISPLAY) + .define(AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG, + ConfigDef.Type.STRING, + AWS_DYNAMODB_SERVICE_ENDPOINT_DEFAULT, + ConfigDef.Importance.LOW, + AWS_DYNAMODB_SERVICE_ENDPOINT_DOC, + AWS_GROUP, 7, + ConfigDef.Width.MEDIUM, + AWS_DYNAMODB_SERVICE_ENDPOINT_DISPLAY) + + .define(AWS_RESOURCE_TAGGING_API_ENDPOINT_CONFIG, + ConfigDef.Type.STRING, + AWS_RESOURCE_TAGGING_API_ENDPOINT_DEFAULT, + ConfigDef.Importance.LOW, + AWS_RESOURCE_TAGGING_API_ENDPOINT_DOC, + AWS_GROUP, 8, + ConfigDef.Width.MEDIUM, + AWS_RESOURCE_TAGGING_API_ENDPOINT_DISPLAY) + + .define(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG, + ConfigDef.Type.LIST, + SRC_DYNAMODB_TABLE_WHITELIST_DEFAULT, + ConfigDef.Importance.LOW, + SRC_DYNAMODB_TABLE_WHITELIST_DOC, + AWS_GROUP, 8, + ConfigDef.Width.MEDIUM, + SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY) + + .define(SRC_KCL_TABLE_BILLING_MODE_CONFIG, + ConfigDef.Type.STRING, + SRC_KCL_TABLE_BILLING_MODE_DEFAULT, + ConfigDef.Importance.LOW, + SRC_KCL_TABLE_BILLING_MODE_DOC, + AWS_GROUP, 9, + ConfigDef.Width.MEDIUM, + SRC_KCL_TABLE_BILLING_MODE_DISPLAY) + .define(DST_TOPIC_PREFIX_CONFIG, ConfigDef.Type.STRING, DST_TOPIC_PREFIX_DEFAULT, @@ -148,8 +207,8 @@ public static ConfigDef baseConfigDef() { CONNECTOR_GROUP, 4, ConfigDef.Width.MEDIUM, REDISCOVERY_PERIOD_DISPLAY) - ; + } public static void main(String[] args) { @@ -160,12 +219,20 @@ public String getAwsRegion() { return getString(AWS_REGION_CONFIG); } - public String getAwsAccessKeyId() { - return getString(AWS_ACCESS_KEY_ID_CONFIG); + public Password getAwsAccessKeyId() { + return getPassword(AWS_ACCESS_KEY_ID_CONFIG); + } + + public String getAwsAccessKeyIdValue() { + return getPassword(AWS_ACCESS_KEY_ID_CONFIG) == null ? null : getPassword(AWS_ACCESS_KEY_ID_CONFIG).value(); + } + + public Password getAwsSecretKey() { + return getPassword(AWS_SECRET_KEY_CONFIG); } - public String getAwsSecretKey() { - return getString(AWS_SECRET_KEY_CONFIG); + public String getAwsSecretKeyValue() { + return getPassword(AWS_SECRET_KEY_CONFIG) == null ? null : getPassword(AWS_SECRET_KEY_CONFIG).value(); } public String getSrcDynamoDBIngestionTagKey() { @@ -189,4 +256,20 @@ public long getRediscoveryPeriod() { public int getInitSyncDelay() { return (int)get(SRC_INIT_SYNC_DELAY_CONFIG); } + + public String getDynamoDBServiceEndpoint() { + return getString(AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG); + } + + public String getResourceTaggingServiceEndpoint() { + return getString(AWS_RESOURCE_TAGGING_API_ENDPOINT_CONFIG); + } + + public List getWhitelistTables() { + return getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) != null ? getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) : null; + } + + public BillingMode getKCLTableBillingMode() { + return BillingMode.fromValue(getString(SRC_KCL_TABLE_BILLING_MODE_CONFIG)); + } } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 22a9844..1868cc3 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -1,18 +1,19 @@ package com.trustpilot.connector.dynamodb; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.model.*; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.ScanResult; +import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter; import com.amazonaws.services.kinesis.model.Record; -import com.trustpilot.connector.dynamodb.kcl.KclWorker; +import com.trustpilot.connector.dynamodb.aws.AwsClients; +import com.trustpilot.connector.dynamodb.aws.DynamoDBTableScanner; +import com.trustpilot.connector.dynamodb.aws.TableScanner; import com.trustpilot.connector.dynamodb.kcl.KclRecordsWrapper; +import com.trustpilot.connector.dynamodb.kcl.KclWorker; import com.trustpilot.connector.dynamodb.kcl.KclWorkerImpl; import com.trustpilot.connector.dynamodb.kcl.ShardInfo; -import com.trustpilot.connector.dynamodb.aws.DynamoDBTableScanner; -import com.trustpilot.connector.dynamodb.aws.AwsClients; -import com.trustpilot.connector.dynamodb.aws.TableScanner; import com.trustpilot.connector.dynamodb.utils.RecordConverter; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -24,7 +25,9 @@ import java.time.Duration; import java.time.Instant; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; /** * This source tasks tracks all DynamoDB table changes via DynamoDB Streams. @@ -111,9 +114,11 @@ public void start(Map configProperties) { LOGGER.debug("Getting DynamoDB description for table: {}", config.getTableName()); if (client == null) { - client = AwsClients.buildDynamoDbClient(config.getAwsRegion(), - config.getAwsAccessKeyId(), - config.getAwsSecretKey()); + client = AwsClients.buildDynamoDbClient( + config.getAwsRegion(), + config.getDynamoDBServiceEndpoint(), + config.getAwsAccessKeyIdValue(), + config.getAwsSecretKeyValue()); } tableDesc = client.describeTable(config.getTableName()).getTable(); @@ -126,19 +131,26 @@ public void start(Map configProperties) { LOGGER.debug("Initiating DynamoDB table scanner and record converter."); if (tableScanner == null) { tableScanner = new DynamoDBTableScanner(client, - tableDesc.getTableName(), - tableDesc.getProvisionedThroughput().getReadCapacityUnits()); + tableDesc.getTableName(), + tableDesc.getProvisionedThroughput().getReadCapacityUnits()); } converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix()); LOGGER.info("Starting background KCL worker thread for table: {}", tableDesc.getTableName()); + + AmazonDynamoDBStreams dynamoDBStreamsClient = AwsClients.buildDynamoDbStreamsClient( + config.getAwsRegion(), + config.getDynamoDBServiceEndpoint(), + config.getAwsAccessKeyIdValue(), + config.getAwsSecretKeyValue()); + if (kclWorker == null) { kclWorker = new KclWorkerImpl( - AwsClients.getCredentials(config.getAwsAccessKeyId(), config.getAwsSecretKey()), + AwsClients.getCredentials(config.getAwsAccessKeyIdValue(), config.getAwsSecretKeyValue()), eventsQueue, shardRegister); } - kclWorker.start(config.getAwsRegion(), tableDesc.getTableName(), config.getTaskID()); + kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getKCLTableBillingMode()); shutdown = false; } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java index b6717a7..175f6f0 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java @@ -5,41 +5,57 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder; import com.amazonaws.services.resourcegroupstaggingapi.AWSResourceGroupsTaggingAPI; import com.amazonaws.services.resourcegroupstaggingapi.AWSResourceGroupsTaggingAPIClientBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class AwsClients { private static final Logger LOGGER = LoggerFactory.getLogger(AwsClients.class); public static AmazonDynamoDB buildDynamoDbClient(String awsRegion, + String serviceEndpoint, String awsAccessKeyID, String awsSecretKey) { - ClientConfiguration clientConfig = new ClientConfiguration(); - clientConfig.setUseThrottleRetries(true); - return AmazonDynamoDBClientBuilder.standard() - .withCredentials(getCredentials(awsAccessKeyID, awsSecretKey)) - .withClientConfiguration(clientConfig) - .withRegion(awsRegion) - .build(); + return (AmazonDynamoDB) configureBuilder( + AmazonDynamoDBClientBuilder.standard(), + awsRegion, serviceEndpoint, + awsAccessKeyID, + awsSecretKey) + .build(); } public static AWSResourceGroupsTaggingAPI buildAWSResourceGroupsTaggingAPIClient(String awsRegion, + String serviceEndpoint, String awsAccessKeyID, String awsSecretKey) { - ClientConfiguration clientConfig = new ClientConfiguration(); - clientConfig.setUseThrottleRetries(true); - - return AWSResourceGroupsTaggingAPIClientBuilder.standard() - .withCredentials(getCredentials(awsAccessKeyID, awsSecretKey)) - .withClientConfiguration(clientConfig) - .withRegion(awsRegion) - .build(); + return (AWSResourceGroupsTaggingAPI) configureBuilder( + AWSResourceGroupsTaggingAPIClientBuilder.standard(), + awsRegion, serviceEndpoint, + awsAccessKeyID, + awsSecretKey) + .build(); + } + + public static AmazonDynamoDBStreams buildDynamoDbStreamsClient(String awsRegion, + String serviceEndpoint, + String awsAccessKeyID, + String awsSecretKey) { + return (AmazonDynamoDBStreams) configureBuilder( + AmazonDynamoDBStreamsClientBuilder.standard(), + awsRegion, serviceEndpoint, + awsAccessKeyID, + awsSecretKey) + .build(); + } public static AWSCredentialsProvider getCredentials(String awsAccessKeyID, String awsSecretKey) { @@ -54,4 +70,21 @@ public static AWSCredentialsProvider getCredentials(String awsAccessKeyID, Strin return new AWSStaticCredentialsProvider(awsCreds); } } + + private static AwsClientBuilder configureBuilder(AwsClientBuilder builder, + String awsRegion, + String serviceEndpoint, + String awsAccessKeyID, + String awsSecretKey) { + + builder.withCredentials(getCredentials(awsAccessKeyID, awsSecretKey)) + .withClientConfiguration(new ClientConfiguration().withThrottledRetries(true)); + + if(serviceEndpoint != null && !serviceEndpoint.isEmpty()) { + builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, awsRegion)); + } else { + builder.withRegion(awsRegion); + } + return builder; + } } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/ConfigTablesProvider.java b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/ConfigTablesProvider.java new file mode 100644 index 0000000..914b0e1 --- /dev/null +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/ConfigTablesProvider.java @@ -0,0 +1,41 @@ +package com.trustpilot.connector.dynamodb.aws; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; +import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.trustpilot.connector.dynamodb.DynamoDBSourceConnectorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; + +public class ConfigTablesProvider extends TablesProviderBase implements TablesProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigTablesProvider.class); + List tables; + AmazonDynamoDB client; + + public ConfigTablesProvider(AmazonDynamoDB client, DynamoDBSourceConnectorConfig connectorConfig) { + tables = connectorConfig.getWhitelistTables(); + this.client = client; + } + + @Override + public List getConsumableTables() { + final List consumableTables = new LinkedList<>(); + for (String table : tables) { + try { + final TableDescription tableDesc = client.describeTable(table).getTable(); + + if (this.hasValidConfig(tableDesc, table)) { + LOGGER.info("Table to sync: {}", table); + consumableTables.add(table); + } + } + catch (AmazonDynamoDBException ex) { + LOGGER.warn("Error while trying to read metadata for table " + table, ex); + } + } + return consumableTables; + } +} diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/DynamoDBTablesProvider.java b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/DynamoDBTablesProvider.java index ee539ec..49c4e72 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/DynamoDBTablesProvider.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/DynamoDBTablesProvider.java @@ -21,7 +21,7 @@ * Also validates if tables have streams enabled with valid options. * All invalid tables are skipped. */ -public class DynamoDBTablesProvider implements TablesProvider { +public class DynamoDBTablesProvider extends TablesProviderBase implements TablesProvider { private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBTablesProvider.class); private final AWSResourceGroupsTaggingAPI groupsTaggingAPI; private final AmazonDynamoDB client; @@ -96,20 +96,4 @@ private GetResourcesRequest getGetResourcesRequest() { return resourcesRequest; } - private boolean hasValidConfig(TableDescription tableDesc, String tableName) { - final StreamSpecification streamSpec = tableDesc.getStreamSpecification(); - if (streamSpec == null || !streamSpec.isStreamEnabled()) { - LOGGER.warn("DynamoDB table `{}` does not have streams enabled", tableName); - return false; - } - - final String streamViewType = streamSpec.getStreamViewType(); - if (!streamViewType.equals(StreamViewType.NEW_IMAGE.name()) - && !streamViewType.equals(StreamViewType.NEW_AND_OLD_IMAGES.name())) { - LOGGER.warn("DynamoDB stream view type for table `{}` is {}", tableName, streamViewType); - return false; - } - - return true; - } } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/TablesProvider.java b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/TablesProvider.java index 5cf6f7c..7fc5c98 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/TablesProvider.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/TablesProvider.java @@ -5,3 +5,4 @@ public interface TablesProvider { List getConsumableTables() throws InterruptedException; } + diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/TablesProviderBase.java b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/TablesProviderBase.java new file mode 100644 index 0000000..773408b --- /dev/null +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/TablesProviderBase.java @@ -0,0 +1,29 @@ +package com.trustpilot.connector.dynamodb.aws; + +import com.amazonaws.services.dynamodbv2.model.StreamSpecification; +import com.amazonaws.services.dynamodbv2.model.StreamViewType; +import com.amazonaws.services.dynamodbv2.model.TableDescription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class TablesProviderBase implements TablesProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesProviderBase.class); + + protected boolean hasValidConfig(TableDescription tableDesc, String tableName) { + final StreamSpecification streamSpec = tableDesc.getStreamSpecification(); + if (streamSpec == null || !streamSpec.isStreamEnabled()) { + LOGGER.warn("DynamoDB table `{}` does not have streams enabled", tableName); + return false; + } + + final String streamViewType = streamSpec.getStreamViewType(); + if (!streamViewType.equals(StreamViewType.NEW_IMAGE.name()) + && !streamViewType.equals(StreamViewType.NEW_AND_OLD_IMAGES.name())) { + LOGGER.warn("DynamoDB stream view type for table `{}` is {}", tableName, streamViewType); + return false; + } + + return true; + } +} + diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclNoopCloudWatch.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclNoopCloudWatch.java index 90fa332..2c960bf 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclNoopCloudWatch.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclNoopCloudWatch.java @@ -11,21 +11,29 @@ * This class allows KCL to be started without CloudWatch connectivity. */ public class KclNoopCloudWatch implements AmazonCloudWatch { + @Deprecated @Override public void setEndpoint(String s) { } + @Deprecated @Override public void setRegion(Region region) { } + @Override public DeleteAlarmsResult deleteAlarms(DeleteAlarmsRequest deleteAlarmsRequest) { return null; } + @Override + public DeleteAnomalyDetectorResult deleteAnomalyDetector(DeleteAnomalyDetectorRequest deleteAnomalyDetectorRequest) { + return null; + } + @Override public DescribeAlarmHistoryResult describeAlarmHistory(DescribeAlarmHistoryRequest describeAlarmHistoryRequest) { return null; @@ -51,21 +59,46 @@ public DescribeAlarmsForMetricResult describeAlarmsForMetric(DescribeAlarmsForMe return null; } + @Override + public DescribeAnomalyDetectorsResult describeAnomalyDetectors(DescribeAnomalyDetectorsRequest describeAnomalyDetectorsRequest) { + return null; + } + + @Override + public DescribeInsightRulesResult describeInsightRules(DescribeInsightRulesRequest describeInsightRulesRequest) { + return null; + } + @Override public DisableAlarmActionsResult disableAlarmActions(DisableAlarmActionsRequest disableAlarmActionsRequest) { return null; } + @Override + public DisableInsightRulesResult disableInsightRules(DisableInsightRulesRequest disableInsightRulesRequest) { + return null; + } + @Override public EnableAlarmActionsResult enableAlarmActions(EnableAlarmActionsRequest enableAlarmActionsRequest) { return null; } + @Override + public EnableInsightRulesResult enableInsightRules(EnableInsightRulesRequest enableInsightRulesRequest) { + return null; + } + @Override public GetMetricStatisticsResult getMetricStatistics(GetMetricStatisticsRequest getMetricStatisticsRequest) { return null; } + @Override + public GetMetricWidgetImageResult getMetricWidgetImage(GetMetricWidgetImageRequest getMetricWidgetImageRequest) { + return null; + } + @Override public ListMetricsResult listMetrics(ListMetricsRequest listMetricsRequest) { return null; @@ -76,6 +109,16 @@ public ListMetricsResult listMetrics() { return null; } + @Override + public ListTagsForResourceResult listTagsForResource(ListTagsForResourceRequest listTagsForResourceRequest) { + return null; + } + + @Override + public PutAnomalyDetectorResult putAnomalyDetector(PutAnomalyDetectorRequest putAnomalyDetectorRequest) { + return null; + } + @Override public PutMetricAlarmResult putMetricAlarm(PutMetricAlarmRequest putMetricAlarmRequest) { return null; @@ -91,6 +134,16 @@ public SetAlarmStateResult setAlarmState(SetAlarmStateRequest setAlarmStateReque return null; } + @Override + public TagResourceResult tagResource(TagResourceRequest tagResourceRequest) { + return null; + } + + @Override + public UntagResourceResult untagResource(UntagResourceRequest untagResourceRequest) { + return null; + } + @Override public void shutdown() { @@ -111,11 +164,21 @@ public DeleteDashboardsResult deleteDashboards(DeleteDashboardsRequest deleteDas return null; } + @Override + public DeleteInsightRulesResult deleteInsightRules(DeleteInsightRulesRequest deleteInsightRulesRequest) { + return null; + } + @Override public GetDashboardResult getDashboard(GetDashboardRequest getDashboardRequest) { return null; } + @Override + public GetInsightRuleReportResult getInsightRuleReport(GetInsightRuleReportRequest getInsightRuleReportRequest) { + return null; + } + @Override public GetMetricDataResult getMetricData(GetMetricDataRequest getMetricDataRequest) { return null; @@ -130,4 +193,9 @@ public ListDashboardsResult listDashboards(ListDashboardsRequest listDashboardsR public PutDashboardResult putDashboard(PutDashboardRequest putDashboardRequest) { return null; } + + @Override + public PutInsightRuleResult putInsightRule(PutInsightRuleRequest putInsightRuleRequest) { + return null; + } } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java index cc4b728..27557bb 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java @@ -1,7 +1,16 @@ package com.trustpilot.connector.dynamodb.kcl; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.model.BillingMode; + public interface KclWorker { - void start(String awsRegion, String tableName, String taskid); + void start(AmazonDynamoDB dynamoDBClient, + AmazonDynamoDBStreams dynamoDBStreamsClient, + String tableName, + String taskid, + String endpoint, + BillingMode kclTablebillingMode); void stop(); -} +} \ No newline at end of file diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java index 4051bc6..f6bd839 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java @@ -3,9 +3,8 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.cloudwatch.AmazonCloudWatch; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder; +import com.amazonaws.services.dynamodbv2.model.BillingMode; import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; @@ -45,20 +44,19 @@ public KclWorkerImpl(AWSCredentialsProvider awsCredentialsProvider, @Override - public void start(String awsRegion, String tableName, String taskid) { - AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.standard() - .withRegion(awsRegion) - .build(); - - + public void start(AmazonDynamoDB dynamoDBClient, + AmazonDynamoDBStreams dynamoDBStreamsClient, + String tableName, + String taskid, + String endpoint, + BillingMode kclTableBillingMode) { IRecordProcessorFactory recordProcessorFactory = new KclRecordProcessorFactory(tableName, eventsQueue, - recordProcessorsRegister); + recordProcessorsRegister); + KinesisClientLibConfiguration clientLibConfiguration = getClientLibConfiguration(tableName, - taskid, - dynamoDBClient); - AmazonDynamoDBStreams dynamoDBStreamsClient = AmazonDynamoDBStreamsClientBuilder.standard() - .withRegion(awsRegion) - .build(); + taskid, + dynamoDBClient, endpoint, kclTableBillingMode); + AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient); // If enabled, throws exception if trying to consume expired shards. But seems there is no way to catch @@ -73,16 +71,16 @@ public void start(String awsRegion, String tableName, String taskid) { worker = StreamsWorkerFactory .createDynamoDbStreamsWorker(recordProcessorFactory, - clientLibConfiguration, - adapterClient, - dynamoDBClient, - cloudWatchClient); + clientLibConfiguration, + adapterClient, + dynamoDBClient, + cloudWatchClient); LOGGER.info("Creating KCL worker for Stream: {} ApplicationName: {} WorkerId: {}", - clientLibConfiguration.getStreamName(), - clientLibConfiguration.getApplicationName(), - clientLibConfiguration.getWorkerIdentifier() + clientLibConfiguration.getStreamName(), + clientLibConfiguration.getApplicationName(), + clientLibConfiguration.getWorkerIdentifier() ); thread = new Thread(worker); @@ -123,7 +121,9 @@ public synchronized void stop() { KinesisClientLibConfiguration getClientLibConfiguration(String tableName, String taskid, - AmazonDynamoDB dynamoDBClient) { + AmazonDynamoDB dynamoDBClient, + String endpoint, + BillingMode kclTableBillingMode) { String streamArn = dynamoDBClient.describeTable( new DescribeTableRequest() @@ -159,7 +159,13 @@ KinesisClientLibConfiguration getClientLibConfiguration(String tableName, .withLogWarningForTaskAfterMillis(60 * 1000) // fix some random issues with https://github.com/awslabs/amazon-kinesis-client/issues/164 - .withIgnoreUnexpectedChildShards(true); + .withIgnoreUnexpectedChildShards(true) + + // custom streams API endpoint + .withDynamoDBEndpoint(endpoint) + + // custom table billing mode + .withBillingMode(kclTableBillingMode); } /** diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorIT.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorIT.java new file mode 100644 index 0000000..0034c72 --- /dev/null +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorIT.java @@ -0,0 +1,108 @@ +package com.trustpilot.connector.dynamodb; + +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.KeyAttribute; +import com.amazonaws.services.dynamodbv2.model.*; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.Arrays; + +@Testcontainers +public class DynamoDBSourceConnectorIT extends KafkaConnectITBase { + + @Test + public void onConnectorCreateRecordsAreReplicatedFromBeginningAndSwitchedToStreaming() { + String tableName = "Movies1"; + String connector = "con1"; + CreateTableResult createTableResult = newDynamoDBTable( + newReplicationTableRequest() + .withTableName(tableName) + .withKeySchema(Arrays.asList( + new KeySchemaElement("year", KeyType.HASH), + new KeySchemaElement("title", KeyType.RANGE))) + .withAttributeDefinitions(Arrays.asList( + new AttributeDefinition("year", ScalarAttributeType.N), + new AttributeDefinition("title", ScalarAttributeType.S)))); + + putDynamoDBItems(tableName, Arrays.asList( + new Item().withKeyComponents(new KeyAttribute("year", 2020), new KeyAttribute("title", "Tenet")), + new Item().withKeyComponents(new KeyAttribute("year", 1999), new KeyAttribute("title", "The Matrix")) + )); + + mockTaggingAPIResponse("/", new ResourceTags( + Arrays.asList(createTableResult.getTableDescription().getTableArn()), SRC_DYNAMODB_TABLE_INGESTION_TAG_KEY_CONFIG) + .asJson()); + + registerConnector(connector); + + + try(KafkaConsumer consumer = getConsumer()) { + consumer.subscribe(Arrays.asList("dynamodb-" + tableName)); + + // 2 - from initial sync + // 2 - from streams as Table streaming was enabled upon table creation + drain(consumer, 4); + } + + + putDynamoDBItems(tableName, Arrays.asList( + new Item().withKeyComponents(new KeyAttribute("year", 2000), new KeyAttribute("title", "Memento")) + )); + + try(KafkaConsumer consumer = getConsumer()) { + consumer.subscribe(Arrays.asList("dynamodb-" + tableName)); + + // all previous messages +1 newly inserted + drain(consumer, 5); + } + + } + + @Test + public void onConnectorWithTableWhitelistCreateRecordsAreReplicatedFromBeginningAndSwitchedToStreaming() { + String tableName = "Movies2"; + String connector = "con2"; + newDynamoDBTable( + newReplicationTableRequest() + .withTableName(tableName) + .withKeySchema(Arrays.asList( + new KeySchemaElement("year", KeyType.HASH), + new KeySchemaElement("title", KeyType.RANGE))) + .withAttributeDefinitions(Arrays.asList( + new AttributeDefinition("year", ScalarAttributeType.N), + new AttributeDefinition("title", ScalarAttributeType.S)))); + + putDynamoDBItems(tableName, Arrays.asList( + new Item().withKeyComponents(new KeyAttribute("year", 2020), new KeyAttribute("title", "Tenet")), + new Item().withKeyComponents(new KeyAttribute("year", 1999), new KeyAttribute("title", "The Matrix")) + )); + + + registerConnector(connector, Arrays.asList(tableName)); + + + try(KafkaConsumer consumer = getConsumer()) { + consumer.subscribe(Arrays.asList("dynamodb-" + tableName)); + + // 2 - from initial sync + // 2 - from streams as Table streaming was enabled upon table creation + drain(consumer, 4); + } + + + putDynamoDBItems(tableName, Arrays.asList( + new Item().withKeyComponents(new KeyAttribute("year", 2000), new KeyAttribute("title", "Memento")) + )); + + try(KafkaConsumer consumer = getConsumer()) { + consumer.subscribe(Arrays.asList("dynamodb-" + tableName)); + + + // all previous messages +1 newly inserted + drain(consumer, 5); + } + + } +} diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index bd7fb1e..c132506 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -1,12 +1,13 @@ package com.trustpilot.connector.dynamodb; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient; import com.amazonaws.services.dynamodbv2.model.*; import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter; -import com.trustpilot.connector.dynamodb.kcl.KclWorker; +import com.trustpilot.connector.dynamodb.aws.TableScanner; import com.trustpilot.connector.dynamodb.kcl.KclRecordsWrapper; +import com.trustpilot.connector.dynamodb.kcl.KclWorker; import com.trustpilot.connector.dynamodb.kcl.ShardInfo; -import com.trustpilot.connector.dynamodb.aws.TableScanner; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.BeforeEach; @@ -201,7 +202,14 @@ public void kclWorkerIsStartedOnStart() throws InterruptedException { task.start(configs); // Assert - verify(builder.kclWorker, times(1)).start("testRegion", tableName, "testTask1"); + verify(builder.kclWorker, times(1)).start( + any(AmazonDynamoDB.class), + any(AmazonDynamoDBStreamsClient.class), + eq(tableName), + eq("testTask1"), + eq(null), + eq(BillingMode.PROVISIONED) + ); } @Test @@ -352,8 +360,8 @@ public void onInitSyncRunningPollReturnsScannedItemsBatchAndEndsInitSyncIfExclus // Assert assertEquals(InitSyncStatus.FINISHED, task.getSourceInfo().initSyncStatus); assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), - task.getSourceInfo().lastInitSyncStart, - "Init sync was restarted?"); + task.getSourceInfo().lastInitSyncStart, + "Init sync was restarted?"); assertEquals(Instant.parse("2001-01-01T01:00:00.00Z"), task.getSourceInfo().lastInitSyncEnd); assertEquals(2, task.getSourceInfo().initSyncCount); assertNull(task.getSourceInfo().exclusiveStartKey); @@ -415,7 +423,7 @@ public void onInitSyncOnlyLastRecordHasLatestSourceInfoState() throws Interrupte assertEquals(null, ((Map) response.get(1).sourceOffset()).get("exclusive_start_key")); } - @Test + @Test public void onInitSyncPollEndsInitSyncIfExclusiveStartKeyIsNullWithNoRecordsReturned() throws InterruptedException { // Arrange HashMap offset = new HashMap<>(); @@ -444,8 +452,8 @@ public void onInitSyncPollEndsInitSyncIfExclusiveStartKeyIsNullWithNoRecordsRetu // Assert assertEquals(InitSyncStatus.FINISHED, task.getSourceInfo().initSyncStatus); assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), - task.getSourceInfo().lastInitSyncStart, - "Init sync was restarted?"); + task.getSourceInfo().lastInitSyncStart, + "Init sync was restarted?"); assertEquals(Instant.parse("2001-01-01T01:00:00.00Z"), task.getSourceInfo().lastInitSyncEnd); assertNull(task.getSourceInfo().exclusiveStartKey); @@ -531,14 +539,14 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException { dynamoDBRecords.getRecords().add( getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key1")), - row, Instant.parse("2001-01-01T01:00:00.00Z"), - "1000000001", - "INSERT")); + row, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000001", + "INSERT")); dynamoDBRecords.getRecords().add( getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key2")), - null, Instant.parse("2001-01-01T01:00:00.00Z"), - "1000000002", - "REMOVE")); + null, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000002", + "REMOVE")); DynamoDBSourceTask task = new SourceTaskBuilder() .withOffset(offset) @@ -575,14 +583,14 @@ public void onSyncPollReturnsReceivedRecordsWithCorrectOperationChosen() throws row.put("col1", new AttributeValue("key1")); dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), - row, Instant.parse("2001-01-01T01:00:00.00Z"), "1000000001", - "INSERT")); + row, Instant.parse("2001-01-01T01:00:00.00Z"), "1000000001", + "INSERT")); dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), - row, Instant.parse("2001-01-01T01:00:00.00Z"), "1000000002", - "MODIFY")); + row, Instant.parse("2001-01-01T01:00:00.00Z"), "1000000002", + "MODIFY")); dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), - row, Instant.parse("2001-01-01T01:00:00.00Z"), "1000000003", - "REMOVE")); + row, Instant.parse("2001-01-01T01:00:00.00Z"), "1000000003", + "REMOVE")); DynamoDBSourceTask task = new SourceTaskBuilder() @@ -619,8 +627,8 @@ public void onSyncPollReturnsDeletedRecordAndTombstoneMessage() throws Interrupt row.put("col1", new AttributeValue("key1")); dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), - row, Instant.parse("2001-01-01T01:00:00.00Z"), "1000000003", - "REMOVE")); + row, Instant.parse("2001-01-01T01:00:00.00Z"), "1000000003", + "REMOVE")); DynamoDBSourceTask task = new SourceTaskBuilder() @@ -656,11 +664,11 @@ public void onSyncPollSkipsRecordsWhichHappenedBeforeTheLastInitSync() throws In row.put("col1", new AttributeValue("key1")); dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), - row, Instant.parse("2001-01-01T01:00:00.00Z"), "1000000001", - "INSERT")); + row, Instant.parse("2001-01-01T01:00:00.00Z"), "1000000001", + "INSERT")); dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), - row, Instant.parse("2000-12-12T23:01:00.00Z"), "1000000002", - "INSERT")); + row, Instant.parse("2000-12-12T23:01:00.00Z"), "1000000002", + "INSERT")); DynamoDBSourceTask task = new SourceTaskBuilder() .withOffset(offset) @@ -702,12 +710,12 @@ public void onSyncPollLogsErrorAndDropsRecordsWhichCannotBeParsed() throws Inter row.put("col1", new AttributeValue("key1")); dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), - row, Instant.parse("2001-01-01T01:00:00.00Z"), - "10000000000000000000001", - "INSERT")); + row, Instant.parse("2001-01-01T01:00:00.00Z"), + "10000000000000000000001", + "INSERT")); dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), - row, Instant.parse("2001-01-01T01:00:00.00Z"), - "10000000000000000000002" + row, Instant.parse("2001-01-01T01:00:00.00Z"), + "10000000000000000000002" , "INVALID")); DynamoDBSourceTask task = new SourceTaskBuilder() @@ -749,9 +757,9 @@ public void onSyncPollReturnsNullAndStartsInitSyncIfAnyOneRecordEventArrivedToLa row.put("col1", new AttributeValue("key1")); dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), - row, Instant.parse("2001-01-03T15:00:00.00Z"), "s1", "INSERT")); + row, Instant.parse("2001-01-03T15:00:00.00Z"), "s1", "INSERT")); dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), - row, Instant.parse("2001-01-03T00:00:00.00Z"), "s2", "INSERT")); + row, Instant.parse("2001-01-03T00:00:00.00Z"), "s2", "INSERT")); DynamoDBSourceTask task = new SourceTaskBuilder() .withOffset(offset) diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java b/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java new file mode 100644 index 0000000..cc0a713 --- /dev/null +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java @@ -0,0 +1,250 @@ +package com.trustpilot.connector.dynamodb; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.model.*; +import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.http.HttpStatus; +import com.google.gson.Gson; +import com.trustpilot.connector.dynamodb.aws.AwsClients; +import com.trustpilot.connector.dynamodb.testcontainers.ConnectContainer; +import com.trustpilot.connector.dynamodb.testcontainers.DynamoDBContainer; +import com.trustpilot.connector.dynamodb.testcontainers.MockServerContainerExt; +import com.trustpilot.connector.dynamodb.testcontainers.SchemaRegistryContainer; +import io.restassured.http.ContentType; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.BeforeAll; +import org.mockserver.client.MockServerClient; +import org.mockserver.model.JsonBody; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; + +import java.io.Serializable; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static io.restassured.RestAssured.given; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +public class KafkaConnectITBase { + protected static final String AMAZON_DYNAMO_IMAGE = "amazon/dynamodb-local:1.13.5"; + protected static final String KAFKA_IMAGE = "confluentinc/cp-kafka:5.3.1"; + protected static final String SCHEMA_REGISTRY_IMAGE = "confluentinc/cp-schema-registry:5.3.1"; + protected static final String CONNECT_IMAGE = "confluentinc/cp-kafka-connect:5.3.1"; + protected static final String MOCK_SERVER_IMAGE = "jamesdbloom/mockserver:mockserver-5.11.0"; + + protected static final String AWS_REGION_CONFIG = "eu-west-3"; + protected static final String AWS_ACCESS_KEY_ID_CONFIG = "ABCD"; + protected static final String AWS_SECRET_KEY_CONFIG = "1234"; + protected static final String SRC_DYNAMODB_TABLE_INGESTION_TAG_KEY_CONFIG = "datalake-ingest"; + + private static Network network; + private static KafkaContainer kafka; + private static SchemaRegistryContainer schemaRegistry; + private static ConnectContainer connect; + private static DynamoDBContainer dynamodb; + private static MockServerContainerExt mockServer; + private static MockServerClient fakeTaggingAPI; + + @BeforeAll + public static void dockerSetup() { + network = Network.newNetwork(); + + dynamodb = new DynamoDBContainer(DockerImageName.parse(AMAZON_DYNAMO_IMAGE)) + .withNetwork(network); + + kafka = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE)) + .withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:9092") + .withNetworkAliases("broker") + .withEmbeddedZookeeper() + .withNetwork(network); + + schemaRegistry = new SchemaRegistryContainer(DockerImageName.parse(SCHEMA_REGISTRY_IMAGE)) + .withNetwork(network) + .withNetworkAliases("mock-server") + .withKafka(kafka) + .dependsOn(kafka); + + mockServer = (MockServerContainerExt) new MockServerContainerExt(DockerImageName.parse(MOCK_SERVER_IMAGE)) + .withNetwork(network) + .withEnv("MOCKSERVER_LIVENESS_HTTP_GET_PATH", "/health") + .waitingFor(Wait.forHttp("/health").forStatusCode(200)); + + connect = new ConnectContainer(DockerImageName.parse(CONNECT_IMAGE), kafka, schemaRegistry) + .withNetworkAliases("connect") + .withNetwork(network) + .withPlugins("../build/libs/") + .dependsOn(schemaRegistry, mockServer); + + Startables.deepStart(Stream.of( + dynamodb, + kafka, + schemaRegistry, + mockServer, + connect + )).join(); + + fakeTaggingAPI = new MockServerClient(mockServer.getHost(), mockServer.getServerPort()); + } + + protected void mockTaggingAPIResponse(String url, String responseBody) { + fakeTaggingAPI + .when(request() + .withPath(url)) + .respond(response() + .withBody(new JsonBody(responseBody))); + } + + protected void registerConnector(String name) { + ConnectorConfig connector = createBaseConfig(name); + connector.config.put("resource.tagging.service.endpoint", mockServer.getInternalEndpoint()); + postConnector(connector); + } + + protected void registerConnector(String name, List tablesWhitelist) { + ConnectorConfig connector = createBaseConfig(name); + connector.config.put("dynamodb.table.whitelist", String.join(",", tablesWhitelist)); + postConnector(connector); + } + + private void postConnector(ConnectorConfig connector) { + given() + .log().all() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .body(new Gson().toJson(connector)) + .when() + .post(connect.getEndpoint() + "/connectors") + .andReturn() + .then() + .log().all() + .statusCode(HttpStatus.SC_CREATED); + } + + protected KafkaConsumer getConsumer() { + return new KafkaConsumer<>( + new HashMap() {{ + put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + put(ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID()); + put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + }}, + new StringDeserializer(), new StringDeserializer()); + } + + protected List> drain( + KafkaConsumer consumer, + int expectedRecordCount) { + + List> allRecords = new ArrayList<>(); + + Unreliables.retryUntilTrue(200, TimeUnit.SECONDS, () -> { + consumer.poll(Duration.ofMillis(2000)) + .iterator() + .forEachRemaining(allRecords::add); + + return allRecords.size() == expectedRecordCount; + }); + + return allRecords; + } + + protected CreateTableRequest newReplicationTableRequest() { + return new CreateTableRequest() + .withProvisionedThroughput(new ProvisionedThroughput() + .withReadCapacityUnits(5L) + .withWriteCapacityUnits(5L)) + .withStreamSpecification( + new StreamSpecification() + .withStreamEnabled(true) + .withStreamViewType(StreamViewType.NEW_IMAGE)); + } + + protected CreateTableResult newDynamoDBTable(CreateTableRequest createTableRequest) { + AmazonDynamoDB client = getDynamoDBClient(); + + return client.createTable(createTableRequest); + } + + protected void putDynamoDBItems(String tableName, List itemStream) { + Table table = new DynamoDB(getDynamoDBClient()).getTable(tableName); + itemStream.forEach(item -> table.putItem(item)); + } + + private AmazonDynamoDB getDynamoDBClient() { + return AwsClients.buildDynamoDbClient( + AWS_REGION_CONFIG, + dynamodb.getEndpoint(), + AWS_ACCESS_KEY_ID_CONFIG, + AWS_SECRET_KEY_CONFIG + ); + } + + private ConnectorConfig createBaseConfig(String name) { + return new ConnectorConfig(name, new HashMap() {{ + put("name", name); + put("connector.class", "com.trustpilot.connector.dynamodb.DynamoDBSourceConnector"); + put("aws.access.key.id", AWS_ACCESS_KEY_ID_CONFIG); + put("aws.secret.key", AWS_SECRET_KEY_CONFIG); + put("aws.region", AWS_REGION_CONFIG); + put("dynamodb.service.endpoint", dynamodb.getInternalEndpoint()); + put("init.sync.delay.period", "10"); + }}); + } + + static class ConnectorConfig implements Serializable { + private String name; + private Map config; + + public ConnectorConfig(String name, Map config) { + this.name = name; + this.config = config; + } + } + + static class ResourceTags implements Serializable { + static class Tag implements Serializable { + public Tag(String key, String value) { + Value = value; + Key = key; + } + + public String Value; + public String Key; + } + + static class Mapping implements Serializable { + public String ResourceARN; + public List Tags; + + public Mapping(String resourceARN, List tags) { + ResourceARN = resourceARN; + Tags = tags; + } + } + + public List ResourceTagMappingList; + + public ResourceTags(List tableArns, String tagName) { + ResourceTagMappingList = tableArns + .stream() + .map(ta -> new Mapping(ta, Arrays.asList(new Tag(tagName, null)))) + .collect(Collectors.toList()); + } + + public String asJson() { + return new Gson().toJson(this); + } + } +} diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java index f112d05..12b7bdd 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java @@ -1,6 +1,7 @@ package com.trustpilot.connector.dynamodb.kcl; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.model.BillingMode; import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.model.TableDescription; @@ -9,12 +10,12 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import static org.mockito.Mockito.when; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; public class KclWorkerImplTests { @@ -33,6 +34,8 @@ void initializationRegistersNewShardToRegistry() { KclWorkerImpl kclWorker = new KclWorkerImpl(null, queue, shardRegister); String tableName = "testTableName1"; String taskId = "task1"; + String serviceEndpoint = "http://localhost:8000"; + BillingMode kclTableBillingMode = BillingMode.PROVISIONED; AmazonDynamoDB dynamoDBClient = Mockito.mock(AmazonDynamoDB.class); TableDescription table = new TableDescription().withTableArn("testArn1"); @@ -40,10 +43,11 @@ void initializationRegistersNewShardToRegistry() { when(dynamoDBClient.describeTable(ArgumentMatchers.any())).thenReturn(result); // Act - KinesisClientLibConfiguration clientLibConfiguration = kclWorker.getClientLibConfiguration(tableName, taskId, dynamoDBClient); + KinesisClientLibConfiguration clientLibConfiguration = kclWorker.getClientLibConfiguration(tableName, taskId, dynamoDBClient, serviceEndpoint, kclTableBillingMode); // Assert assertEquals("datalake-KCL-testTableName1", clientLibConfiguration.getApplicationName()); assertEquals("datalake-KCL-testTableName1-worker-task1", clientLibConfiguration.getWorkerIdentifier()); + assertEquals(serviceEndpoint, clientLibConfiguration.getDynamoDBEndpoint()); } } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/ConnectContainer.java b/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/ConnectContainer.java new file mode 100644 index 0000000..1081939 --- /dev/null +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/ConnectContainer.java @@ -0,0 +1,68 @@ +package com.trustpilot.connector.dynamodb.testcontainers; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.io.File; +import java.util.Set; +import java.util.stream.Stream; + +public class ConnectContainer extends GenericContainer { + private static final int PORT = 8083; + private static final String PLUGIN_PATH_CONTAINER = "/usr/share/java/"; + + private static final String networkAlias = "connect"; + + public ConnectContainer(DockerImageName dockerImageName, KafkaContainer kafka, SchemaRegistryContainer schemaRegistry) { + super(dockerImageName); + withNetwork(kafka.getNetwork()); + addEnv("CONNECT_BOOTSTRAP_SERVERS", String.format("%s:%d", kafka.getNetworkAliases().get(0), 9092)); + addEnv("CONNECT_REST_ADVERTISED_HOST_NAME", networkAlias); + addEnv("CONNECT_REST_PORT", Integer.toString(PORT)); + addEnv("CONNECT_GROUP_ID", "connect-1"); + addEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config"); + addEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-offsets"); + addEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status"); + addEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1"); + addEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1"); + addEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1"); + addEnv("CONNECT_LOG4J_ROOT_LOGLEVEL", "INFO"); + addEnv("CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL", + String.format("http://%s:%s", schemaRegistry.getNetworkAliases().get(0), schemaRegistry.getExposedPorts().get(0))); + addEnv("CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL", + String.format("http://%s:%s", schemaRegistry.getNetworkAliases().get(0), schemaRegistry.getExposedPorts().get(0))); + addEnv("CONNECT_VALUE_CONVERTER", "io.confluent.connect.avro.AvroConverter"); + addEnv("CONNECT_KEY_CONVERTER", "io.confluent.connect.avro.AvroConverter"); + addEnv("CONNECT_INTERNAL_KEY_CONVERTER", "org.apache.kafka.connect.json.JsonConverter"); + addEnv("CONNECT_INTERNAL_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter"); + addEnv("CONNECT_PLUGIN_PATH", PLUGIN_PATH_CONTAINER); + withExposedPorts(PORT); + } + + @Override + protected void configure() { + super.configure(); + } + + public ConnectContainer withPlugins(Set plugins) { + if (plugins == null) { + return this; + } + plugins.forEach(this::withPlugins); + return this; + } + + public ConnectContainer withPlugins(String pluginPath) { + Stream.of(new File(pluginPath).listFiles()) + .forEach(f -> this.withCopyFileToContainer(MountableFile.forHostPath(f.getPath()), PLUGIN_PATH_CONTAINER)); + return this; + } + + public String getEndpoint() { + return String.format("http://%s:%d", getHost(), getMappedPort(PORT)); + } + +} + diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/DynamoDBContainer.java b/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/DynamoDBContainer.java new file mode 100644 index 0000000..7d0a701 --- /dev/null +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/DynamoDBContainer.java @@ -0,0 +1,25 @@ +package com.trustpilot.connector.dynamodb.testcontainers; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class DynamoDBContainer extends GenericContainer { + + private static final int PORT = 8000; + private static final String networkAlias = "dynamodb"; + + + public DynamoDBContainer(DockerImageName dockerImageName) { + super(dockerImageName); + this.withExposedPorts(PORT); + this.withNetworkAliases(networkAlias); + } + + public String getEndpoint() { + return String.format("http://%s:%d", getHost(), getMappedPort(PORT)); + } + + public String getInternalEndpoint() { + return String.format("http://%s:%d", getNetworkAliases().get(0), getExposedPorts().get(0)); + } +} diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/MockServerContainerExt.java b/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/MockServerContainerExt.java new file mode 100644 index 0000000..41bde20 --- /dev/null +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/MockServerContainerExt.java @@ -0,0 +1,14 @@ +package com.trustpilot.connector.dynamodb.testcontainers; + +import org.testcontainers.containers.MockServerContainer; +import org.testcontainers.utility.DockerImageName; + +public class MockServerContainerExt extends MockServerContainer { + public MockServerContainerExt(DockerImageName dockerImageName) { + super(dockerImageName); + } + + public String getInternalEndpoint() { + return String.format("http://%s:%d", getNetworkAliases().get(0), getExposedPorts().get(0)); + } +} diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/SchemaRegistryContainer.java b/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/SchemaRegistryContainer.java new file mode 100644 index 0000000..16bdfd5 --- /dev/null +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/testcontainers/SchemaRegistryContainer.java @@ -0,0 +1,26 @@ +package com.trustpilot.connector.dynamodb.testcontainers; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + + +public class SchemaRegistryContainer extends GenericContainer { + private static final int PORT = 8081; + private static final String networkAlias = "schema-registry"; + + public SchemaRegistryContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + addEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost"); + withExposedPorts(PORT); + } + + public SchemaRegistryContainer withKafka(final KafkaContainer kafka) { + withNetwork(kafka.getNetwork()); + withNetworkAliases(networkAlias); + addEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + String.format("%s:%d", kafka.getNetworkAliases().get(0), 9092)); + + return this; + } +} \ No newline at end of file