diff --git a/common/src/test/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfigurationTest.java b/common/src/test/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfigurationTest.java index 249c4605..70613e06 100644 --- a/common/src/test/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfigurationTest.java +++ b/common/src/test/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfigurationTest.java @@ -97,6 +97,7 @@ public void testBuildConfig_fromMap_succeeds() { public void testBuildConfig_noRegionConfigsSupplied_throwsException() { Map configWithoutRegion = new HashMap<>(); configWithoutRegion.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, "https://test/"); + System.setProperty("aws.profile", ""); Exception exception = assertThrows(AWSSchemaRegistryException.class, () -> new GlueSchemaRegistryConfiguration(configWithoutRegion)); diff --git a/integration-tests/docker-compose.yml b/integration-tests/docker-compose.yml index 7a599707..a7d133aa 100644 --- a/integration-tests/docker-compose.yml +++ b/integration-tests/docker-compose.yml @@ -21,3 +21,18 @@ services: - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 - ALLOW_PLAINTEXT_LISTENER=yes + + localstack: + container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}" + image: 'public.ecr.aws/localstack/localstack:latest' + ports: + - "127.0.0.1:4566:4566" + environment: + - SERVICES=cloudwatch, dynamodb, kinesis + - DEBUG=1 + - DOCKER_HOST=unix:///var/run/docker.sock + - DEFAULT_REGION=us-east-2 + - PARITY_AWS_ACCESS_KEY_ID=1 + volumes: + - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" + - "/var/run/docker.sock:/var/run/docker.sock" diff --git a/integration-tests/run-local-tests.sh b/integration-tests/run-local-tests.sh index d9b6cf42..66c797cf 100644 --- a/integration-tests/run-local-tests.sh +++ b/integration-tests/run-local-tests.sh @@ -115,7 +115,7 @@ cleanUpConnectFiles() { cleanUpDockerResources || true # Start Kafka using docker command asynchronously -docker-compose up & +docker-compose up --no-attach localstack & sleep 10 ## Run mvn tests for Kafka and Kinesis Platforms cd .. && mvn --file integration-tests/pom.xml verify -Psurefire -X && cd integration-tests @@ -131,7 +131,7 @@ downloadMongoDBConnector copyGSRConverters runConnectTests() { - docker-compose up & + docker-compose up --no-attach localstack & setUpMongoDBLocal startKafkaConnectTasks ${1} echo "Waiting for Sink task to pick up data.." diff --git a/integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kafka/KafkaHelper.java b/integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kafka/KafkaHelper.java index 18baeb51..83c0996d 100644 --- a/integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kafka/KafkaHelper.java +++ b/integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kafka/KafkaHelper.java @@ -229,7 +229,7 @@ public void doKafkaStreamsProcess(final ProducerProperties producerProperties) t final KafkaStreams streams = new KafkaStreams(builder.build(), properties); streams.cleanUp(); streams.start(); - Thread.sleep(1000L); + Thread.sleep(5000L); streams.close(); log.info("Finish processing {} message streaming via Kafka.", producerProperties.getDataFormat()); @@ -360,9 +360,9 @@ private Properties getKafkaConsumerProperties(final ConsumerProperties consumerP private Properties getKafkaStreamsProperties(final ProducerProperties producerProperties) { Properties properties = new Properties(); - properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test"); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test-"+producerProperties.getDataFormat()); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapBrokers); - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GlueSchemaRegistryKafkaStreamsSerde.class); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, producerProperties.getDataFormat()); diff --git a/integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kinesis/GlueSchemaRegistryKinesisIntegrationTest.java b/integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kinesis/GlueSchemaRegistryKinesisIntegrationTest.java index 68a03098..608b8eaf 100644 --- a/integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kinesis/GlueSchemaRegistryKinesisIntegrationTest.java +++ b/integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kinesis/GlueSchemaRegistryKinesisIntegrationTest.java @@ -17,9 +17,6 @@ import cloud.localstack.Constants; import cloud.localstack.ServiceName; -import cloud.localstack.awssdkv2.TestUtils; -import cloud.localstack.docker.LocalstackDockerExtension; -import cloud.localstack.docker.annotation.LocalstackDockerProperties; import com.amazonaws.services.kinesis.producer.KinesisProducer; import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import com.amazonaws.services.kinesis.producer.UserRecordResult; @@ -49,7 +46,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -106,15 +102,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -@ExtendWith(LocalstackDockerExtension.class) -@LocalstackDockerProperties(services = {ServiceName.KINESIS, ServiceName.DYNAMO, ServiceName.CLOUDWATCH}, imageName = - "public.ecr.aws/d4c7g6k3/localstack", imageTag = "0.12.10") public class GlueSchemaRegistryKinesisIntegrationTest { private static final Logger LOGGER = LogManager.getLogger(GlueSchemaRegistryKinesisIntegrationTest.class); - private static final DynamoDbAsyncClient dynamoClient = TestUtils.getClientDyanamoAsyncV2(); - private static final CloudWatchAsyncClient cloudWatchClient = TestUtils.getClientCloudWatchAsyncV2(); private static final String LOCALSTACK_HOSTNAME = "localhost"; private static final int LOCALSTACK_KINESIS_PORT = 4566; + private static final String LOCALSTACK_ENDPOINT = String.format("http://%s:%d",LOCALSTACK_HOSTNAME,LOCALSTACK_KINESIS_PORT); private static final int LOCALSTACK_CLOUDWATCH_PORT = Constants.DEFAULT_PORTS.get(ServiceName.CLOUDWATCH) .intValue(); private static final int KCL_SCHEDULER_START_UP_WAIT_TIME_SECONDS = 15; @@ -140,6 +132,26 @@ public class GlueSchemaRegistryKinesisIntegrationTest { private static AwsCredentialsProvider awsCredentialsProvider = DefaultCredentialsProvider.builder() .build(); + private static final DynamoDbAsyncClient dynamoClient; + private static final CloudWatchAsyncClient cloudWatchClient; + + static { + try { + dynamoClient = DynamoDbAsyncClient.builder() + .endpointOverride(new URI(LOCALSTACK_ENDPOINT)) + .region(Region.of(GlueSchemaRegistryConnectionProperties.REGION)) + .credentialsProvider(awsCredentialsProvider) + .build(); + cloudWatchClient = CloudWatchAsyncClient.builder() + .endpointOverride(new URI(LOCALSTACK_ENDPOINT)) + .region(Region.of(GlueSchemaRegistryConnectionProperties.REGION)) + .credentialsProvider(awsCredentialsProvider) + .build(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + private static List schemasToCleanUp = new ArrayList<>(); private final TestDataGeneratorFactory testDataGeneratorFactory = new TestDataGeneratorFactory(); private final GlueSchemaRegistrySerializerFactory glueSchemaRegistrySerializerFactory = @@ -219,9 +231,12 @@ private static Stream testSingleKCLKPLDataProvider() { } @BeforeEach - public void setUp() throws InterruptedException, ExecutionException { + public void setUp() throws InterruptedException, ExecutionException, URISyntaxException { System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); - kinesisClient = TestUtils.getClientKinesisAsyncV2(); + kinesisClient = KinesisAsyncClient.builder() + .endpointOverride(new URI(LOCALSTACK_ENDPOINT)) + .region(Region.of(GlueSchemaRegistryConnectionProperties.REGION)) + .build(); streamName = String.format("%s%s", TEST_KINESIS_STREAM_PREFIX, RandomStringUtils.randomAlphanumeric(4)); LOGGER.info("Creating Kinesis Stream : {} with {} shards on localStack..", streamName, SHARD_COUNT); @@ -496,7 +511,7 @@ private String produceRecordsWithKPL(String streamName, byte[] serializedBytes = dataFormatSerializer.serialize(record); putFutures.add(producer.addUserRecord(streamName, Long.toString(timestamp.toEpochMilli()), null, - ByteBuffer.wrap(serializedBytes), gsrSchema)); + ByteBuffer.wrap(serializedBytes),gsrSchema)); } String shardId = null; diff --git a/pom.xml b/pom.xml index b5b7c49b..e36c2a72 100644 --- a/pom.xml +++ b/pom.xml @@ -80,10 +80,10 @@ UTF-8 UTF-8 software.amazon.glue - 2.18.4 - 1.12.151 + 2.22.12 + 1.12.633 2.12 - 3.6.0 + 3.6.1 1.11.3 1.0.39 1.14.2 @@ -105,16 +105,16 @@ 1.1 2.2.9 - LATEST + + 0.15.8 1.6.2 1.3.2 32.0.0-jre 3.8.1 1.2 3.0.0 - 0.2.11 3.19.6 - 0.2.11 + 0.2.23 1.1.5