diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index 719642edf132c..a8cb38b94804a 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -96,6 +96,12 @@ public class KafkaConnectSink implements Sink { CacheBuilder.newBuilder().maximumSize(1000) .expireAfterAccess(30, TimeUnit.MINUTES).build(); + // Can't really safely expire these entries. If we do, we could end up with + // a sanitized topic name that is used in e.g. resume() after a long pause but can't be + // re-resolved into a form usable for Pulsar. + private final Cache desanitizedTopicCache = + CacheBuilder.newBuilder().build(); + private int maxBatchBitsForOffset = 12; private boolean useIndexAsOffset = true; @@ -185,7 +191,18 @@ public void open(Map config, SinkContext ctx) throws Exception { }); task = (SinkTask) taskClass.getConstructor().newInstance(); taskContext = - new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open); + new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open, kafkaName -> { + if (sanitizeTopicName) { + String pulsarTopicName = desanitizedTopicCache.getIfPresent(kafkaName); + if (log.isDebugEnabled()) { + log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}", + kafkaName, pulsarTopicName); + } + return pulsarTopicName != null ? 
pulsarTopicName : kafkaName; + } else { + return kafkaName; + } + }); task.initialize(taskContext); task.start(configs.get(0)); @@ -487,6 +504,9 @@ protected String sanitizeNameIfNeeded(String name, boolean sanitize) { if (sanitizedName.matches("^[^a-zA-Z_].*")) { sanitizedName = "_" + sanitizedName; } + // do this once, sanitize() can be called on an already sanitized name + // so avoid replacing with (sanitizedName -> sanitizedName). + desanitizedTopicCache.get(sanitizedName, () -> name); return sanitizedName; }); } catch (ExecutionException e) { diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java index 8c4639a237392..c95af0363a6ee 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.function.Function; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -50,6 +51,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext { private final SinkContext ctx; private final OffsetBackingStore offsetStore; + private Function desanitizeTopicName; private final String topicNamespace; private final Consumer> onPartitionChange; private final AtomicBoolean runRepartition = new AtomicBoolean(false); @@ -58,11 +60,13 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext { public PulsarKafkaSinkTaskContext(Map config, SinkContext ctx, - Consumer> onPartitionChange) { + Consumer> onPartitionChange, + Function 
desanitizeTopicName) { this.config = config; this.ctx = ctx; offsetStore = new PulsarOffsetBackingStore(ctx.getPulsarClient()); + this.desanitizeTopicName = desanitizeTopicName; PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(config); offsetStore.configure(pulsarKafkaWorkerConfig); offsetStore.start(); @@ -145,7 +149,9 @@ private void fillOffsetMap(Map offsetMap, TopicPartition private void seekAndUpdateOffset(TopicPartition topicPartition, long offset) { try { - ctx.seek(topicPartition.topic(), topicPartition.partition(), MessageIdUtils.getMessageId(offset)); + ctx.seek(desanitizeTopicName.apply(topicPartition.topic()), + topicPartition.partition(), + MessageIdUtils.getMessageId(offset)); } catch (PulsarClientException e) { log.error("Failed to seek topic {} partition {} offset {}", topicPartition.topic(), topicPartition.partition(), offset, e); @@ -203,7 +209,7 @@ public Set assignment() { public void pause(TopicPartition... topicPartitions) { for (TopicPartition tp: topicPartitions) { try { - ctx.pause(tp.topic(), tp.partition()); + ctx.pause(desanitizeTopicName.apply(tp.topic()), tp.partition()); } catch (PulsarClientException e) { log.error("Failed to pause topic {} partition {}", tp.topic(), tp.partition(), e); throw new RuntimeException("Failed to pause topic " + tp.topic() + " partition " + tp.partition(), e); @@ -215,7 +221,7 @@ public void pause(TopicPartition... topicPartitions) { public void resume(TopicPartition... 
topicPartitions) { for (TopicPartition tp: topicPartitions) { try { - ctx.resume(tp.topic(), tp.partition()); + ctx.resume(desanitizeTopicName.apply(tp.topic()), tp.partition()); } catch (PulsarClientException e) { log.error("Failed to resume topic {} partition {}", tp.topic(), tp.partition(), e); throw new RuntimeException("Failed to resume topic " + tp.topic() + " partition " + tp.partition(), e); diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index dddf89f445b7f..2fa53de6b8d71 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -314,6 +314,50 @@ public void seekPauseResumeTest() throws Exception { sink.close(); } + @Test + public void seekPauseResumeWithSanitizeTest() throws Exception { + KafkaConnectSink sink = new KafkaConnectSink(); + props.put("sanitizeTopicName", "true"); + sink.open(props, context); + + String pulsarTopicName = "persistent://a-b/c-d/fake-topic.a"; + + final GenericRecord rec = getGenericRecord("value", Schema.STRING); + Message msg = mock(MessageImpl.class); + when(msg.getValue()).thenReturn(rec); + final MessageId msgId = new MessageIdImpl(10, 10, 0); + when(msg.getMessageId()).thenReturn(msgId); + + final AtomicInteger status = new AtomicInteger(0); + Record record = PulsarRecord.builder() + .topicName(pulsarTopicName) + .message(msg) + .ackFunction(status::incrementAndGet) + .failFunction(status::decrementAndGet) + .schema(Schema.STRING) + .build(); + + sink.write(record); + sink.flush(); + + assertEquals(status.get(), 1); + + final TopicPartition tp = new TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0); + assertNotEquals(MessageIdUtils.getOffset(msgId), 0); 
+ assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId)); + + sink.taskContext.offset(tp, 0); + verify(context, times(1)).seek(pulsarTopicName, + tp.partition(), MessageIdUtils.getMessageId(0)); + assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 0); + + sink.taskContext.pause(tp); + verify(context, times(1)).pause(pulsarTopicName, tp.partition()); + sink.taskContext.resume(tp); + verify(context, times(1)).resume(pulsarTopicName, tp.partition()); + + sink.close(); + } @Test public void subscriptionTypeTest() throws Exception {