Skip to content

Commit

Permalink
Make the implicit subscription name unique per-namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 16, 2022
1 parent 9c1128c commit 9212fab
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,18 @@ public String extractSubscriptionName() {
// queue:subscription
// persistent://public/default/queue:subscription
int slash = topicName.lastIndexOf("/");
// we include the short topic name in the subscription name
// because in Pulsar subscription level permissions are namespace scoped
// and not topic-scoped, so it is better that the subscription name
// is unique in the scope of a namespace
if (slash < 0 || slash < pos) {
return topicName.substring(pos + 1);
if (slash < 0) {
// queue:subscription
return topicName;
} else {
// persistent://public/default/queue:subscription
return topicName.substring(slash + 1);
}
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ public void testExtractSubscriptionNameForQueue() {
assertNull(topic.extractSubscriptionName());

topic = new PulsarQueue("test:sub");
assertEquals("sub", topic.extractSubscriptionName());
assertEquals("test:sub", topic.extractSubscriptionName());

topic = new PulsarQueue("test:sub");
assertEquals("sub", topic.extractSubscriptionName());
assertEquals("test:sub", topic.extractSubscriptionName());

topic = new PulsarQueue("persistent://public/default/test:sub");
assertEquals("sub", topic.extractSubscriptionName());
assertEquals("test:sub", topic.extractSubscriptionName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ public void customSubscriptionName() throws Exception {
cluster.getService().getAdminClient().topics().getStats(fullTopicName);
log.info("Subscriptions {}", stats.getSubscriptions().keySet());
assertNotNull(stats.getSubscriptions().get("default-sub-name"));
assertNotNull(stats.getSubscriptions().get("sub1"));
assertNotNull(stats.getSubscriptions().get("sub2"));
assertNotNull(stats.getSubscriptions().get(shortTopicName + ":sub1"));
assertNotNull(stats.getSubscriptions().get(shortTopicName + ":sub2"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datastax.oss.pulsar.jms;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
Expand All @@ -41,6 +42,7 @@
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -563,7 +565,7 @@ public void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue() th
// because it is always safe
properties.put("jms.enableClientSideEmulation", "false");

String topicName = "queue-with-sub-" + useServerSideFiltering + "_" + enableBatching;
String topicName = "sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue_" + enableBatching;
cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName);

String subscriptionName = "the-sub";
Expand All @@ -579,7 +581,7 @@ public void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue() th
.getService()
.getClient()
.newConsumer()
.subscriptionName(subscriptionName)
.subscriptionName(topicName + ":" + subscriptionName) // real subscription name is short topic name + subname
.subscriptionType(SubscriptionType.Shared)
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionProperties(subscriptionProperties)
Expand Down Expand Up @@ -628,6 +630,10 @@ public void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue() th
assertNull(consumer1.receiveNoWait());
}
}

// ensure subscription exists
TopicStats stats = cluster.getService().getAdminClient().topics().getStats(topicName);
assertNotNull(stats.getSubscriptions().get(topicName + ":" + subscriptionName));
}
}
}
Expand Down

0 comments on commit 9212fab

Please sign in to comment.