Skip to content

Commit 2ebbb7a

Browse files
equanznikhil-ctds
authored andcommitted
[improve][proxy] Reuse authentication instance in pulsar-proxy (apache#23113)
(cherry picked from commit 3e461c0) (cherry picked from commit b805a4a)
1 parent 3425830 commit 2ebbb7a

40 files changed

+570
-159
lines changed

pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,11 @@ void testAuthentication() throws Exception {
258258
proxyConfig.setForwardAuthorizationCredentials(true);
259259
AuthenticationService authenticationService = new AuthenticationService(
260260
PulsarConfigurationLoader.convertFrom(proxyConfig));
261-
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService);
261+
@Cleanup
262+
final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
263+
proxyConfig.getBrokerClientAuthenticationParameters());
264+
proxyClientAuthentication.start();
265+
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication);
262266

263267
proxyService.start();
264268
final String proxyServiceUrl = "pulsar://localhost:" + proxyService.getListenPort().get();

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java

+5-18
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.Collections;
3030
import java.util.HashSet;
3131
import java.util.Iterator;
32-
import java.util.Objects;
3332
import java.util.Set;
3433
import java.util.concurrent.Executor;
3534
import javax.net.ssl.SSLContext;
@@ -40,7 +39,6 @@
4039
import org.apache.pulsar.broker.web.AuthenticationFilter;
4140
import org.apache.pulsar.client.api.Authentication;
4241
import org.apache.pulsar.client.api.AuthenticationDataProvider;
43-
import org.apache.pulsar.client.api.AuthenticationFactory;
4442
import org.apache.pulsar.client.api.KeyStoreParams;
4543
import org.apache.pulsar.client.api.PulsarClientException;
4644
import org.apache.pulsar.common.util.SecurityUtility;
@@ -87,12 +85,15 @@ class AdminProxyHandler extends ProxyServlet {
8785

8886
private final ProxyConfiguration config;
8987
private final BrokerDiscoveryProvider discoveryProvider;
88+
private final Authentication proxyClientAuthentication;
9089
private final String brokerWebServiceUrl;
9190
private final String functionWorkerWebServiceUrl;
9291

93-
AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
92+
AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider,
93+
Authentication proxyClientAuthentication) {
9494
this.config = config;
9595
this.discoveryProvider = discoveryProvider;
96+
this.proxyClientAuthentication = proxyClientAuthentication;
9697
this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getBrokerWebServiceURLTLS()
9798
: config.getBrokerWebServiceURL();
9899
this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getFunctionWorkerWebServiceURLTLS()
@@ -256,22 +257,13 @@ protected ContentProvider proxyRequestContent(HttpServletRequest request,
256257
@Override
257258
protected HttpClient newHttpClient() {
258259
try {
259-
Authentication auth = AuthenticationFactory.create(
260-
config.getBrokerClientAuthenticationPlugin(),
261-
config.getBrokerClientAuthenticationParameters()
262-
);
263-
264-
Objects.requireNonNull(auth, "No supported auth found for proxy");
265-
266-
auth.start();
267-
268260
if (config.isTlsEnabledWithBroker()) {
269261
try {
270262
X509Certificate[] trustCertificates = SecurityUtility
271263
.loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath());
272264

273265
SSLContext sslCtx;
274-
AuthenticationDataProvider authData = auth.getAuthData();
266+
AuthenticationDataProvider authData = proxyClientAuthentication.getAuthData();
275267
if (config.isBrokerClientTlsEnabledWithKeyStore()) {
276268
KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null;
277269
sslCtx = KeyStoreSSLContext.createClientSslContext(
@@ -311,11 +303,6 @@ protected HttpClient newHttpClient() {
311303
return new JettyHttpClient(contextFactory);
312304
} catch (Exception e) {
313305
LOG.error("new jetty http client exception ", e);
314-
try {
315-
auth.close();
316-
} catch (IOException ioe) {
317-
LOG.error("Failed to close the authentication service", ioe);
318-
}
319306
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
320307
}
321308
}

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.apache.pulsar.PulsarVersion;
5353
import org.apache.pulsar.client.api.Authentication;
5454
import org.apache.pulsar.client.api.AuthenticationDataProvider;
55-
import org.apache.pulsar.client.api.AuthenticationFactory;
5655
import org.apache.pulsar.client.api.PulsarClientException;
5756
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
5857
import org.apache.pulsar.common.api.AuthData;
@@ -114,8 +113,7 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection)
114113

115114
if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
116115
try {
117-
authData = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
118-
config.getBrokerClientAuthenticationParameters()).getAuthData();
116+
authData = authentication.getAuthData();
119117
} catch (PulsarClientException e) {
120118
throw new RuntimeException(e);
121119
}

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@
6363
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
6464
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
6565
import org.apache.pulsar.client.api.Authentication;
66-
import org.apache.pulsar.client.api.AuthenticationFactory;
67-
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
6866
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
6967
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
7068
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
@@ -152,7 +150,8 @@ public class ProxyService implements Closeable {
152150
private final ConnectionController connectionController;
153151

154152
public ProxyService(ProxyConfiguration proxyConfig,
155-
AuthenticationService authenticationService) throws Exception {
153+
AuthenticationService authenticationService,
154+
Authentication proxyClientAuthentication) throws Exception {
156155
requireNonNull(proxyConfig);
157156
this.proxyConfig = proxyConfig;
158157
this.clientCnxs = Sets.newConcurrentHashSet();
@@ -201,12 +200,7 @@ public ProxyService(ProxyConfiguration proxyConfig,
201200
});
202201
}, 60, TimeUnit.SECONDS);
203202
this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
204-
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
205-
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
206-
proxyConfig.getBrokerClientAuthenticationParameters());
207-
} else {
208-
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
209-
}
203+
this.proxyClientAuthentication = proxyClientAuthentication;
210204
this.connectionController = new ConnectionController.DefaultConnectionController(
211205
proxyConfig.getMaxConcurrentInboundConnections(),
212206
proxyConfig.getMaxConcurrentInboundConnectionsPerIp());

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java

+40-6
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
import io.prometheus.client.Gauge;
3232
import io.prometheus.client.Gauge.Child;
3333
import io.prometheus.client.hotspot.DefaultExports;
34+
import java.io.IOException;
3435
import java.text.DateFormat;
3536
import java.text.SimpleDateFormat;
3637
import java.util.Collection;
3738
import java.util.Collections;
3839
import java.util.Date;
40+
import java.util.Objects;
3941
import java.util.function.Consumer;
4042
import lombok.Getter;
4143
import org.apache.logging.log4j.LogManager;
@@ -45,6 +47,10 @@
4547
import org.apache.pulsar.broker.authentication.AuthenticationService;
4648
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
4749
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
50+
import org.apache.pulsar.client.api.Authentication;
51+
import org.apache.pulsar.client.api.AuthenticationFactory;
52+
import org.apache.pulsar.client.api.PulsarClientException;
53+
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
4854
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
4955
import org.apache.pulsar.common.configuration.VipStatus;
5056
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -99,6 +105,9 @@ public class ProxyServiceStarter {
99105

100106
private ProxyConfiguration config;
101107

108+
@Getter
109+
private Authentication proxyClientAuthentication;
110+
102111
@Getter
103112
private ProxyService proxyService;
104113

@@ -239,8 +248,27 @@ public static void main(String[] args) throws Exception {
239248
public void start() throws Exception {
240249
AuthenticationService authenticationService = new AuthenticationService(
241250
PulsarConfigurationLoader.convertFrom(config));
251+
252+
if (config.getBrokerClientAuthenticationPlugin() != null) {
253+
proxyClientAuthentication = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
254+
config.getBrokerClientAuthenticationParameters());
255+
Objects.requireNonNull(proxyClientAuthentication, "No supported auth found for proxy");
256+
try {
257+
proxyClientAuthentication.start();
258+
} catch (Exception e) {
259+
try {
260+
proxyClientAuthentication.close();
261+
} catch (IOException ioe) {
262+
log.error("Failed to close the authentication service", ioe);
263+
}
264+
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
265+
}
266+
} else {
267+
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
268+
}
269+
242270
// create proxy service
243-
proxyService = new ProxyService(config, authenticationService);
271+
proxyService = new ProxyService(config, authenticationService, proxyClientAuthentication);
244272
// create a web-service
245273
server = new WebServer(config, authenticationService);
246274

@@ -287,7 +315,8 @@ public double get() {
287315
metricsInitialized = true;
288316
}
289317

290-
addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
318+
addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(),
319+
proxyClientAuthentication);
291320

292321
// start web-service
293322
server.start();
@@ -301,6 +330,9 @@ public void close() {
301330
if (server != null) {
302331
server.stop();
303332
}
333+
if (proxyClientAuthentication != null) {
334+
proxyClientAuthentication.close();
335+
}
304336
} catch (Exception e) {
305337
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
306338
} finally {
@@ -311,9 +343,10 @@ public void close() {
311343
}
312344

313345
public static void addWebServerHandlers(WebServer server,
314-
ProxyConfiguration config,
315-
ProxyService service,
316-
BrokerDiscoveryProvider discoveryProvider) throws Exception {
346+
ProxyConfiguration config,
347+
ProxyService service,
348+
BrokerDiscoveryProvider discoveryProvider,
349+
Authentication proxyClientAuthentication) throws Exception {
317350
// We can make 'status.html' publicly accessible without authentication since
318351
// it does not contain any sensitive data.
319352
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(),
@@ -330,7 +363,8 @@ public static void addWebServerHandlers(WebServer server,
330363
}
331364
}
332365

333-
AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
366+
AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider,
367+
proxyClientAuthentication);
334368
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
335369
server.addServlet("/admin", servletHolder);
336370
server.addServlet("/lookup", servletHolder);

pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.commons.io.IOUtils;
2727
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
2828
import org.apache.pulsar.broker.authentication.AuthenticationService;
29+
import org.apache.pulsar.client.api.Authentication;
30+
import org.apache.pulsar.client.api.AuthenticationFactory;
2931
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
3032
import org.apache.pulsar.common.util.PortManager;
3133
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -121,6 +123,7 @@ public void close() {
121123
private ProxyService proxyService;
122124
private boolean useSeparateThreadPoolForProxyExtensions;
123125
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
126+
private Authentication proxyClientAuthentication;
124127

125128
public SimpleProxyExtensionTestBase(boolean useSeparateThreadPoolForProxyExtensions) {
126129
this.useSeparateThreadPoolForProxyExtensions = useSeparateThreadPoolForProxyExtensions;
@@ -141,8 +144,12 @@ protected void setup() throws Exception {
141144
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
142145
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
143146

147+
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
148+
proxyConfig.getBrokerClientAuthenticationParameters());
149+
proxyClientAuthentication.start();
150+
144151
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
145-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
152+
PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication));
146153
doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
147154
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
148155

@@ -172,6 +179,9 @@ public void testBootstrapProtocolHandler() throws Exception {
172179
protected void cleanup() throws Exception {
173180
super.internalCleanup();
174181
proxyService.close();
182+
if (proxyClientAuthentication != null) {
183+
proxyClientAuthentication.close();
184+
}
175185

176186
if (tempDirectory != null) {
177187
FileUtils.deleteDirectory(tempDirectory);

pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java

+17-8
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,18 @@
1818
*/
1919
package org.apache.pulsar.proxy.server;
2020

21+
import static org.mockito.Mockito.doReturn;
22+
import static org.mockito.Mockito.spy;
23+
import java.util.HashSet;
24+
import java.util.Optional;
25+
import java.util.Set;
2126
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
2227
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
2328
import org.apache.pulsar.broker.authentication.AuthenticationService;
2429
import org.apache.pulsar.broker.resources.PulsarResources;
2530
import org.apache.pulsar.client.admin.PulsarAdmin;
31+
import org.apache.pulsar.client.api.Authentication;
32+
import org.apache.pulsar.client.api.AuthenticationFactory;
2633
import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
2734
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
2835
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -34,18 +41,13 @@
3441
import org.testng.annotations.BeforeMethod;
3542
import org.testng.annotations.Test;
3643

37-
import java.util.HashSet;
38-
import java.util.Optional;
39-
import java.util.Set;
40-
41-
import static org.mockito.Mockito.doReturn;
42-
import static org.mockito.Mockito.spy;
43-
4444
public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTest {
4545

4646

4747
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
4848

49+
private Authentication proxyClientAuthentication;
50+
4951
private WebServer webServer;
5052

5153
private BrokerDiscoveryProvider discoveryProvider;
@@ -103,12 +105,16 @@ protected void setup() throws Exception {
103105

104106
resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
105107
new ZKMetadataStore(mockZooKeeperGlobal));
108+
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
109+
proxyConfig.getBrokerClientAuthenticationParameters());
110+
proxyClientAuthentication.start();
111+
106112
webServer = new WebServer(proxyConfig, new AuthenticationService(
107113
PulsarConfigurationLoader.convertFrom(proxyConfig)));
108114
discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
109115
LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
110116
doReturn(report).when(discoveryProvider).nextBroker();
111-
ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider));
117+
ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
112118
webServer.addServlet("/admin", servletHolder);
113119
webServer.addServlet("/lookup", servletHolder);
114120
webServer.start();
@@ -118,6 +124,9 @@ protected void setup() throws Exception {
118124
@Override
119125
protected void cleanup() throws Exception {
120126
webServer.stop();
127+
if (proxyClientAuthentication != null) {
128+
proxyClientAuthentication.close();
129+
}
121130
super.internalCleanup();
122131
}
123132

pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import javax.servlet.ServletException;
3333
import javax.servlet.http.HttpServletRequest;
3434
import javax.servlet.http.HttpServletResponse;
35+
import org.apache.pulsar.client.api.Authentication;
3536
import org.eclipse.jetty.client.HttpClient;
3637
import org.eclipse.jetty.client.api.Request;
3738
import org.testng.Assert;
@@ -46,7 +47,7 @@ public void setupMocks() throws ServletException {
4647
// given
4748
HttpClient httpClient = mock(HttpClient.class);
4849
adminProxyHandler = new AdminProxyHandler(mock(ProxyConfiguration.class),
49-
mock(BrokerDiscoveryProvider.class)) {
50+
mock(BrokerDiscoveryProvider.class), mock(Authentication.class)) {
5051
@Override
5152
protected HttpClient createHttpClient() throws ServletException {
5253
return httpClient;

0 commit comments

Comments
 (0)