Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] DLQ to handle bytes key properly #19

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,66 @@ public void testDeadLetterTopicWithMessageKey() throws Exception {
consumer.close();
}

@Test
public void testDeadLetterTopicWithBinaryMessageKey() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";

final int maxRedeliveryCount = 1;

final int sendMessages = 100;

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();

byte[] key = new byte[]{1, 2, 3, 4};
for (int i = 0; i < sendMessages; i++) {
producer.newMessage()
.keyBytes(key)
.value(String.format("Hello Pulsar [%d]", i).getBytes())
.send();
}

producer.close();

int totalReceived = 0;
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

int totalInDeadLetter = 0;
do {
Message message = deadLetterConsumer.receive();
assertEquals(message.getKeyBytes(), key);
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);

deadLetterConsumer.close();
consumer.close();
}

public void testDeadLetterTopicWithProducerName() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
final String subscription = "my-subscription";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
private volatile boolean hasSoughtByTimestamp = false;

static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
Expand Down Expand Up @@ -280,10 +281,12 @@ static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
}

protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture,
MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema,
interceptors);
this.consumerId = client.newConsumerId();
Expand Down Expand Up @@ -355,21 +358,21 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
}

this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create(),
new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create(),
this);

this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
this.acknowledgmentsGroupingTracker =
new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
} else {
this.acknowledgmentsGroupingTracker =
NonPersistentAcknowledgmentGroupingTracker.of();
NonPersistentAcknowledgmentGroupingTracker.of();
}

if (conf.getDeadLetterPolicy() != null) {
Expand Down Expand Up @@ -470,16 +473,16 @@ public CompletableFuture<Void> unsubscribeAsync(boolean force) {
log.error("[{}][{}] Failed to unsubscribe: {}", topic, subscription, e.getCause().getMessage());
setState(State.Ready);
unsubscribeFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to unsubscribe the subscription %s of topic %s",
subscription, topicName.toString())));
PulsarClientException.wrap(e.getCause(),
String.format("Failed to unsubscribe the subscription %s of topic %s",
subscription, topicName.toString())));
return null;
});
} else {
unsubscribeFuture.completeExceptionally(
new PulsarClientException.NotConnectedException(
String.format("The client is not connected to the broker when unsubscribing the "
+ "subscription %s of the topic %s", subscription, topicName.toString())));
new PulsarClientException.NotConnectedException(
String.format("The client is not connected to the broker when unsubscribing the "
+ "subscription %s of the topic %s", subscription, topicName.toString())));
}
return unsubscribeFuture;
}
Expand Down Expand Up @@ -636,6 +639,15 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A
}
}

private static void copyMessageKeyIfNeeded(Message<?> message, TypedMessageBuilder<?> typedMessageBuilderNew) {
if (message.hasKey()) {
if (message.hasBase64EncodedKey()) {
typedMessageBuilderNew.keyBytes(message.getKeyBytes());
} else {
typedMessageBuilderNew.key(message.getKey());
}
}
}

@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -720,9 +732,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
if (delayTime > 0) {
typedMessageBuilderNew.deliverAfter(delayTime, unit);
}
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
copyMessageKeyIfNeeded(message, typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync()
.thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
.thenAccept(v -> result.complete(null))
Expand Down Expand Up @@ -2186,9 +2196,7 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(message.getData())
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
copyMessageKeyIfNeeded(message, typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync()
.thenAccept(messageIdInDLQ -> {
possibleSendToDeadLetterTopicMessages.remove(messageId);
Expand Down
Loading