Skip to content

Commit

Permalink
[improve][io] KCA: option to collapse partitioned topics (#19923)
Browse files Browse the repository at this point in the history
(cherry picked from commit 7cb48fd)
  • Loading branch information
dlg99 committed Mar 29, 2023
1 parent c513482 commit b2f734f
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 2 deletions.
1 change: 1 addition & 0 deletions pulsar-io/kafka-connect-adaptor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
Expand Down Expand Up @@ -92,6 +93,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
protected String topicName;

private boolean sanitizeTopicName = false;
// Thi is a workaround for https://github.com/apache/pulsar/issues/19922
private boolean collapsePartitionedTopics = false;

private final Cache<String, String> sanitizedTopicCache =
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();
Expand Down Expand Up @@ -160,6 +164,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
topicName = kafkaSinkConfig.getTopic();
unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics();

useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
Expand Down Expand Up @@ -418,8 +423,19 @@ static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId me

@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
final int partition = sourceRecord.getPartitionIndex().orElse(0);
final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
final int partition;
final String topic;

if (collapsePartitionedTopics
&& sourceRecord.getTopicName().isPresent()
&& TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) {
TopicName tn = TopicName.get(sourceRecord.getTopicName().get());
partition = tn.getPartitionIndex();
topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName);
} else {
partition = sourceRecord.getPartitionIndex().orElse(0);
topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
}
final Object key;
final Object value;
final Schema keySchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
+ "In some cases it may result in topic name collisions (topic_a and topic.a will become the same)")
private boolean sanitizeTopicName = false;

@FieldDoc(
defaultValue = "false",
help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.")
private boolean collapsePartitionedTopics = false;

public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,94 @@ public void testGetMessageSequenceRefForBatchMessage() throws Exception {
assertEquals(ref.getBatchIdx(), batchIdx);
}

@Test
public void collapsePartitionedTopicEnabledTest() throws Exception {
testCollapsePartitionedTopic(true,
"persistent://a/b/fake-topic-partition-0",
"persistent://a/b/fake-topic",
0);

testCollapsePartitionedTopic(true,
"persistent://a/b/fake-topic-partition-1",
"persistent://a/b/fake-topic",
1);

testCollapsePartitionedTopic(true,
"persistent://a/b/fake-topic",
"persistent://a/b/fake-topic",
0);

testCollapsePartitionedTopic(true,
"fake-topic-partition-5",
"persistent://public/default/fake-topic",
5);
}

@Test
public void collapsePartitionedTopicDisabledTest() throws Exception {
testCollapsePartitionedTopic(false,
"persistent://a/b/fake-topic-partition-0",
"persistent://a/b/fake-topic-partition-0",
0);

testCollapsePartitionedTopic(false,
"persistent://a/b/fake-topic-partition-1",
"persistent://a/b/fake-topic-partition-1",
0);

testCollapsePartitionedTopic(false,
"persistent://a/b/fake-topic",
"persistent://a/b/fake-topic",
0);

testCollapsePartitionedTopic(false,
"fake-topic-partition-5",
"fake-topic-partition-5",
0);
}

private void testCollapsePartitionedTopic(boolean isEnabled,
String pulsarTopic,
String expectedKafkaTopic,
int expectedPartition) throws Exception {
props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
props.put("collapsePartitionedTopics", Boolean.toString(isEnabled));

KafkaConnectSink sink = new KafkaConnectSink();
sink.open(props, context);

AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
= AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);

final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
obj.put("field1", (byte) 10);
obj.put("field2", "test");
obj.put("field3", (short) 100);

final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
Message msg = mock(MessageImpl.class);
when(msg.getValue()).thenReturn(rec);
when(msg.getKey()).thenReturn("key");
when(msg.hasKey()).thenReturn(true);
when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));

final AtomicInteger status = new AtomicInteger(0);
Record<GenericObject> record = PulsarRecord.<String>builder()
.topicName(pulsarTopic)
.message(msg)
.schema(pulsarAvroSchema)
.ackFunction(status::incrementAndGet)
.failFunction(status::decrementAndGet)
.build();

SinkRecord sinkRecord = sink.toSinkRecord(record);

Assert.assertEquals(sinkRecord.topic(), expectedKafkaTopic);
Assert.assertEquals((int)sinkRecord.kafkaPartition(), expectedPartition);

sink.close();
}

@SneakyThrows
private java.util.Date getDateFromString(String dateInString) {
SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");
Expand Down

0 comments on commit b2f734f

Please sign in to comment.