29
29
import io .netty .handler .ssl .SslHandler ;
30
30
import io .netty .handler .ssl .SslProvider ;
31
31
import java .net .InetSocketAddress ;
32
+ import java .util .Map ;
32
33
import java .util .Objects ;
33
34
import java .util .concurrent .CompletableFuture ;
35
+ import java .util .concurrent .ConcurrentHashMap ;
34
36
import java .util .concurrent .TimeUnit ;
35
37
import java .util .function .Supplier ;
36
38
import lombok .Getter ;
@@ -59,9 +61,9 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
59
61
private final InetSocketAddress socks5ProxyAddress ;
60
62
private final String socks5ProxyUsername ;
61
63
private final String socks5ProxyPassword ;
62
-
63
- private final Supplier <SslContext > sslContextSupplier ;
64
- private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder ;
64
+ private final ClientConfigurationData conf ;
65
+ private Map < String , Supplier <SslContext >> sslContextSupplierMap ;
66
+ private Map < String , NettySSLContextAutoRefreshBuilder > nettySSLContextAutoRefreshBuilderMap ;
65
67
66
68
private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit .MINUTES .toMillis (1 );
67
69
@@ -76,15 +78,34 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx
76
78
this .socks5ProxyPassword = conf .getSocks5ProxyPassword ();
77
79
78
80
this .tlsEnabledWithKeyStore = conf .isUseKeyStoreTls ();
81
+ this .conf = conf .clone ();
82
+ this .sslContextSupplierMap = new ConcurrentHashMap <>();
83
+ this .nettySSLContextAutoRefreshBuilderMap = new ConcurrentHashMap <>();
84
+ }
79
85
80
- if (tlsEnabled ) {
81
- if (tlsEnabledWithKeyStore ) {
82
- AuthenticationDataProvider authData1 = conf .getAuthentication ().getAuthData ();
83
- if (StringUtils .isBlank (conf .getTlsTrustStorePath ())) {
84
- throw new PulsarClientException ("Failed to create TLS context, the tlsTrustStorePath"
85
- + " need to be configured if useKeyStoreTls enabled" );
86
- }
87
- nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder (
86
+ @ Override
87
+ public void initChannel (SocketChannel ch ) throws Exception {
88
+ ch .pipeline ().addLast ("consolidation" , new FlushConsolidationHandler (1024 , true ));
89
+
90
+ // Setup channel except for the SsHandler for TLS enabled connections
91
+ ch .pipeline ().addLast ("ByteBufPairEncoder" , ByteBufPair .getEncoder (tlsEnabled ));
92
+
93
+ ch .pipeline ().addLast ("frameDecoder" , new LengthFieldBasedFrameDecoder (
94
+ Commands .DEFAULT_MAX_MESSAGE_SIZE + Commands .MESSAGE_SIZE_FRAME_PADDING , 0 , 4 , 0 , 4 ));
95
+ ChannelHandler clientCnx = clientCnxSupplier .get ();
96
+ ch .pipeline ().addLast ("handler" , clientCnx );
97
+ }
98
+
99
+ private NettySSLContextAutoRefreshBuilder getNettySSLContextAutoRefreshBuilder (String host )
100
+ throws PulsarClientException {
101
+ if (tlsEnabledWithKeyStore ) {
102
+ AuthenticationDataProvider authData1 = conf .getAuthentication ().getAuthData (host );
103
+ if (StringUtils .isBlank (conf .getTlsTrustStorePath ())) {
104
+ throw new PulsarClientException ("Failed to create TLS context, the tlsTrustStorePath"
105
+ + " need to be configured if useKeyStoreTls enabled" );
106
+ }
107
+ return nettySSLContextAutoRefreshBuilderMap .computeIfAbsent (host ,
108
+ key -> new NettySSLContextAutoRefreshBuilder (
88
109
conf .getSslProvider (),
89
110
conf .isTlsAllowInsecureConnection (),
90
111
conf .getTlsTrustStoreType (),
@@ -96,64 +117,52 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx
96
117
conf .getTlsCiphers (),
97
118
conf .getTlsProtocols (),
98
119
TLS_CERTIFICATE_CACHE_MILLIS ,
99
- authData1 );
100
- }
101
-
102
- sslContextSupplier = new ObjectCache <SslContext >(() -> {
103
- try {
104
- SslProvider sslProvider = null ;
105
- if (conf .getSslProvider () != null ) {
106
- sslProvider = SslProvider .valueOf (conf .getSslProvider ());
107
- }
108
-
109
- // Set client certificate if available
110
- AuthenticationDataProvider authData = conf .getAuthentication ().getAuthData ();
111
- if (authData .hasDataForTls ()) {
112
- return authData .getTlsTrustStoreStream () == null
113
- ? SecurityUtility .createNettySslContextForClient (
114
- sslProvider ,
115
- conf .isTlsAllowInsecureConnection (),
116
- conf .getTlsTrustCertsFilePath (),
117
- authData .getTlsCertificates (),
118
- authData .getTlsPrivateKey (),
119
- conf .getTlsCiphers (),
120
- conf .getTlsProtocols ())
121
- : SecurityUtility .createNettySslContextForClient (sslProvider ,
122
- conf .isTlsAllowInsecureConnection (),
123
- authData .getTlsTrustStoreStream (),
124
- authData .getTlsCertificates (), authData .getTlsPrivateKey (),
125
- conf .getTlsCiphers (),
126
- conf .getTlsProtocols ());
127
- } else {
128
- return SecurityUtility .createNettySslContextForClient (
129
- sslProvider ,
130
- conf .isTlsAllowInsecureConnection (),
131
- conf .getTlsTrustCertsFilePath (),
132
- conf .getTlsCertificateFilePath (),
133
- conf .getTlsKeyFilePath (),
134
- conf .getTlsCiphers (),
135
- conf .getTlsProtocols ());
136
- }
137
- } catch (Exception e ) {
138
- throw new RuntimeException ("Failed to create TLS context" , e );
139
- }
140
- }, TLS_CERTIFICATE_CACHE_MILLIS , TimeUnit .MILLISECONDS );
141
- } else {
142
- sslContextSupplier = null ;
120
+ authData1 ));
143
121
}
122
+ throw new PulsarClientException (
123
+ "Failed to create TLS context, the tlsEnabledWithKeyStore need to be true" );
144
124
}
145
125
146
- @ Override
147
- public void initChannel (SocketChannel ch ) throws Exception {
148
- ch .pipeline ().addLast ("consolidation" , new FlushConsolidationHandler (1024 , true ));
149
-
150
- // Setup channel except for the SsHandler for TLS enabled connections
151
- ch .pipeline ().addLast ("ByteBufPairEncoder" , ByteBufPair .getEncoder (tlsEnabled ));
126
+ private Supplier <SslContext > getSslContextSupplier (String host ) {
127
+ return sslContextSupplierMap .computeIfAbsent (host , key -> new ObjectCache <>(() -> {
128
+ try {
129
+ SslProvider sslProvider = null ;
130
+ if (conf .getSslProvider () != null ) {
131
+ sslProvider = SslProvider .valueOf (conf .getSslProvider ());
132
+ }
152
133
153
- ch .pipeline ().addLast ("frameDecoder" , new LengthFieldBasedFrameDecoder (
154
- Commands .DEFAULT_MAX_MESSAGE_SIZE + Commands .MESSAGE_SIZE_FRAME_PADDING , 0 , 4 , 0 , 4 ));
155
- ChannelHandler clientCnx = clientCnxSupplier .get ();
156
- ch .pipeline ().addLast ("handler" , clientCnx );
134
+ // Set client certificate if available
135
+ AuthenticationDataProvider authData = conf .getAuthentication ().getAuthData (host );
136
+ if (authData .hasDataForTls ()) {
137
+ return authData .getTlsTrustStoreStream () == null
138
+ ? SecurityUtility .createNettySslContextForClient (
139
+ sslProvider ,
140
+ conf .isTlsAllowInsecureConnection (),
141
+ conf .getTlsTrustCertsFilePath (),
142
+ authData .getTlsCertificates (),
143
+ authData .getTlsPrivateKey (),
144
+ conf .getTlsCiphers (),
145
+ conf .getTlsProtocols ())
146
+ : SecurityUtility .createNettySslContextForClient (sslProvider ,
147
+ conf .isTlsAllowInsecureConnection (),
148
+ authData .getTlsTrustStoreStream (),
149
+ authData .getTlsCertificates (), authData .getTlsPrivateKey (),
150
+ conf .getTlsCiphers (),
151
+ conf .getTlsProtocols ());
152
+ } else {
153
+ return SecurityUtility .createNettySslContextForClient (
154
+ sslProvider ,
155
+ conf .isTlsAllowInsecureConnection (),
156
+ conf .getTlsTrustCertsFilePath (),
157
+ conf .getTlsCertificateFilePath (),
158
+ conf .getTlsKeyFilePath (),
159
+ conf .getTlsCiphers (),
160
+ conf .getTlsProtocols ());
161
+ }
162
+ } catch (Exception e ) {
163
+ throw new RuntimeException ("Failed to create TLS context" , e );
164
+ }
165
+ }, TLS_CERTIFICATE_CACHE_MILLIS , TimeUnit .MILLISECONDS ));
157
166
}
158
167
159
168
/**
@@ -175,9 +184,10 @@ CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) {
175
184
ch .eventLoop ().execute (() -> {
176
185
try {
177
186
SslHandler handler = tlsEnabledWithKeyStore
178
- ? new SslHandler (nettySSLContextAutoRefreshBuilder .get ()
179
- .createSSLEngine (sniHost .getHostString (), sniHost .getPort ()))
180
- : sslContextSupplier .get ().newHandler (ch .alloc (), sniHost .getHostString (), sniHost .getPort ());
187
+ ? new SslHandler (getNettySSLContextAutoRefreshBuilder (sniHost .getHostName ()).get ()
188
+ .createSSLEngine (sniHost .getHostString (), sniHost .getPort ()))
189
+ : getSslContextSupplier (sniHost .getHostName ()).get ()
190
+ .newHandler (ch .alloc (), sniHost .getHostString (), sniHost .getPort ());
181
191
182
192
if (tlsHostnameVerificationEnabled ) {
183
193
SecurityUtility .configureSSLHandler (handler );
0 commit comments