diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 26a00c00b5a6a..dc1cf913ab240 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -29,7 +29,6 @@ import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException; import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; -import static org.apache.pulsar.common.protocol.Commands.newCloseConsumer; import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -71,6 +70,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TransactionMetadataStoreService; @@ -82,6 +82,7 @@ import org.apache.pulsar.broker.limiter.ConnectionController; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; @@ -149,6 +150,7 @@ import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; @@ -3186,15 +3188,28 @@ public void closeProducer(Producer producer, Optional assigned closeProducer(producer.getProducerId(), producer.getEpoch(), assignedBrokerLookupData); } + private LookupData getLookupData(BrokerLookupData lookupData) { + LookupOptions.LookupOptionsBuilder builder = LookupOptions.builder(); + if (StringUtils.isNotBlank((listenerName))) { + builder.advertisedListenerName(listenerName); + } + try { + return lookupData.toLookupResult(builder.build()).getLookupData(); + } catch (PulsarServerException e) { + log.error("Failed to get lookup data", e); + throw new RuntimeException(e); + } + } + private void closeProducer(long producerId, long epoch, Optional assignedBrokerLookupData) { if (getRemoteEndpointProtocolVersion() >= v5.getValue()) { - if (assignedBrokerLookupData.isPresent()) { - writeAndFlush(Commands.newCloseProducer(producerId, -1L, - assignedBrokerLookupData.get().pulsarServiceUrl(), - assignedBrokerLookupData.get().pulsarServiceUrlTls())); - } else { - writeAndFlush(Commands.newCloseProducer(producerId, -1L)); - } + assignedBrokerLookupData.ifPresentOrElse(lookup -> { + LookupData lookupData = getLookupData(lookup); + writeAndFlush(Commands.newCloseProducer(producerId, -1L, + lookupData.getBrokerUrl(), + lookupData.getBrokerUrlTls())); + }, + () -> writeAndFlush(Commands.newCloseProducer(producerId, -1L))); // The client does not necessarily know that the producer is closed, but the connection is still // active, and there could be messages in flight already. We want to ignore these messages for a time @@ -3220,9 +3235,13 @@ public void closeConsumer(Consumer consumer, Optional assigned private void closeConsumer(long consumerId, Optional assignedBrokerLookupData) { if (getRemoteEndpointProtocolVersion() >= v5.getValue()) { - writeAndFlush(newCloseConsumer(consumerId, -1L, - assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrl).orElse(null), - assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrlTls).orElse(null))); + assignedBrokerLookupData.ifPresentOrElse(lookup -> { + LookupData lookupData = getLookupData(lookup); + writeAndFlush(Commands.newCloseConsumer(consumerId, -1L, + lookupData.getBrokerUrl(), + lookupData.getBrokerUrlTls())); + }, + () -> writeAndFlush(Commands.newCloseConsumer(consumerId, -1L, null, null))); } else { close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index 32b7c5027281e..cce16061506a1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -21,7 +21,6 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import com.google.common.collect.Sets; - import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.reflect.FieldUtils; @@ -65,7 +64,14 @@ protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) { this.defaultTestNamespace = defaultTestNamespace; } - protected ServiceConfiguration initConfig(ServiceConfiguration conf) { + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + updateConfig(conf); + } + + + protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { conf.setForceDeleteNamespaceAllowed(true); conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); @@ -79,10 +85,9 @@ protected ServiceConfiguration initConfig(ServiceConfiguration conf) { @Override @BeforeClass(alwaysRun = true) protected void setup() throws Exception { - initConfig(conf); super.internalSetup(conf); pulsar1 = pulsar; - var conf2 = initConfig(getDefaultConf()); + var conf2 = updateConfig(getDefaultConf()); additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2); pulsar2 = additionalPulsarTestContext.getPulsarService(); @@ -147,7 +152,7 @@ private void setSecondaryLoadManager() throws IllegalAccessException { FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true); } - protected CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { + protected static CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { return pulsar.getNamespaceService().getBundleAsync(topic); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 43c50a8ac54f4..4a9b80c798f86 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -111,6 +111,7 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -434,6 +435,19 @@ public Object[][] isPersistentTopicSubscriptionTypeTest() { @Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest") public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) throws Exception { + testTransferClientReconnectionWithoutLookup(topicDomain, subscriptionType, defaultTestNamespace, admin, + lookupUrl.toString(), pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager); + } + + @Test(enabled = false) + public static void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, + SubscriptionType subscriptionType, + String defaultTestNamespace, + PulsarAdmin admin, String brokerServiceUrl, + PulsarService pulsar1, PulsarService pulsar2, + ExtensibleLoadManager primaryLoadManager, + ExtensibleLoadManager secondaryLoadManager) + throws Exception { var id = String.format("test-tx-client-reconnect-%s-%s", subscriptionType, UUID.randomUUID()); var topic = String.format("%s://%s/%s", topicDomain.toString(), defaultTestNamespace, id); var topicName = TopicName.get(topic); @@ -443,7 +457,8 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, var consumers = new ArrayList>(); try { var lookups = new ArrayList(); - + var pulsarClient = pulsarClient(brokerServiceUrl, 0); + clients.add(pulsarClient); @Cleanup var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); lookups.add(spyLookupService(pulsarClient)); @@ -451,7 +466,7 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3; for (int i = 0; i < consumerCount; i++) { - var client = newPulsarClient(lookupUrl.toString(), 0); + var client = pulsarClient(brokerServiceUrl, 0); clients.add(client); var consumer = client.newConsumer(Schema.STRING). subscriptionName(id). @@ -478,7 +493,7 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, dstBrokerUrl = pulsar1.getBrokerId(); dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl(); } - checkOwnershipState(broker, bundle); + checkOwnershipState(broker, bundle, primaryLoadManager, secondaryLoadManager, pulsar1); var messageCountBeforeUnloading = 100; var messageCountAfterUnloading = 100; @@ -572,6 +587,17 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest") public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) throws Exception { + testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType, defaultTestNamespace, admin, + lookupUrl.toString(), pulsar1); + } + + @Test(enabled = false) + public static void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, + SubscriptionType subscriptionType, + String defaultTestNamespace, + PulsarAdmin admin, + String brokerServiceUrl, + PulsarService pulsar1) throws Exception { var id = String.format("test-unload-%s-client-reconnect-%s-%s", topicDomain, subscriptionType, UUID.randomUUID()); var topic = String.format("%s://%s/%s", topicDomain, defaultTestNamespace, id); @@ -580,6 +606,7 @@ public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, var consumers = new ArrayList>(); try { @Cleanup + var pulsarClient = pulsarClient(brokerServiceUrl, 0); var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3; @@ -651,6 +678,16 @@ public Object[][] isPersistentTopicTest() { @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest") public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception { + testOptimizeUnloadDisable(topicDomain, defaultTestNamespace, admin, lookupUrl.toString(), pulsar1, pulsar2); + } + + @Test(enabled = false) + public static void testOptimizeUnloadDisable(TopicDomain topicDomain, + String defaultTestNamespace, + PulsarAdmin admin, + String brokerServiceUrl, + PulsarService pulsar1, + PulsarService pulsar2) throws Exception { var id = String.format("test-optimize-unload-disable-%s-%s", topicDomain, UUID.randomUUID()); var topic = String.format("%s://%s/%s", topicDomain, defaultTestNamespace, id); var topicName = TopicName.get(topic); @@ -658,6 +695,9 @@ public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(false); pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(false); + @Cleanup + var pulsarClient = pulsarClient(brokerServiceUrl, 0); + @Cleanup var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); @@ -719,13 +759,16 @@ public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception verify(lookup, times(2)).getBroker(topicName); } - private LookupService spyLookupService(PulsarClient client) throws IllegalAccessException { + protected static LookupService spyLookupService(PulsarClient client) throws IllegalAccessException { LookupService svc = (LookupService) FieldUtils.readDeclaredField(client, "lookup", true); var lookup = spy(svc); FieldUtils.writeDeclaredField(client, "lookup", lookup, true); return lookup; } - private void checkOwnershipState(String broker, NamespaceBundle bundle) + + protected static void checkOwnershipState(String broker, NamespaceBundle bundle, + ExtensibleLoadManager primaryLoadManager, + ExtensibleLoadManager secondaryLoadManager, PulsarService pulsar1) throws ExecutionException, InterruptedException { var targetLoadManager = secondaryLoadManager; var otherLoadManager = primaryLoadManager; @@ -737,6 +780,11 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle) assertFalse(otherLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); } + protected void checkOwnershipState(String broker, NamespaceBundle bundle) + throws ExecutionException, InterruptedException { + checkOwnershipState(broker, bundle, primaryLoadManager, secondaryLoadManager, pulsar1); + } + @Test(timeOut = 30 * 1000) public void testSplitBundleAdminAPI() throws Exception { final String namespace = "public/testSplitBundleAdminAPI"; @@ -1745,4 +1793,11 @@ public String name() { } + protected static PulsarClient pulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return + PulsarClient.builder() + .serviceUrl(url) + .statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java new file mode 100644 index 0000000000000..bec7d4d78fe7e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.TopicDomain; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Unit test for {@link ExtensibleLoadManagerImpl with AdvertisedListeners broker configs}. + */ +@Slf4j +@Test(groups = "flaky") +@SuppressWarnings("unchecked") +public class ExtensibleLoadManagerImplWithAdvertisedListenersTest extends ExtensibleLoadManagerImplBaseTest { + + public String brokerServiceUrl; + public ExtensibleLoadManagerImplWithAdvertisedListenersTest() { + super("public/test"); + } + + @Override + protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { + super.updateConfig(conf); + int privatePulsarPort = nextLockedFreePort(); + int publicPulsarPort = nextLockedFreePort(); + conf.setInternalListenerName("internal"); + conf.setBindAddresses("external:pulsar://localhost:" + publicPulsarPort); + conf.setAdvertisedListeners( + "external:pulsar://localhost:" + publicPulsarPort + + ",internal:pulsar://localhost:" + privatePulsarPort); + conf.setWebServicePortTls(Optional.empty()); + conf.setBrokerServicePortTls(Optional.empty()); + conf.setBrokerServicePort(Optional.of(privatePulsarPort)); + conf.setWebServicePort(Optional.of(0)); + brokerServiceUrl = conf.getBindAddresses().replaceAll("external:", ""); + return conf; + } + + @DataProvider(name = "isPersistentTopicSubscriptionTypeTest") + public Object[][] isPersistentTopicSubscriptionTypeTest() { + return new Object[][]{ + {TopicDomain.non_persistent, SubscriptionType.Exclusive}, + {TopicDomain.persistent, SubscriptionType.Key_Shared} + }; + } + + @Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest") + public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) + throws Exception { + ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup(topicDomain, subscriptionType, + defaultTestNamespace, admin, + brokerServiceUrl, + pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager); + } + + @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest") + public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, + SubscriptionType subscriptionType) throws Exception { + ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType, + defaultTestNamespace, admin, + brokerServiceUrl, + pulsar1); + } + + @DataProvider(name = "isPersistentTopicTest") + public Object[][] isPersistentTopicTest() { + return new Object[][]{{TopicDomain.persistent}, {TopicDomain.non_persistent}}; + } + + @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest") + public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception { + ExtensibleLoadManagerImplTest.testOptimizeUnloadDisable(topicDomain, defaultTestNamespace, admin, + brokerServiceUrl, pulsar1, pulsar2); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java index 0c95dd85f28e0..ed99b502b7e29 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java @@ -31,8 +31,8 @@ public ExtensibleLoadManagerImplWithTransactionCoordinatorTest() { } @Override - protected ServiceConfiguration initConfig(ServiceConfiguration conf) { - conf = super.initConfig(conf); + protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { + conf = super.updateConfig(conf); conf.setTransactionCoordinatorEnabled(true); return conf; }