Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Sync commits from apache into 3.1_ds #297

Merged
merged 9 commits into from
Jul 30, 2024
Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci-maven-cache-update.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
name: Update Maven dependency cache for ${{ matrix.name }}
env:
JOB_NAME: Update Maven dependency cache for ${{ matrix.name }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
runs-on: ${{ matrix.runs-on }}
timeout-minutes: 45

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-owasp-dependency-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
name: Check ${{ matrix.branch }}
env:
JOB_NAME: Check ${{ matrix.branch }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
runs-on: ubuntu-22.04
timeout-minutes: 75
strategy:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pulsar-ci-flaky.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:
env:
JOB_NAME: Flaky tests suite
COLLECT_COVERAGE: "${{ needs.preconditions.outputs.collect_coverage }}"
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
runs-on: ubuntu-22.04
timeout-minutes: 100
if: ${{ needs.preconditions.outputs.docs_only != 'true' }}
Expand Down
18 changes: 9 additions & 9 deletions .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jobs:
name: Build and License check
env:
JOB_NAME: Build and License check
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
runs-on: ubuntu-22.04
timeout-minutes: 60
if: ${{ needs.preconditions.outputs.docs_only != 'true' }}
Expand Down Expand Up @@ -172,7 +172,7 @@ jobs:
env:
JOB_NAME: CI - Unit - ${{ matrix.name }}
COLLECT_COVERAGE: "${{ needs.preconditions.outputs.collect_coverage }}"
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
runs-on: ubuntu-22.04
timeout-minutes: ${{ matrix.timeout || 60 }}
needs: ['preconditions', 'build-and-license-check']
Expand Down Expand Up @@ -391,7 +391,7 @@ jobs:
needs: ['preconditions', 'build-and-license-check']
if: ${{ needs.preconditions.outputs.docs_only != 'true'}}
env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
steps:
- name: checkout
uses: actions/checkout@v4
Expand Down Expand Up @@ -465,7 +465,7 @@ jobs:
env:
JOB_NAME: CI - Integration - ${{ matrix.name }}
PULSAR_TEST_IMAGE_NAME: apachepulsar/java-test-image:latest
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -731,7 +731,7 @@ jobs:
needs: ['preconditions', 'build-and-license-check']
if: ${{ needs.preconditions.outputs.docs_only != 'true' }}
env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
steps:
- name: checkout
uses: actions/checkout@v4
Expand Down Expand Up @@ -841,7 +841,7 @@ jobs:
env:
JOB_NAME: CI - System - ${{ matrix.name }}
PULSAR_TEST_IMAGE_NAME: apachepulsar/pulsar-test-latest-version:latest
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -1072,7 +1072,7 @@ jobs:
env:
JOB_NAME: CI Flaky - System - ${{ matrix.name }}
PULSAR_TEST_IMAGE_NAME: apachepulsar/pulsar-test-latest-version:latest
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -1214,7 +1214,7 @@ jobs:
needs: ['preconditions', 'integration-tests']
if: ${{ needs.preconditions.outputs.docs_only != 'true' }}
env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
steps:
- name: checkout
uses: actions/checkout@v4
Expand Down Expand Up @@ -1249,7 +1249,7 @@ jobs:
needs: [ 'preconditions', 'integration-tests' ]
if: ${{ needs.preconditions.outputs.need_owasp == 'true' }}
env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
NIST_NVD_API_KEY: ${{ secrets.NIST_NVD_API_KEY }}
steps:
- name: checkout
Expand Down
2 changes: 1 addition & 1 deletion .mvn/extensions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<extension>
<groupId>com.gradle</groupId>
<artifactId>develocity-maven-extension</artifactId>
<version>1.21.4</version>
<version>1.21.6</version>
</extension>
<extension>
<groupId>com.gradle</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand All @@ -33,10 +34,15 @@
import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {
private static final Logger log = LoggerFactory.getLogger(BucketDelayedDeliveryTrackerFactory.class);

BucketSnapshotStorage bucketSnapshotStorage;

Expand Down Expand Up @@ -73,8 +79,28 @@ public void initialize(PulsarService pulsarService) throws Exception {

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
BrokerService brokerService = dispatcher.getTopic().getBrokerService();
DelayedDeliveryTracker tracker;

try {
tracker = newTracker0(dispatcher);
} catch (RecoverDelayedDeliveryTrackerException ex) {
log.warn("Failed to recover BucketDelayedDeliveryTracker, fallback to InMemoryDelayedDeliveryTracker."
+ " topic {}, subscription {}", topicName, subscriptionName, ex);
// If failed to create BucketDelayedDeliveryTracker, fallback to InMemoryDelayedDeliveryTracker
brokerService.initializeFallbackDelayedDeliveryTrackerFactory();
tracker = brokerService.getFallbackDelayedDeliveryTrackerFactory().newTracker(dispatcher);
}
return tracker;
}

@VisibleForTesting
BucketDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher)
throws RecoverDelayedDeliveryTrackerException {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,51 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
* Close the subscription tracker and release all resources.
*/
void close();

DelayedDeliveryTracker DISABLE = new DelayedDeliveryTracker() {
@Override
public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
return false;
}

@Override
public boolean hasMessageAvailable() {
return false;
}

@Override
public long getNumberOfDelayedMessages() {
return 0;
}

@Override
public long getBufferMemoryUsage() {
return 0;
}

@Override
public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
return null;
}

@Override
public boolean shouldPauseAllDeliveries() {
return false;
}

@Override
public void resetTickTime(long tickTime) {

}

@Override
public CompletableFuture<Void> clear() {
return null;
}

@Override
public void close() {

}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {
private static final Logger log = LoggerFactory.getLogger(InMemoryDelayedDeliveryTrackerFactory.class);

private Timer timer;

Expand All @@ -48,6 +52,21 @@ public void initialize(PulsarService pulsarService) {

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
try {
tracker = newTracker0(dispatcher);
} catch (Exception e) {
// it should never go here
log.warn("Failed to create InMemoryDelayedDeliveryTracker, topic {}, subscription {}",
topicName, subscriptionName, e);
}
return tracker;
}

@VisibleForTesting
InMemoryDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,22 +105,24 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
private CompletableFuture<Void> pendingLoad = null;

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
throws RecoverDelayedDeliveryTrackerException {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
throws RecoverDelayedDeliveryTrackerException {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.minIndexCountPerBucket = minIndexCountPerBucket;
this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis;
Expand All @@ -133,10 +135,17 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(),
bucketSnapshotStorage);
this.stats = new BucketDelayedMessageIndexStats();
this.numberDelayedMessages = recoverBucketSnapshot();

// Close the tracker if failed to recover.
try {
this.numberDelayedMessages = recoverBucketSnapshot();
} catch (RecoverDelayedDeliveryTrackerException e) {
close();
throw e;
}
}

private synchronized long recoverBucketSnapshot() throws RuntimeException {
private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryTrackerException {
ManagedCursor cursor = this.lastMutableBucket.getCursor();
Map<String, String> cursorProperties = cursor.getCursorProperties();
if (MapUtils.isEmpty(cursorProperties)) {
Expand Down Expand Up @@ -181,7 +190,7 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new RuntimeException(e);
throw new RecoverDelayedDeliveryTrackerException(e);
}

for (Map.Entry<Range<Long>, CompletableFuture<List<DelayedIndex>>> entry : futures.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed.bucket;

public class RecoverDelayedDeliveryTrackerException extends Exception {
public RecoverDelayedDeliveryTrackerException(Throwable cause) {
super(cause);
}
}
Loading
Loading