From 9212fab50244f714f9c4d41e7010d07992744d5d Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 16 May 2022 18:01:19 +0200 Subject: [PATCH] Make the implicit subscription name unique per-namespace --- .../com/datastax/oss/pulsar/jms/PulsarQueue.java | 12 +++++++++++- .../oss/pulsar/jms/PulsarDestinationTest.java | 6 +++--- .../java/com/datastax/oss/pulsar/jms/QueueTest.java | 4 ++-- .../datastax/oss/pulsar/jms/SelectorsTestsBase.java | 10 ++++++++-- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarQueue.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarQueue.java index 8d1d0fa9..fcafa1c7 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarQueue.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarQueue.java @@ -64,8 +64,18 @@ public String extractSubscriptionName() { // queue:subscription // persistent://public/default/queue:subscription int slash = topicName.lastIndexOf("/"); + // we include the short topic name in the subscription name + // because in Pulsar subscription level permissions are namespace scoped + // and not topic-scoped, so it is better that the subscription name + // is unique in the scope of a namespace if (slash < 0 || slash < pos) { - return topicName.substring(pos + 1); + if (slash < 0) { + // queue:subscription + return topicName; + } else { + // persistent://public/default/queue:subscription + return topicName.substring(slash + 1); + } } return null; } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarDestinationTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarDestinationTest.java index 6ebbcce1..35ab6464 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarDestinationTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarDestinationTest.java @@ -37,12 +37,12 @@ public void testExtractSubscriptionNameForQueue() { assertNull(topic.extractSubscriptionName()); topic = new PulsarQueue("test:sub"); - assertEquals("sub", topic.extractSubscriptionName()); + assertEquals("test:sub", topic.extractSubscriptionName()); topic = new PulsarQueue("test:sub"); - assertEquals("sub", topic.extractSubscriptionName()); + assertEquals("test:sub", topic.extractSubscriptionName()); topic = new PulsarQueue("persistent://public/default/test:sub"); - assertEquals("sub", topic.extractSubscriptionName()); + assertEquals("test:sub", topic.extractSubscriptionName()); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java index efa2b2aa..6f825a7a 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java @@ -370,8 +370,8 @@ public void customSubscriptionName() throws Exception { cluster.getService().getAdminClient().topics().getStats(fullTopicName); log.info("Subscriptions {}", stats.getSubscriptions().keySet()); assertNotNull(stats.getSubscriptions().get("default-sub-name")); - assertNotNull(stats.getSubscriptions().get("sub1")); - assertNotNull(stats.getSubscriptions().get("sub2")); + assertNotNull(stats.getSubscriptions().get(shortTopicName + ":sub1")); + assertNotNull(stats.getSubscriptions().get(shortTopicName + ":sub2")); } } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java index 5f541452..6c09d6f1 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java @@ -16,6 +16,7 @@ package com.datastax.oss.pulsar.jms; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assumptions.assumeFalse; import static org.junit.jupiter.api.Assumptions.assumeTrue; @@ -41,6 +42,7 @@ import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.common.policies.data.TopicStats; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -563,7 +565,7 @@ public void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue() th // because it is always safe properties.put("jms.enableClientSideEmulation", "false"); - String topicName = "queue-with-sub-" + useServerSideFiltering + "_" + enableBatching; + String topicName = "sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue_" + enableBatching; cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName); String subscriptionName = "the-sub"; @@ -579,7 +581,7 @@ public void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue() th .getService() .getClient() .newConsumer() - .subscriptionName(subscriptionName) + .subscriptionName(topicName + ":" + subscriptionName) // real subscription name is short topic name + subname .subscriptionType(SubscriptionType.Shared) .subscriptionMode(SubscriptionMode.Durable) .subscriptionProperties(subscriptionProperties) @@ -628,6 +630,10 @@ public void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue() th assertNull(consumer1.receiveNoWait()); } } + + // ensure subscription exists + TopicStats stats = cluster.getService().getAdminClient().topics().getStats(topicName); + assertNotNull(stats.getSubscriptions().get(topicName + ":" + subscriptionName)); } } }