Skip to content

Commit

Permalink
[Improve][Kafka] Support custom topic for debezium compatible format (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Nov 29, 2024
1 parent 29ca928 commit deefe87
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ sink {
plugin_input = "table1"

bootstrap.servers = "localhost:9092"
topic = "${topic}"

# compatible_debezium_json options
format = compatible_debezium_json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ sink {
plugin_input = "table1"

bootstrap.servers = "localhost:9092"
topic = "${topic}"

# compatible_debezium_json options
format = compatible_debezium_json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor() {

private static Function<SeaTunnelRow, String> topicExtractor(
String topic, SeaTunnelRowType rowType, MessageFormat format) {
if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format) && topic == null) {
int topicFieldIndex =
rowType.indexOf(CompatibleDebeziumJsonDeserializationSchema.FIELD_TOPIC);
return row -> row.getField(topicFieldIndex).toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collections;

public class DefaultSeaTunnelRowSerializerTest {

@Test
public void testCustomTopic() {
String topic = null;
SeaTunnelRowType rowType =
CompatibleDebeziumJsonDeserializationSchema.DEBEZIUM_DATA_ROW_TYPE;
MessageFormat format = MessageFormat.COMPATIBLE_DEBEZIUM_JSON;
String delimiter = null;
ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(Collections.emptyMap());

DefaultSeaTunnelRowSerializer serializer =
DefaultSeaTunnelRowSerializer.create(
topic, rowType, format, delimiter, pluginConfig);
ProducerRecord<byte[], byte[]> record =
serializer.serializeRow(
new SeaTunnelRow(new Object[] {"test.database1.table1", "key1", "value1"}));

Assertions.assertEquals("test.database1.table1", record.topic());
Assertions.assertEquals("key1", new String(record.key()));
Assertions.assertEquals("value1", new String(record.value()));

topic = "test_topic";
serializer =
DefaultSeaTunnelRowSerializer.create(
topic, rowType, format, delimiter, pluginConfig);
record =
serializer.serializeRow(
new SeaTunnelRow(new Object[] {"test.database1.table1", "key1", "value1"}));

Assertions.assertEquals("test_topic", record.topic());
Assertions.assertEquals("key1", new String(record.key()));
Assertions.assertEquals("value1", new String(record.value()));
}
}

0 comments on commit deefe87

Please sign in to comment.