Skip to content

Commit 3425830

Browse files
poorbarcodenikhil-ctds
authored andcommitted
[improve] [client]Add new ServiceUrlProvider implementation: SameAuthParamsAutoClusterFailover (apache#23129)
- excluding changes to pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java PR apache#23168 (cherry picked from commit 06a2f5c) (cherry picked from commit 286a5dc)
1 parent 1e2dacc commit 3425830

File tree

8 files changed

+548
-15
lines changed

8 files changed

+548
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker;
20+
21+
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH;
22+
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.getTlsFileForClient;
23+
import static org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState;
24+
import io.netty.channel.EventLoopGroup;
25+
import java.net.ServerSocket;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.TimeUnit;
30+
import org.apache.pulsar.broker.service.NetworkErrorTestBase;
31+
import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
32+
import org.apache.pulsar.client.api.ClientBuilder;
33+
import org.apache.pulsar.client.api.Producer;
34+
import org.apache.pulsar.client.api.PulsarClient;
35+
import org.apache.pulsar.client.api.Schema;
36+
import org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover;
37+
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
38+
import org.awaitility.Awaitility;
39+
import org.awaitility.reflect.WhiteboxImpl;
40+
import org.testng.Assert;
41+
import org.testng.annotations.AfterMethod;
42+
import org.testng.annotations.DataProvider;
43+
import org.testng.annotations.Test;
44+
45+
public class SameAuthParamsLookupAutoClusterFailoverTest extends OneWayReplicatorTestBase {
46+
47+
public void setup() throws Exception {
48+
super.setup();
49+
}
50+
51+
@Override
52+
@AfterMethod(alwaysRun = true, timeOut = 300000)
53+
public void cleanup() throws Exception {
54+
super.cleanup();
55+
}
56+
57+
@DataProvider(name = "enabledTls")
58+
public Object[][] enabledTls () {
59+
return new Object[][] {
60+
{true},
61+
{false}
62+
};
63+
}
64+
65+
@Test(dataProvider = "enabledTls", timeOut = 240 * 1000)
66+
public void testAutoClusterFailover(boolean enabledTls) throws Exception {
67+
// Start clusters.
68+
setup();
69+
ServerSocket dummyServer = new ServerSocket(NetworkErrorTestBase.getOneFreePort());
70+
71+
// Initialize client.
72+
String urlProxy = enabledTls ? "pulsar+tls://127.0.0.1:" + dummyServer.getLocalPort()
73+
: "pulsar://127.0.0.1:" + dummyServer.getLocalPort();
74+
String url1 = enabledTls ? pulsar1.getBrokerServiceUrlTls() : pulsar1.getBrokerServiceUrl();
75+
String url2 = enabledTls ? pulsar2.getBrokerServiceUrlTls() : pulsar2.getBrokerServiceUrl();
76+
final String[] urlArray = new String[]{url1, urlProxy, url2};
77+
final SameAuthParamsLookupAutoClusterFailover failover = SameAuthParamsLookupAutoClusterFailover.builder()
78+
.pulsarServiceUrlArray(urlArray)
79+
.failoverThreshold(5)
80+
.recoverThreshold(5)
81+
.checkHealthyIntervalMs(300)
82+
.testTopic("a/b/c")
83+
.markTopicNotFoundAsAvailable(true)
84+
.build();
85+
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrlProvider(failover);
86+
if (enabledTls) {
87+
Map<String, String> authParams = new HashMap<>();
88+
authParams.put("tlsCertFile", getTlsFileForClient("admin.cert"));
89+
authParams.put("tlsKeyFile", getTlsFileForClient("admin.key-pk8"));
90+
clientBuilder.authentication(AuthenticationTls.class.getName(), authParams)
91+
.enableTls(true)
92+
.allowTlsInsecureConnection(false)
93+
.tlsTrustCertsFilePath(CA_CERT_FILE_PATH);
94+
}
95+
final PulsarClient client = clientBuilder.build();
96+
failover.initialize(client);
97+
final EventLoopGroup executor = WhiteboxImpl.getInternalState(failover, "executor");
98+
final PulsarServiceState[] stateArray =
99+
WhiteboxImpl.getInternalState(failover, "pulsarServiceStateArray");
100+
101+
// Test all things is fine.
102+
final String tp = BrokerTestUtil.newUniqueName(nonReplicatedNamespace + "/tp");
103+
final Producer<String> producer = client.newProducer(Schema.STRING).topic(tp).create();
104+
producer.send("0");
105+
Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
106+
107+
CompletableFuture<Boolean> checkStatesFuture1 = new CompletableFuture<>();
108+
executor.submit(() -> {
109+
boolean res = stateArray[0] == PulsarServiceState.Healthy;
110+
res = res & stateArray[1] == PulsarServiceState.Healthy;
111+
res = res & stateArray[2] == PulsarServiceState.Healthy;
112+
checkStatesFuture1.complete(res);
113+
});
114+
Assert.assertTrue(checkStatesFuture1.join());
115+
116+
// Test failover 0 --> 3.
117+
pulsar1.close();
118+
Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
119+
CompletableFuture<Boolean> checkStatesFuture2 = new CompletableFuture<>();
120+
executor.submit(() -> {
121+
boolean res = stateArray[0] == PulsarServiceState.Failed;
122+
res = res & stateArray[1] == PulsarServiceState.Failed;
123+
res = res & stateArray[2] == PulsarServiceState.Healthy;
124+
checkStatesFuture2.complete(res);
125+
});
126+
Assert.assertTrue(checkStatesFuture2.join());
127+
producer.send("0->2");
128+
Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 2);
129+
});
130+
131+
// Test recover 2 --> 1.
132+
executor.execute(() -> {
133+
urlArray[1] = url2;
134+
});
135+
Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
136+
CompletableFuture<Boolean> checkStatesFuture3 = new CompletableFuture<>();
137+
executor.submit(() -> {
138+
boolean res = stateArray[0] == PulsarServiceState.Failed;
139+
res = res & stateArray[1] == PulsarServiceState.Healthy;
140+
res = res & stateArray[2] == PulsarServiceState.Healthy;
141+
checkStatesFuture3.complete(res);
142+
});
143+
Assert.assertTrue(checkStatesFuture3.join());
144+
producer.send("2->1");
145+
Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 1);
146+
});
147+
148+
// Test recover 1 --> 0.
149+
executor.execute(() -> {
150+
urlArray[0] = url2;
151+
});
152+
Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
153+
CompletableFuture<Boolean> checkStatesFuture4 = new CompletableFuture<>();
154+
executor.submit(() -> {
155+
boolean res = stateArray[0] == PulsarServiceState.Healthy;
156+
res = res & stateArray[1] == PulsarServiceState.Healthy;
157+
res = res & stateArray[2] == PulsarServiceState.Healthy;
158+
checkStatesFuture4.complete(res);
159+
});
160+
Assert.assertTrue(checkStatesFuture4.join());
161+
producer.send("1->0");
162+
Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
163+
});
164+
165+
// cleanup.
166+
producer.close();
167+
client.close();
168+
dummyServer.close();
169+
}
170+
171+
@Override
172+
protected void cleanupPulsarResources() {
173+
// Nothing to do.
174+
}
175+
176+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
8181
// All certificate-authority files are copied from the tests/certificate-authority directory and all share the same
8282
// root CA.
83-
protected static String getTlsFileForClient(String name) {
83+
public static String getTlsFileForClient(String name) {
8484
return ResourceUtils.getAbsolutePath(String.format("certificate-authority/client-keys/%s.pem", name));
8585
}
8686
public final static String CA_CERT_FILE_PATH =

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ protected void startBrokers() throws Exception {
102102
log.info("broker-1: {}, broker-2: {}", broker1.getListenPort(), broker2.getListenPort());
103103
}
104104

105-
protected int getOneFreePort() throws IOException {
105+
public static int getOneFreePort() throws IOException {
106106
ServerSocket serverSocket = new ServerSocket(0);
107107
int port = serverSocket.getLocalPort();
108108
serverSocket.close();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_CERT_FILE_PATH;
22+
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_KEY_FILE_PATH;
23+
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH;
2124
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
2225
import static org.testng.Assert.assertFalse;
2326
import static org.testng.Assert.assertTrue;
@@ -267,10 +270,18 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
267270
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
268271
config.setLoadBalancerSheddingEnabled(false);
269272
config.setForceDeleteNamespaceAllowed(true);
273+
config.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
274+
config.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
275+
config.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
276+
config.setClusterName(clusterName);
277+
config.setTlsRequireTrustedClientCertOnConnect(false);
278+
Set<String> tlsProtocols = Sets.newConcurrentHashSet();
279+
tlsProtocols.add("TLSv1.3");
280+
tlsProtocols.add("TLSv1.2");
281+
config.setTlsProtocols(tlsProtocols);
270282
}
271283

272-
@Override
273-
protected void cleanup() throws Exception {
284+
protected void cleanupPulsarResources() throws Exception {
274285
// delete namespaces.
275286
waitChangeEventsInit(replicatedNamespace);
276287
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1));
@@ -283,6 +294,12 @@ protected void cleanup() throws Exception {
283294
admin2.namespaces().deleteNamespace(replicatedNamespace, true);
284295
admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true);
285296
}
297+
}
298+
299+
@Override
300+
protected void cleanup() throws Exception {
301+
// cleanup pulsar resources.
302+
cleanupPulsarResources();
286303

287304
// shutdown.
288305
markCurrentSetupNumberCleaned();

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public interface ServiceUrlProvider extends AutoCloseable {
5656
*
5757
*/
5858
@Override
59-
default void close() {
59+
default void close() throws Exception {
6060
// do nothing
6161
}
6262
}

0 commit comments

Comments
 (0)