Skip to content

Commit

Permalink
Updated to camel 4.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
valdar committed Feb 13, 2024
1 parent 3d0ab78 commit e835377
Show file tree
Hide file tree
Showing 21 changed files with 97 additions and 33 deletions.
7 changes: 3 additions & 4 deletions parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<kafka.version>3.5.1</kafka.version>
<camel.version>4.0.0</camel.version>
<camel.kamelet.catalog.version>4.0.0</camel.kamelet.catalog.version>
<camel.version>4.3.0</camel.version>
<camel.kamelet.catalog.version>4.3.0</camel.kamelet.catalog.version>
<apicurio.registry.version>1.3.2.Final</apicurio.registry.version>
<resteasy.version>4.5.6.Final</resteasy.version>
<version.java>17</version.java>
Expand Down Expand Up @@ -58,8 +58,7 @@
<version.plexus.build.api>0.0.7</version.plexus.build.api>

<mycila-license-version>3.0</mycila-license-version>
<gmavenplus-plugin-version>1.11.1</gmavenplus-plugin-version>
<groovy-version>3.0.12</groovy-version>
<groovy-version>3.0.20</groovy-version>

<itest.zookeeper.container.image>quay.io/strimzi/kafka:0.32.0-kafka-3.2.1</itest.zookeeper.container.image>
<itest.strimzi.container.image>quay.io/strimzi/kafka:0.32.0-kafka-3.2.1</itest.strimzi.container.image>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.camel</groupId>
<artifactId>camel-dependencies</artifactId>
<version>4.0.0</version>
<version>4.3.0</version>
</parent>

<groupId>org.apache.camel.kafkaconnector</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest;
import software.amazon.awssdk.services.sqs.model.DeleteQueueResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
Expand All @@ -56,7 +59,6 @@ public String getQueue(String queue) {
return getQueueUrlResult.queueUrl();
}


public String createQueue(String queue) {
final CreateQueueRequest createFifoQueueRequest = CreateQueueRequest.builder()
.queueName(queue)
Expand Down Expand Up @@ -161,4 +163,13 @@ public String getOrCreateQueue(String queue) {

return queueUrl;
}

public String getQueueArnFromUrl(String queueUrl) {
GetQueueAttributesRequest getQueueAttributesRequest = GetQueueAttributesRequest.builder()
.queueUrl(queueUrl)
.attributeNames(QueueAttributeName.QUEUE_ARN).build();
GetQueueAttributesResponse getQueueAttributesResponse = sqs.getQueueAttributes(getQueueAttributesRequest);

return getQueueAttributesResponse.attributes().get(QueueAttributeName.QUEUE_ARN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.ByteProducerPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.ByteArrayProducerPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.ConsumerPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.DefaultConsumerPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
Expand All @@ -40,7 +40,6 @@
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactoryWithTimeout;
import org.apache.camel.test.infra.common.TestUtils;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
Expand Down Expand Up @@ -70,32 +69,33 @@ public class CamelSinkLambdaITCase extends CamelSinkTestSupport {
private final int expect = 1;


private static class CustomProducer extends AbstractTestMessageProducer<Bytes> {
private static class CustomProducer extends AbstractTestMessageProducer<byte[]> {
public CustomProducer(String bootstrapServer, String topicName, int count) {
super(bootstrapServer, topicName, count);
}

@Override
protected KafkaClient<String, Bytes> createKafkaClient(String bootstrapServer) {
protected KafkaClient<String, byte[]> createKafkaClient(String bootstrapServer) {
ConsumerPropertyFactory consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer);
ProducerPropertyFactory producerPropertyFactory = new ByteProducerPropertyFactory(bootstrapServer);
ProducerPropertyFactory producerPropertyFactory = new ByteArrayProducerPropertyFactory(bootstrapServer);

return new KafkaClient<>(consumerPropertyFactory, producerPropertyFactory);
}

@Override
public Bytes testMessageContent(int current) {
public byte[] testMessageContent(int current) {

try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
ZipOutputStream zip = new ZipOutputStream(out);

ZipEntry entry = new ZipEntry("test");
ZipEntry entry = new ZipEntry("test.class");
zip.putNextEntry(entry);
zip.write("hello test".getBytes());
zip.closeEntry();
zip.finish();

return Bytes.wrap(out.toByteArray());
// return Bytes.wrap(out.toByteArray());
return out.toByteArray();
} catch (IOException e) {
LOG.error("I/O error writing zip entry: {}", e.getMessage(), e);
fail("I/O error writing zip entry");
Expand All @@ -105,16 +105,16 @@ public Bytes testMessageContent(int current) {
}

@Override
public Map<String, String> messageHeaders(Bytes text, int current) {
public Map<String, String> messageHeaders(byte[] text, int current) {
Map<String, String> headers = new HashMap<>();

headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaOperation",
"createFunction");

headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaRole",
"admin");
"arn:aws:iam::123456789012:role/admin");
headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaRuntime",
"java8");
software.amazon.awssdk.services.lambda.model.Runtime.JAVA8.toString());
headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaHandler",
"org.apache.camel.kafkaconnector.SomeHandler");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ public void setUp() {
awsSnsClient = new AWSSNSClient(AWSSDKClientUtils.newSNSClient());

queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000);
sqsQueueUrl = awsSqsClient.createQueue(queueName);
sqsQueueUrl = awsSqsClient.getOrCreateQueue(queueName);

LOG.info("Created SQS queue {}", sqsQueueUrl);

snsTopicUrl = awsSnsClient.createTopic(queueName);

LOG.info("Created SNS topic {}", snsTopicUrl);

awsSnsClient.subscribeSQS(snsTopicUrl, sqsQueueUrl);
awsSnsClient.subscribeSQS(snsTopicUrl, awsSqsClient.getQueueArnFromUrl(sqsQueueUrl));

LOG.info("Created subscription between SQS queue {} and SNS topic {}", sqsQueueUrl, snsTopicUrl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void setUp() {
queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);

// TODO: this is a work-around for CAMEL-15833
awssqsClient.createQueue(queueName);
awssqsClient.getOrCreateQueue(queueName);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSCloudWatchLocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSCloudWatchLocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.CLOUD_WATCH}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.CLOUD_WATCH}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSDynamodbLocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSDynamodbLocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.DYNAMODB}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.DYNAMODB}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSEC2LocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSEC2LocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.EC2}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.EC2}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSEventBridgeLocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSEventBridgeLocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.EVENT_BRIDGE}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.EVENT_BRIDGE}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSIAMLocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSIAMLocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.IAM}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.IAM}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSKMSLocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSKMSLocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.KMS}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.KMS}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class AWSKinesisLocalContainerServiceWithTimeout extends AWSLocalContaine
private static final Logger LOG = LoggerFactory.getLogger(AWSKinesisLocalContainerService.class);

public AWSKinesisLocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.KINESIS}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.KINESIS}));
}

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSLambdaLocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSLambdaLocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.LAMBDA}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.4.0"), new Service[]{Service.LAMBDA}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSS3LocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSS3LocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.S3}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.S3}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSSNSLocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSSNSLocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.SNS}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.SNS}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSSQSLocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSSQSLocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.SQS}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.SQS}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

public class AWSSTSLocalContainerServiceWithTimeout extends AWSLocalContainerService {
public AWSSTSLocalContainerServiceWithTimeout() {
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:1.3.0"), new Service[]{Service.STS}));
super(new AWSContainerWithTimeout(System.getProperty("aws.container", "localstack/localstack:3.0.2"), new Service[]{Service.STS}));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.common.clients.kafka;

import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ByteArrayProducerPropertyFactory implements ProducerPropertyFactory {
private final String bootstrapServer;

/**
* Constructs the properties using the given bootstrap server
* @param bootstrapServer the address of the server in the format
* PLAINTEXT://${address}:${port}
*/
public ByteArrayProducerPropertyFactory(String bootstrapServer) {
this.bootstrapServer = bootstrapServer;
}

@Override
public Properties getProperties() {
Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());

return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.camel.test.infra.hdfs.v2.services.HDFSServiceFactory;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.Ignore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -45,6 +46,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@Ignore("Waiting forhttps://issues.apache.org/jira/browse/CAMEL-20399 to be released i.e. camel 4.4.0")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
@RegisterExtension
Expand Down
2 changes: 1 addition & 1 deletion tooling/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<itf-jupiter-extension-version>0.9.0</itf-jupiter-extension-version>
<jakarta-jaxb-version>2.3.2</jakarta-jaxb-version>
<jandex-version>2.1.1.Final</jandex-version>
<mvel-version>2.4.12.Final</mvel-version>
<mvel-version>2.5.2.Final</mvel-version>
<roaster-version>2.20.1.Final</roaster-version>
</properties>

Expand Down

0 comments on commit e835377

Please sign in to comment.