Skip to content

Commit

Permalink
[fix][io] KCA: 'desanitize' topic name for the pulsar's ctx calls (apache#19756)

Browse files Browse the repository at this point in the history

(cherry picked from commit d4930a3)
  • Loading branch information
dlg99 authored and nicoloboschi committed Mar 9, 2023
1 parent c8e1d1a commit bbfaa1e
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ public class KafkaConnectSink implements Sink<GenericObject> {
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();

// Can't really safely expire these entries. If we do, we could end up with
// a sanitized topic name that is used in e.g. resume() after a long pause but can't be
// re-resolved into a form usable for Pulsar.
private final Cache<String, String> desanitizedTopicCache =
CacheBuilder.newBuilder().build();

private int maxBatchBitsForOffset = 12;
private boolean useIndexAsOffset = true;

Expand Down Expand Up @@ -185,7 +191,18 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
});
task = (SinkTask) taskClass.getConstructor().newInstance();
taskContext =
new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open);
new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open, kafkaName -> {
if (sanitizeTopicName) {
String pulsarTopicName = desanitizedTopicCache.getIfPresent(kafkaName);
if (log.isDebugEnabled()) {
log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}",
kafkaName, pulsarTopicName);
}
return pulsarTopicName != null ? pulsarTopicName : kafkaName;
} else {
return kafkaName;
}
});
task.initialize(taskContext);
task.start(configs.get(0));

Expand Down Expand Up @@ -487,6 +504,9 @@ protected String sanitizeNameIfNeeded(String name, boolean sanitize) {
if (sanitizedName.matches("^[^a-zA-Z_].*")) {
sanitizedName = "_" + sanitizedName;
}
// Do this only once: sanitize() can be called on an already-sanitized name,
// so avoid overwriting the cached mapping with (sanitizedName -> sanitizedName).
desanitizedTopicCache.get(sanitizedName, () -> name);
return sanitizedName;
});
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -50,6 +51,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
private final SinkContext ctx;

private final OffsetBackingStore offsetStore;
private Function<String, String> desanitizeTopicName;
private final String topicNamespace;
private final Consumer<Collection<TopicPartition>> onPartitionChange;
private final AtomicBoolean runRepartition = new AtomicBoolean(false);
Expand All @@ -58,11 +60,13 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {

public PulsarKafkaSinkTaskContext(Map<String, String> config,
SinkContext ctx,
Consumer<Collection<TopicPartition>> onPartitionChange) {
Consumer<Collection<TopicPartition>> onPartitionChange,
Function<String, String> desanitizeTopicName) {
this.config = config;
this.ctx = ctx;

offsetStore = new PulsarOffsetBackingStore(ctx.getPulsarClient());
this.desanitizeTopicName = desanitizeTopicName;
PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(config);
offsetStore.configure(pulsarKafkaWorkerConfig);
offsetStore.start();
Expand Down Expand Up @@ -145,7 +149,9 @@ private void fillOffsetMap(Map<ByteBuffer, ByteBuffer> offsetMap, TopicPartition

private void seekAndUpdateOffset(TopicPartition topicPartition, long offset) {
try {
ctx.seek(topicPartition.topic(), topicPartition.partition(), MessageIdUtils.getMessageId(offset));
ctx.seek(desanitizeTopicName.apply(topicPartition.topic()),
topicPartition.partition(),
MessageIdUtils.getMessageId(offset));
} catch (PulsarClientException e) {
log.error("Failed to seek topic {} partition {} offset {}",
topicPartition.topic(), topicPartition.partition(), offset, e);
Expand Down Expand Up @@ -203,7 +209,7 @@ public Set<TopicPartition> assignment() {
public void pause(TopicPartition... topicPartitions) {
for (TopicPartition tp: topicPartitions) {
try {
ctx.pause(tp.topic(), tp.partition());
ctx.pause(desanitizeTopicName.apply(tp.topic()), tp.partition());
} catch (PulsarClientException e) {
log.error("Failed to pause topic {} partition {}", tp.topic(), tp.partition(), e);
throw new RuntimeException("Failed to pause topic " + tp.topic() + " partition " + tp.partition(), e);
Expand All @@ -215,7 +221,7 @@ public void pause(TopicPartition... topicPartitions) {
public void resume(TopicPartition... topicPartitions) {
for (TopicPartition tp: topicPartitions) {
try {
ctx.resume(tp.topic(), tp.partition());
ctx.resume(desanitizeTopicName.apply(tp.topic()), tp.partition());
} catch (PulsarClientException e) {
log.error("Failed to resume topic {} partition {}", tp.topic(), tp.partition(), e);
throw new RuntimeException("Failed to resume topic " + tp.topic() + " partition " + tp.partition(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,50 @@ public void seekPauseResumeTest() throws Exception {
sink.close();
}

@Test
public void seekPauseResumeWithSanitizeTest() throws Exception {
KafkaConnectSink sink = new KafkaConnectSink();
props.put("sanitizeTopicName", "true");
sink.open(props, context);

String pulsarTopicName = "persistent://a-b/c-d/fake-topic.a";

final GenericRecord rec = getGenericRecord("value", Schema.STRING);
Message msg = mock(MessageImpl.class);
when(msg.getValue()).thenReturn(rec);
final MessageId msgId = new MessageIdImpl(10, 10, 0);
when(msg.getMessageId()).thenReturn(msgId);

final AtomicInteger status = new AtomicInteger(0);
Record<GenericObject> record = PulsarRecord.<String>builder()
.topicName(pulsarTopicName)
.message(msg)
.ackFunction(status::incrementAndGet)
.failFunction(status::decrementAndGet)
.schema(Schema.STRING)
.build();

sink.write(record);
sink.flush();

assertEquals(status.get(), 1);

final TopicPartition tp = new TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0);
assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId));

sink.taskContext.offset(tp, 0);
verify(context, times(1)).seek(pulsarTopicName,
tp.partition(), MessageIdUtils.getMessageId(0));
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 0);

sink.taskContext.pause(tp);
verify(context, times(1)).pause(pulsarTopicName, tp.partition());
sink.taskContext.resume(tp);
verify(context, times(1)).resume(pulsarTopicName, tp.partition());

sink.close();
}

@Test
public void subscriptionTypeTest() throws Exception {
Expand Down

0 comments on commit bbfaa1e

Please sign in to comment.