Skip to content

Commit e04a8eb

Browse files
poorbarcodenikhil-ctds
authored andcommitted
[fix][broker]Fix lookupService.getTopicsUnderNamespace can not work with a quote pattern (apache#23014)
(cherry picked from commit 7c0e827) (cherry picked from commit c0029d7)
1 parent 33cf127 commit e04a8eb

File tree

3 files changed

+128
-6
lines changed

3 files changed

+128
-6
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java

+56
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import com.google.common.collect.Lists;
2828

2929
import java.time.Duration;
30+
import java.util.ArrayList;
31+
import java.util.Collections;
3032
import java.util.List;
3133
import java.util.Optional;
3234
import java.util.concurrent.CompletableFuture;
@@ -48,6 +50,7 @@
4850
import org.apache.pulsar.client.api.Schema;
4951
import org.apache.pulsar.client.api.SubscriptionType;
5052
import org.apache.pulsar.common.api.proto.BaseCommand;
53+
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
5154
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
5255
import org.apache.pulsar.common.naming.NamespaceName;
5356
import org.apache.pulsar.common.naming.TopicName;
@@ -1113,4 +1116,57 @@ public void testTopicDeletion() throws Exception {
11131116
assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-1").join(), Optional.empty());
11141117
assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-2").join().isPresent());
11151118
}
1119+
1120+
@Test(dataProvider = "partitioned")
1121+
public void testPatternQuote(boolean partitioned) throws Exception {
1122+
final NamespaceName namespace = NamespaceName.get("public/default");
1123+
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
1124+
final PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
1125+
final LookupService lookup = client.getLookup();
1126+
List<String> expectedRes = new ArrayList<>();
1127+
if (partitioned) {
1128+
admin.topics().createPartitionedTopic(topicName, 2);
1129+
expectedRes.add(TopicName.get(topicName).getPartition(0).toString());
1130+
expectedRes.add(TopicName.get(topicName).getPartition(1).toString());
1131+
Collections.sort(expectedRes);
1132+
} else {
1133+
admin.topics().createNonPartitionedTopic(topicName);
1134+
expectedRes.add(topicName);
1135+
}
1136+
1137+
// Verify 1: "java.util.regex.Pattern.quote".
1138+
String pattern1 = java.util.regex.Pattern.quote(topicName);
1139+
List<String> res1 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
1140+
pattern1, null).join().getNonPartitionedOrPartitionTopics();
1141+
Collections.sort(res1);
1142+
assertEquals(res1, expectedRes);
1143+
1144+
// Verify 2: "com.google.re2j.Pattern.quote"
1145+
String pattern2 = com.google.re2j.Pattern.quote(topicName);
1146+
List<String> res2 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
1147+
pattern2, null).join().getNonPartitionedOrPartitionTopics();
1148+
Collections.sort(res2);
1149+
assertEquals(res2, expectedRes);
1150+
1151+
// Verify 3: "java.util.regex.Pattern.quote" & "^$"
1152+
String pattern3 = "^" + java.util.regex.Pattern.quote(topicName) + "$";
1153+
List<String> res3 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
1154+
pattern3, null).join().getNonPartitionedOrPartitionTopics();
1155+
Collections.sort(res3);
1156+
assertEquals(res3, expectedRes);
1157+
1158+
// Verify 4: "com.google.re2j.Pattern.quote" & "^$"
1159+
String pattern4 = "^" + com.google.re2j.Pattern.quote(topicName) + "$";
1160+
List<String> res4 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
1161+
pattern4, null).join().getNonPartitionedOrPartitionTopics();
1162+
Collections.sort(res4);
1163+
assertEquals(res4, expectedRes);
1164+
1165+
// cleanup.
1166+
if (partitioned) {
1167+
admin.topics().deletePartitionedTopic(topicName, false);
1168+
} else {
1169+
admin.topics().delete(topicName, false);
1170+
}
1171+
}
11161172
}

pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java

+15-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.common.topics;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import com.google.common.hash.Hashing;
2223
import java.nio.charset.StandardCharsets;
2324
import java.util.Collection;
@@ -28,6 +29,7 @@
2829
import java.util.stream.Collectors;
2930
import lombok.experimental.UtilityClass;
3031
import org.apache.pulsar.common.naming.SystemTopicNames;
32+
import org.apache.pulsar.common.naming.TopicDomain;
3133
import org.apache.pulsar.common.naming.TopicName;
3234

3335
@UtilityClass
@@ -82,15 +84,23 @@ public static Set<String> minus(Collection<String> list1, Collection<String> lis
8284
return s1;
8385
}
8486

85-
private static String removeTopicDomainScheme(String originalRegexp) {
87+
@VisibleForTesting
88+
static String removeTopicDomainScheme(String originalRegexp) {
8689
if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
8790
return originalRegexp;
8891
}
89-
String removedTopicDomain = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1];
90-
if (originalRegexp.contains("^")) {
91-
return String.format("^%s", removedTopicDomain);
92+
String[] parts = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString());
93+
String prefix = parts[0];
94+
String removedTopicDomain = parts[1];
95+
if (prefix.equals(TopicDomain.persistent.value()) || prefix.equals(TopicDomain.non_persistent.value())) {
96+
prefix = "";
97+
} else if (prefix.endsWith(TopicDomain.non_persistent.value())) {
98+
prefix = prefix.substring(0, prefix.length() - TopicDomain.non_persistent.value().length());
99+
} else if (prefix.endsWith(TopicDomain.persistent.value())){
100+
prefix = prefix.substring(0, prefix.length() - TopicDomain.persistent.value().length());
92101
} else {
93-
return removedTopicDomain;
102+
throw new IllegalArgumentException("Does not support topic domain: " + prefix);
94103
}
104+
return String.format("%s%s", prefix, removedTopicDomain);
95105
}
96106
}

pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java

+57-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.testng.Assert.assertEquals;
3131
import static org.testng.Assert.assertNotEquals;
3232
import static org.testng.Assert.assertTrue;
33+
import static org.testng.Assert.fail;
3334

3435
public class TopicListTest {
3536

@@ -107,5 +108,60 @@ public void testCalculateHash() {
107108

108109
}
109110

110-
111+
@Test
112+
public void testRemoveTopicDomainScheme() {
113+
// persistent.
114+
final String tpName1 = "persistent://public/default/tp";
115+
String res1 = TopicList.removeTopicDomainScheme(tpName1);
116+
assertEquals(res1, "public/default/tp");
117+
118+
// non-persistent
119+
final String tpName2 = "non-persistent://public/default/tp";
120+
String res2 = TopicList.removeTopicDomainScheme(tpName2);
121+
assertEquals(res2, "public/default/tp");
122+
123+
// without topic domain.
124+
final String tpName3 = "public/default/tp";
125+
String res3 = TopicList.removeTopicDomainScheme(tpName3);
126+
assertEquals(res3, "public/default/tp");
127+
128+
// persistent & "java.util.regex.Pattern.quote".
129+
final String tpName4 = java.util.regex.Pattern.quote(tpName1);
130+
String res4 = TopicList.removeTopicDomainScheme(tpName4);
131+
assertEquals(res4, java.util.regex.Pattern.quote("public/default/tp"));
132+
133+
// persistent & "java.util.regex.Pattern.quote" & "^$".
134+
final String tpName5 = "^" + java.util.regex.Pattern.quote(tpName1) + "$";
135+
String res5 = TopicList.removeTopicDomainScheme(tpName5);
136+
assertEquals(res5, "^" + java.util.regex.Pattern.quote("public/default/tp") + "$");
137+
138+
// persistent & "com.google.re2j.Pattern.quote".
139+
final String tpName6 = Pattern.quote(tpName1);
140+
String res6 = TopicList.removeTopicDomainScheme(tpName6);
141+
assertEquals(res6, Pattern.quote("public/default/tp"));
142+
143+
// non-persistent & "java.util.regex.Pattern.quote".
144+
final String tpName7 = java.util.regex.Pattern.quote(tpName2);
145+
String res7 = TopicList.removeTopicDomainScheme(tpName7);
146+
assertEquals(res7, java.util.regex.Pattern.quote("public/default/tp"));
147+
148+
// non-persistent & "com.google.re2j.Pattern.quote".
149+
final String tpName8 = Pattern.quote(tpName2);
150+
String res8 = TopicList.removeTopicDomainScheme(tpName8);
151+
assertEquals(res8, Pattern.quote("public/default/tp"));
152+
153+
// non-persistent & "com.google.re2j.Pattern.quote" & "^$".
154+
final String tpName9 = "^" + Pattern.quote(tpName2) + "$";
155+
String res9 = TopicList.removeTopicDomainScheme(tpName9);
156+
assertEquals(res9, "^" + Pattern.quote("public/default/tp") + "$");
157+
158+
// wrong topic domain.
159+
final String tpName10 = "xx://public/default/tp";
160+
try {
161+
TopicList.removeTopicDomainScheme(tpName10);
162+
fail("Does not support the topic domain xx");
163+
} catch (Exception ex) {
164+
// expected error.
165+
}
166+
}
111167
}

0 commit comments

Comments
 (0)