Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Sync commits from apache into 3.1_ds #304

Merged
merged 19 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
70964e0
[fix][client] TransactionCoordinatorClient support retry (#23081)
chenhongSZ Jul 30, 2024
1a87104
[improve][broker]Reuse method getAvailableBrokersAsync (#23099)
crossoverJie Jul 31, 2024
20d2660
[fix] [broker] fix replicated namespaces filter in filterAndUnloadMat…
iosdev747 Jul 31, 2024
01412f8
[fix][broker] fix exception may hidden and result in stuck when topic…
TakaHiR07 Aug 2, 2024
d39b300
[fix][broker] type cast on exceptions in exceptionally can lead to lo…
shibd Aug 2, 2024
78aed0c
[fix][broker] Fix authenticate order in AuthenticationProviderList (#…
coderzc Aug 5, 2024
0d551d5
[fix][broker]A failed consumer/producer future in ServerCnx can never…
poorbarcode Aug 6, 2024
90263b1
[fix][broker] Handle the case when `getOwnedServiceUnits` fails grace…
Demogorgon314 Aug 6, 2024
0489f93
[improve][misc] Improve AES-GCM cipher performance (#23122)
ocadaruma Aug 5, 2024
0b778f1
[improve][broker] Support to specify auth-plugin, auth-parameters and…
Demogorgon314 Aug 6, 2024
59ae85e
[improve][broker] Reduce the CPU pressure from the transaction buffer…
BewareMyPower Jul 29, 2024
6144f89
[improve][misc] Optimize TLS performance by omitting extra buffer cop…
lhotari Aug 6, 2024
63401a3
[fix][fn] Make python install dependencies from requirements.txt (#20…
jiangpengcheng Oct 18, 2023
2c89529
[fix][client] Fix timeout handling in Pulsar Admin client (#23128)
lhotari Aug 7, 2024
cc7750d
[improve][fn] Add support for overriding additionalJavaRuntimeArgumen…
lhotari Aug 7, 2024
0bef002
Merge branch '3.1_ds' into cherry-picks
nikhil-ctds Aug 9, 2024
086429d
[fix][broker] Fix the bug that elected leader thinks it's a follower …
heesung-sn Aug 8, 2024
eb1102f
[improve][misc] Upgrade jersey to 2.41 (#22290)
nodece Mar 18, 2024
6164310
[improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeco…
lhotari Aug 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,8 @@ The Apache Software License, Version 2.0
- io.reactivex.rxjava3-rxjava-3.0.1.jar
* RoaringBitmap
- org.roaringbitmap-RoaringBitmap-1.2.0.jar
* Spotify completable-futures
- com.spotify-completable-futures-0.3.6.jar

BSD 3-clause "New" or "Revised" License
* Google auth library
Expand Down Expand Up @@ -543,16 +545,16 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
- org.glassfish.hk2-osgi-resource-locator-1.0.3.jar
- org.glassfish.hk2.external-aopalliance-repackaged-2.6.1.jar
* Jersey
- org.glassfish.jersey.containers-jersey-container-servlet-2.34.jar
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar
- org.glassfish.jersey.core-jersey-client-2.34.jar
- org.glassfish.jersey.core-jersey-common-2.34.jar
- org.glassfish.jersey.core-jersey-server-2.34.jar
- org.glassfish.jersey.ext-jersey-entity-filtering-2.34.jar
- org.glassfish.jersey.media-jersey-media-json-jackson-2.34.jar
- org.glassfish.jersey.media-jersey-media-multipart-2.34.jar
- org.glassfish.jersey.inject-jersey-hk2-2.34.jar
* Mimepull -- org.jvnet.mimepull-mimepull-1.9.13.jar
- org.glassfish.jersey.containers-jersey-container-servlet-2.42.jar
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar
- org.glassfish.jersey.core-jersey-client-2.42.jar
- org.glassfish.jersey.core-jersey-common-2.42.jar
- org.glassfish.jersey.core-jersey-server-2.42.jar
- org.glassfish.jersey.ext-jersey-entity-filtering-2.42.jar
- org.glassfish.jersey.media-jersey-media-json-jackson-2.42.jar
- org.glassfish.jersey.media-jersey-media-multipart-2.42.jar
- org.glassfish.jersey.inject-jersey-hk2-2.42.jar
* Mimepull -- org.jvnet.mimepull-mimepull-1.9.15.jar

Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
* Jakarta Activation
Expand Down
15 changes: 8 additions & 7 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ The Apache Software License, Version 2.0
* Apache Avro
- avro-1.11.3.jar
- avro-protobuf-1.11.3.jar
* Spotify completable-futures -- completable-futures-0.3.6.jar

BSD 3-clause "New" or "Revised" License
* JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
Expand Down Expand Up @@ -438,13 +439,13 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
- aopalliance-repackaged-2.6.1.jar
- osgi-resource-locator-1.0.3.jar
* Jersey
- jersey-client-2.34.jar
- jersey-common-2.34.jar
- jersey-entity-filtering-2.34.jar
- jersey-media-json-jackson-2.34.jar
- jersey-media-multipart-2.34.jar
- jersey-hk2-2.34.jar
* Mimepull -- mimepull-1.9.13.jar
- jersey-client-2.42.jar
- jersey-common-2.42.jar
- jersey-entity-filtering-2.42.jar
- jersey-media-json-jackson-2.42.jar
- jersey-media-multipart-2.42.jar
- jersey-hk2-2.42.jar
* Mimepull -- mimepull-1.9.15.jar

Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
* Jakarta Activation
Expand Down
3 changes: 3 additions & 0 deletions docker/pulsar/scripts/gen-yml-from-env.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
'brokerClientTlsProtocols',
'webServiceTlsCiphers',
'webServiceTlsProtocols',
'additionalJavaRuntimeArguments',
'additionalEnabledConnectorUrlPatterns',
'additionalEnabledFunctionsUrlPatterns'
]

PF_ENV_PREFIX = 'PF_'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}, null);
return future;
}).thenAccept(ml -> callback.openLedgerComplete(ml, ctx)).exceptionally(exception -> {
callback.openLedgerFailed((ManagedLedgerException) exception.getCause(), ctx);
callback.openLedgerFailed(ManagedLedgerException
.getManagedLedgerException(FutureUtil.unwrapCompletionException(exception)), ctx);
return null;
});
}
Expand All @@ -444,7 +445,8 @@ public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,

}).exceptionally(e -> {
log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx);
callback.openReadOnlyManagedLedgerFailed(ManagedLedgerException
.getManagedLedgerException(FutureUtil.unwrapCompletionException(e)), ctx);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,8 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
if (uninitializedCursors.containsKey(cursorName)) {
uninitializedCursors.get(cursorName).thenAccept(cursor -> callback.openCursorComplete(cursor, ctx))
.exceptionally(ex -> {
callback.openCursorFailed((ManagedLedgerException) ex, ctx);
callback.openCursorFailed(ManagedLedgerException
.getManagedLedgerException(FutureUtil.unwrapCompletionException(ex)), ctx);
return null;
});
return;
Expand Down Expand Up @@ -3014,9 +3015,8 @@ public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
truncateFuture.whenComplete((ignore, exc) -> {
if (exc != null) {
log.error("[{}] Error truncating ledger for deletion", name, exc);
callback.deleteLedgerFailed(exc instanceof ManagedLedgerException
? (ManagedLedgerException) exc : new ManagedLedgerException(exc),
ctx);
callback.deleteLedgerFailed(ManagedLedgerException.getManagedLedgerException(
FutureUtil.unwrapCompletionException(exc)), ctx);
} else {
asyncDeleteInternal(callback, ctx);
}
Expand Down
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ flexible messaging model and an intuitive client API.</description>
<netty-iouring.version>0.0.21.Final</netty-iouring.version>
<jetty.version>9.4.54.v20240208</jetty.version>
<conscrypt.version>2.5.2</conscrypt.version>
<jersey.version>2.34</jersey.version>
<jersey.version>2.42</jersey.version>
<athenz.version>1.10.50</athenz.version>
<prometheus.version>0.16.0</prometheus.version>
<vertx.version>4.5.8</vertx.version>
Expand Down Expand Up @@ -255,6 +255,7 @@ flexible messaging model and an intuitive client API.</description>
<disruptor.version>3.4.3</disruptor.version>
<zstd-jni.version>1.5.2-3</zstd-jni.version>
<netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
<completable-futures.version>0.3.6</completable-futures.version>
<failsafe.version>3.3.2</failsafe.version>

<!-- test dependencies -->
Expand Down Expand Up @@ -646,6 +647,12 @@ flexible messaging model and an intuitive client API.</description>
<version>${bookkeeper.version}</version>
</dependency>

<dependency>
<groupId>com.spotify</groupId>
<artifactId>completable-futures</artifactId>
<version>${completable-futures.version}</version>
</dependency>

<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
if (log.isDebugEnabled()) {
log.debug("Authentication failed for auth provider " + authState.getClass() + ": ", ex);
}
authenticateRemainingAuthStates(authChallengeFuture, authData, ex, states.size() - 1);
authenticateRemainingAuthStates(authChallengeFuture, authData, ex,
states.isEmpty() ? -1 : 0);
}
});
return authChallengeFuture;
Expand All @@ -130,7 +131,7 @@ private void authenticateRemainingAuthStates(CompletableFuture<AuthData> authCha
AuthData clientAuthData,
Throwable previousException,
int index) {
if (index < 0) {
if (index < 0 || index >= states.size()) {
if (previousException == null) {
previousException = new AuthenticationException("Authentication required");
}
Expand All @@ -142,7 +143,7 @@ private void authenticateRemainingAuthStates(CompletableFuture<AuthData> authCha
AuthenticationState state = states.get(index);
if (state == authState) {
// Skip the current auth state
authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, null, index - 1);
authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, null, index + 1);
} else {
state.authenticateAsync(clientAuthData)
.whenComplete((authChallenge, ex) -> {
Expand All @@ -155,7 +156,7 @@ private void authenticateRemainingAuthStates(CompletableFuture<AuthData> authCha
log.debug("Authentication failed for auth provider "
+ authState.getClass() + ": ", ex);
}
authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, ex, index - 1);
authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, ex, index + 1);
}
});
}
Expand Down Expand Up @@ -228,15 +229,15 @@ public String getAuthMethodName() {
@Override
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
CompletableFuture<String> roleFuture = new CompletableFuture<>();
authenticateRemainingAuthProviders(roleFuture, authData, null, providers.size() - 1);
authenticateRemainingAuthProviders(roleFuture, authData, null, providers.isEmpty() ? -1 : 0);
return roleFuture;
}

private void authenticateRemainingAuthProviders(CompletableFuture<String> roleFuture,
AuthenticationDataSource authData,
Throwable previousException,
int index) {
if (index < 0) {
if (index < 0 || index >= providers.size()) {
if (previousException == null) {
previousException = new AuthenticationException("Authentication required");
}
Expand All @@ -254,7 +255,7 @@ private void authenticateRemainingAuthProviders(CompletableFuture<String> roleFu
if (log.isDebugEnabled()) {
log.debug("Authentication failed for auth provider " + provider.getClass() + ": ", ex);
}
authenticateRemainingAuthProviders(roleFuture, authData, ex, index - 1);
authenticateRemainingAuthProviders(roleFuture, authData, ex, index + 1);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ private static class Arguments {
description = "Broker-service URL for new cluster with TLS encryption", required = false)
private String clusterBrokerServiceUrlTls;

@Parameter(names = {"-te",
"--tls-enable"},
description = "Enable TLS connection for new cluster")
private Boolean clusterBrokerClientTlsEnabled;

@Parameter(names = "--auth-plugin",
description = "The authentication plugin for new cluster")
protected String clusterAuthenticationPlugin;

@Parameter(names = "--auth-parameters",
description = "The authentication parameters for new cluster")
protected String clusterAuthenticationParameters;

@Parameter(names = { "-zk",
"--zookeeper" }, description = "Local ZooKeeper quorum connection string",
required = false,
Expand Down Expand Up @@ -315,14 +328,36 @@ private static void initializeCluster(Arguments arguments, int bundleNumberForDe

PulsarResources resources = new PulsarResources(localStore, configStore);

ClusterData clusterData = ClusterData.builder()
.serviceUrl(arguments.clusterWebServiceUrl)
.serviceUrlTls(arguments.clusterWebServiceUrlTls)
.brokerServiceUrl(arguments.clusterBrokerServiceUrl)
.brokerServiceUrlTls(arguments.clusterBrokerServiceUrlTls)
.proxyServiceUrl(arguments.clusterProxyUrl)
.proxyProtocol(arguments.clusterProxyProtocol)
.build();
ClusterData.Builder clusterDataBuilder = ClusterData.builder();
if (arguments.clusterWebServiceUrl != null) {
clusterDataBuilder.serviceUrl(arguments.clusterWebServiceUrl);
}
if (arguments.clusterWebServiceUrlTls != null) {
clusterDataBuilder.serviceUrlTls(arguments.clusterWebServiceUrlTls);
}
if (arguments.clusterBrokerServiceUrl != null) {
clusterDataBuilder.brokerServiceUrl(arguments.clusterBrokerServiceUrl);
}
if (arguments.clusterBrokerServiceUrlTls != null) {
clusterDataBuilder.brokerServiceUrlTls(arguments.clusterBrokerServiceUrlTls);
}
if (arguments.clusterBrokerClientTlsEnabled != null) {
clusterDataBuilder.brokerClientTlsEnabled(arguments.clusterBrokerClientTlsEnabled);
}
if (arguments.clusterAuthenticationPlugin != null) {
clusterDataBuilder.authenticationPlugin(arguments.clusterAuthenticationPlugin);
}
if (arguments.clusterAuthenticationParameters != null) {
clusterDataBuilder.authenticationParameters(arguments.clusterAuthenticationParameters);
}
if (arguments.clusterProxyUrl != null) {
clusterDataBuilder.proxyServiceUrl(arguments.clusterProxyUrl);
}
if (arguments.clusterProxyProtocol != null) {
clusterDataBuilder.proxyProtocol(arguments.clusterProxyProtocol);
}

ClusterData clusterData = clusterDataBuilder.build();
if (!resources.getClusterResources().clusterExists(arguments.cluster)) {
resources.getClusterResources().createCluster(arguments.cluster, clusterData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ public void start() throws PulsarServerException {
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());

this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(this);

this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
Expand Down
Loading
Loading