From 99026962d486de19688074e7b8fe57d5ebec5439 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 24 Mar 2023 15:40:58 -0700 Subject: [PATCH 1/2] KCA: option to collapse partitioned topics --- pulsar-io/kafka-connect-adaptor/pom.xml | 7 ++ .../io/kafka/connect/KafkaConnectSink.java | 19 +++- .../connect/PulsarKafkaConnectSinkConfig.java | 6 ++ .../kafka/connect/KafkaConnectSinkTest.java | 88 +++++++++++++++++++ 4 files changed, 118 insertions(+), 2 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml index 5e192be95ecf4..12ebda087eb3d 100644 --- a/pulsar-io/kafka-connect-adaptor/pom.xml +++ b/pulsar-io/kafka-connect-adaptor/pom.xml @@ -39,6 +39,13 @@ provided + + ${project.groupId} + pulsar-common + ${project.version} + compile + + com.fasterxml.jackson.core jackson-databind diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index 06f66f60380d9..763996c39cf9d 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -54,6 +54,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.KeyValueSchema; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; @@ -91,6 +92,8 @@ public class KafkaConnectSink implements Sink { protected String topicName; private boolean sanitizeTopicName = false; + private boolean collapsePartitionedTopics = false; + private final Cache sanitizedTopicCache = CacheBuilder.newBuilder().maximumSize(1000) .expireAfterAccess(30, TimeUnit.MINUTES).build(); @@ -159,6 +162,7 @@ public void open(Map config, SinkContext ctx) throws Exception { topicName = kafkaSinkConfig.getTopic(); unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable(); sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName(); + collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics(); useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset(); maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset(); @@ -417,8 +421,19 @@ static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId me @SuppressWarnings("rawtypes") protected SinkRecord toSinkRecord(Record sourceRecord) { - final int partition = sourceRecord.getPartitionIndex().orElse(0); - final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName); + final int partition; + final String topic; + + if (collapsePartitionedTopics + && sourceRecord.getTopicName().isPresent() + && TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) { + TopicName tn = TopicName.get(sourceRecord.getTopicName().get()); + partition = tn.getPartitionIndex(); + topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName); + } else { + partition = sourceRecord.getPartitionIndex().orElse(0); + topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName); + } final Object key; final Object value; final Schema keySchema; diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java index 19dd784578915..decd8b0f22013 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java @@ -94,6 +94,12 @@ public class PulsarKafkaConnectSinkConfig implements Serializable { + "In some cases it may result in topic name collisions (topic_a and topic.a will become the same)") private boolean sanitizeTopicName = false; + @FieldDoc( + defaultValue = "false", + help = "Supply kafka record with topic name without -partition- suffix for partitioned topics. \n" + + "Thi si sa workaround for https://github.com/apache/pulsar/issues/19922") + private boolean collapsePartitionedTopics = false; + public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class); diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index e9d454ed2fd5a..6ccfa3b71a2f7 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -1571,6 +1571,94 @@ public void testGetMessageSequenceRefForBatchMessage() throws Exception { assertEquals(ref.getBatchIdx(), batchIdx); } + @Test + public void collapsePartitionedTopicEnabledTest() throws Exception { + testCollapsePartitionedTopic(true, + "persistent://a/b/fake-topic-partition-0", + "persistent://a/b/fake-topic", + 0); + + testCollapsePartitionedTopic(true, + "persistent://a/b/fake-topic-partition-1", + "persistent://a/b/fake-topic", + 1); + + testCollapsePartitionedTopic(true, + "persistent://a/b/fake-topic", + "persistent://a/b/fake-topic", + 0); + + testCollapsePartitionedTopic(true, + "fake-topic-partition-5", + "persistent://public/default/fake-topic", + 5); + } + + @Test + public void collapsePartitionedTopicDisabledTest() throws Exception { + testCollapsePartitionedTopic(false, + "persistent://a/b/fake-topic-partition-0", + "persistent://a/b/fake-topic-partition-0", + 0); + + testCollapsePartitionedTopic(false, + "persistent://a/b/fake-topic-partition-1", + "persistent://a/b/fake-topic-partition-1", + 0); + + testCollapsePartitionedTopic(false, + "persistent://a/b/fake-topic", + "persistent://a/b/fake-topic", + 0); + + testCollapsePartitionedTopic(false, + "fake-topic-partition-5", + "fake-topic-partition-5", + 0); + } + + private void testCollapsePartitionedTopic(boolean isEnabled, + String pulsarTopic, + String expectedKafkaTopic, + int expectedPartition) throws Exception { + props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName()); + props.put("collapsePartitionedTopics", Boolean.toString(isEnabled)); + + KafkaConnectSink sink = new KafkaConnectSink(); + sink.open(props, context); + + AvroSchema pulsarAvroSchema + = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class); + + final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema()); + obj.put("field1", (byte) 10); + obj.put("field2", "test"); + obj.put("field3", (short) 100); + + final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema); + Message msg = mock(MessageImpl.class); + when(msg.getValue()).thenReturn(rec); + when(msg.getKey()).thenReturn("key"); + when(msg.hasKey()).thenReturn(true); + when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0)); + + final AtomicInteger status = new AtomicInteger(0); + Record record = PulsarRecord.builder() + .topicName(pulsarTopic) + .message(msg) + .schema(pulsarAvroSchema) + .ackFunction(status::incrementAndGet) + .failFunction(status::decrementAndGet) + .build(); + + SinkRecord sinkRecord = sink.toSinkRecord(record); + + Assert.assertEquals(sinkRecord.topic(), expectedKafkaTopic); + Assert.assertEquals(sinkRecord.kafkaPartition(), expectedPartition); + + sink.close(); + } + @SneakyThrows private java.util.Date getDateFromString(String dateInString) { SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss"); From c90ef9d6d6534af56f008d279e9b0ed42c7a6c72 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Mon, 27 Mar 2023 17:21:15 -0700 Subject: [PATCH 2/2] CR feedback --- .../org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java | 1 + .../pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index 763996c39cf9d..475724cc4e545 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -92,6 +92,7 @@ public class KafkaConnectSink implements Sink { protected String topicName; private boolean sanitizeTopicName = false; + // Thi is a workaround for https://github.com/apache/pulsar/issues/19922 private boolean collapsePartitionedTopics = false; private final Cache sanitizedTopicCache = diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java index decd8b0f22013..2525081a41ebb 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java @@ -96,8 +96,7 @@ public class PulsarKafkaConnectSinkConfig implements Serializable { @FieldDoc( defaultValue = "false", - help = "Supply kafka record with topic name without -partition- suffix for partitioned topics. \n" - + "Thi si sa workaround for https://github.com/apache/pulsar/issues/19922") + help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.") private boolean collapsePartitionedTopics = false; public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {