Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Sync commits from apache into 3.1_ds #307

Merged
merged 11 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re

int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);

if (hasMoreEntries()) {
if (hasMoreEntries() && maxPosition.compareTo(readPosition) >= 0) {
// If we have available entries, we can read them immediately
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1682,8 +1682,9 @@ void createNewOpAddEntryForNewLedger() {
if (existsOp.ledger != null) {
existsOp = existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
} else {
// This scenario should not happen.
log.warn("[{}] An OpAddEntry's ledger is empty.", name);
// It may happen when the following operations execute at the same time, so it is expected.
// - Adding entry.
// - Switching ledger.
existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
}
existsOp.setLedger(currentLedger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ public synchronized void start() throws PulsarServerException {
"topicCompactionStrategyClassName",
ServiceUnitStateCompactionStrategy.class.getName()))
.create();
tableview.listen((key, value) -> handle(key, value));
tableview.listen(this::handleEvent);
tableview.forEach(this::handleExisting);
var strategy = (ServiceUnitStateCompactionStrategy) TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG);
if (strategy == null) {
String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is null.";
Expand Down Expand Up @@ -663,7 +664,7 @@ public CompletableFuture<Void> publishSplitEventAsync(Split split) {
}).thenApply(__ -> null);
}

private void handle(String serviceUnit, ServiceUnitStateData data) {
private void handleEvent(String serviceUnit, ServiceUnitStateData data) {
long totalHandledRequests = getHandlerTotalCounter(data).incrementAndGet();
if (debug()) {
log.info("{} received a handle request for serviceUnit:{}, data:{}. totalHandledRequests:{}",
Expand All @@ -689,6 +690,17 @@ private void handle(String serviceUnit, ServiceUnitStateData data) {
}
}

/**
 * Processes a single service unit entry; registered via {@code tableview.forEach(this::handleExisting)}
 * in {@code start()}.
 *
 * <p>When the entry's state is {@code Owned} and {@code isTargetBroker(data.dstBroker())} holds, the
 * namespace service is told that the corresponding bundle is owned. Any other entry is only logged
 * (when debug is enabled).
 *
 * @param serviceUnit the service unit name; resolved to a namespace bundle through
 *                    {@code LoadManagerShared.getNamespaceBundle}
 * @param data        the state data associated with the service unit
 */
private void handleExisting(String serviceUnit, ServiceUnitStateData data) {
if (debug()) {
log.info("Loaded the service unit state data. serviceUnit: {}, data: {}", serviceUnit, data);
}
ServiceUnitState state = state(data);
// NOTE(review): isTargetBroker presumably checks that this broker is the destination owner - confirm.
if (state.equals(Owned) && isTargetBroker(data.dstBroker())) {
pulsar.getNamespaceService()
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit));
}
}

private static boolean isTransferCommand(ServiceUnitStateData data) {
if (data == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1596,6 +1596,11 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(),
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);

topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
return null;
});

checkTopicNsOwnership(topic)
.thenRun(() -> {
final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
Expand Down Expand Up @@ -1683,6 +1688,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
}
}).exceptionally(ex -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(ex);
return null;
});
Expand All @@ -1702,11 +1708,6 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
TopicName topicName = TopicName.get(topic);
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());

topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
return null;
});

if (isTransactionInternalName(topicName)) {
String msg = String.format("Can not create transaction system topic %s", topic);
log.warn(msg);
Expand Down Expand Up @@ -1782,10 +1783,16 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
if (topicFuture.isCompletedExceptionally()) {
if (!topicFuture.complete(Optional.of(persistentTopic))) {
// Check create persistent topic timeout.
log.warn("{} future is already completed with failure {}, closing the"
+ " topic", topic, FutureUtil.getException(topicFuture));
if (topicFuture.isCompletedExceptionally()) {
log.warn("{} future is already completed with failure {}, closing"
+ " the topic", topic, FutureUtil.getException(topicFuture));
} else {
// It should not happen.
log.error("{} future is already completed by another thread, "
+ "which is not expected. Closing the current one", topic);
}
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
Expand All @@ -1797,7 +1804,6 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
});
} else {
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
}
})
.exceptionally((ex) -> {
Expand Down Expand Up @@ -1826,6 +1832,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) {
// We were just trying to load a topic and the topic doesn't exist
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
loadFuture.completeExceptionally(exception);
topicFuture.complete(Optional.empty());
} else {
Expand Down Expand Up @@ -3713,6 +3720,9 @@ public CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
}

private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
if (isSystemTopic(topicName)) {
return CompletableFuture.completedFuture(null);
}
return pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(optPolicies -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import java.util.ArrayList;
Expand Down Expand Up @@ -286,6 +287,12 @@ public void readMoreEntriesAsync() {
}

public synchronized void readMoreEntries() {
if (cursor.isClosed()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor is already closed, skipping read more entries.", cursor.getName());
}
return;
}
if (isSendInProgress()) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
Expand Down Expand Up @@ -853,7 +860,14 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
ReadType readType = (ReadType) ctx;
long waitTimeMillis = readFailureBackoff.next();

if (exception instanceof NoMoreEntriesToReadException) {
// Do not keep reading more entries if the cursor is already closed.
if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor is already closed, skipping read more entries", cursor.getName());
}
// Set the wait time to -1 to avoid rescheduling the read.
waitTimeMillis = -1;
} else if (exception instanceof NoMoreEntriesToReadException) {
if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Topic has been terminated and there are no more entries to read
// Notify the consumer only if all the messages were already acknowledged
Expand Down Expand Up @@ -892,7 +906,14 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
}

readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();
// Skip the read if waitTimeMillis is a negative value.
if (waitTimeMillis >= 0) {
scheduleReadEntriesWithDelay(exception, readType, waitTimeMillis);
}
}

@VisibleForTesting
void scheduleReadEntriesWithDelay(Exception e, ReadType readType, long waitTimeMillis) {
topic.getBrokerService().executor().schedule(() -> {
synchronized (PersistentDispatcherMultipleConsumers.this) {
// If it's a replay read we need to retry even if there's already
Expand All @@ -902,11 +923,10 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
log.info("[{}] Retrying read operation", name);
readMoreEntries();
} else {
log.info("[{}] Skipping read retry: havePendingRead {}", name, havePendingRead, exception);
log.info("[{}] Skipping read retry: havePendingRead {}", name, havePendingRead, e);
}
}
}, waitTimeMillis, TimeUnit.MILLISECONDS);

}

private boolean needTrimAckedMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Recycler;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -318,6 +319,12 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl

@Override
protected void readMoreEntries(Consumer consumer) {
if (cursor.isClosed()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor is already closed, skipping read more entries", cursor.getName());
}
return;
}
// consumer can be null when all consumers are disconnected from broker.
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
Expand Down Expand Up @@ -489,6 +496,14 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
Consumer c = readEntriesCtx.getConsumer();
readEntriesCtx.recycle();

// Do not keep reading messages from a closed cursor.
if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor was already closed, skipping read more entries", cursor.getName());
}
return;
}

if (exception instanceof ConcurrentWaitCallbackException) {
// At most one pending read request is allowed when there are no more entries, we should not trigger more
// read operations in this case and just wait for the existing read operation to complete.
Expand Down Expand Up @@ -525,6 +540,11 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
// Reduce read batch size to avoid flooding bookies with retries
readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();

scheduleReadEntriesWithDelay(c, waitTimeMillis);
}

@VisibleForTesting
void scheduleReadEntriesWithDelay(Consumer c, long delay) {
topic.getBrokerService().executor().schedule(() -> {

// Jump again into dispatcher dedicated thread
Expand All @@ -546,8 +566,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
}
}
});
}, waitTimeMillis, TimeUnit.MILLISECONDS);

}, delay, TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -391,7 +390,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
this.transactionBuffer = new TransactionBufferDisable(this);
}
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), true);
if (ledger instanceof ShadowManagedLedgerImpl) {
if (ledger.getConfig().getShadowSource() != null) {
shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource());
} else {
shadowSourceTopic = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,56 @@ public boolean test(NamespaceBundle namespaceBundle) {
}
}

/**
 * Checks the onLoad/unLoad notifications delivered to a {@code NamespaceBundleOwnershipListener}
 * that only matches the bundle under test.
 *
 * <p>The same listener instance is registered on both brokers. The test expects exactly one onLoad
 * after registration, a second onLoad after an additional {@code ServiceUnitStateChannelImpl} is
 * started on {@code pulsar1}, and no unLoad at any point.
 */
@Test(timeOut = 30 * 1000)
public void testNamespaceOwnershipListener() throws Exception {
Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-namespace-ownership-listener");
TopicName topicName = topicAndBundle.getLeft();
NamespaceBundle bundle = topicAndBundle.getRight();

// The lookup returns the broker serving the topic; ownership is then verified below.
String broker = admin.lookups().lookupTopic(topicName.toString());
log.info("Assign the bundle {} to {}", bundle, broker);

checkOwnershipState(broker, bundle);

AtomicInteger onloadCount = new AtomicInteger(0);
AtomicInteger unloadCount = new AtomicInteger(0);

// Counts callbacks; test(...) restricts the listener to the bundle under test.
NamespaceBundleOwnershipListener listener = new NamespaceBundleOwnershipListener() {
@Override
public void onLoad(NamespaceBundle bundle) {
onloadCount.incrementAndGet();
}

@Override
public void unLoad(NamespaceBundle bundle) {
unloadCount.incrementAndGet();
}

@Override
public boolean test(NamespaceBundle namespaceBundle) {
return namespaceBundle.equals(bundle);
}
};
pulsar1.getNamespaceService().addNamespaceBundleOwnershipListener(listener);
pulsar2.getNamespaceService().addNamespaceBundleOwnershipListener(listener);

// A service unit state channel is already started, so adding the listener triggers the onload event.
Awaitility.await().untilAsserted(() -> {
assertEquals(onloadCount.get(), 1);
assertEquals(unloadCount.get(), 0);
});

// Starting one more channel on pulsar1 is expected to produce a second onload event for the bundle.
ServiceUnitStateChannelImpl channel = new ServiceUnitStateChannelImpl(pulsar1);
channel.start();
Awaitility.await().untilAsserted(() -> {
assertEquals(onloadCount.get(), 2);
assertEquals(unloadCount.get(), 0);
});

channel.close();
}

private void checkOwnershipState(String broker, NamespaceBundle bundle)
throws ExecutionException, InterruptedException {
var targetLoadManager = secondaryLoadManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -197,6 +198,7 @@ public void testSplitMapWithRefreshedStatMap() throws Exception {

ManagedLedger ledger = mock(ManagedLedger.class);
when(ledger.getCursors()).thenReturn(new ArrayList<>());
when(ledger.getConfig()).thenReturn(new ManagedLedgerConfig());

doReturn(CompletableFuture.completedFuture(null)).when(MockOwnershipCache).disableOwnership(any(NamespaceBundle.class));
Field ownership = NamespaceService.class.getDeclaredField("ownershipCache");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -74,7 +75,9 @@ public void setup() throws Exception {
.when(serverCnx).getCommandSender();

String topicName = TopicName.get("MessageCumulativeAckTest").toString();
PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), pulsarTestContext.getBrokerService());
var mockManagedLedger = mock(ManagedLedger.class);
when(mockManagedLedger.getConfig()).thenReturn(new ManagedLedgerConfig());
var persistentTopic = new PersistentTopic(topicName, mockManagedLedger, pulsarTestContext.getBrokerService());
sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
mock(ManagedCursorImpl.class), false));
doNothing().when(sub).acknowledgeMessage(any(), any(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ void setupMLAsyncCallbackMocks() {
cursorMock = mock(ManagedCursorImpl.class);

doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig();
doReturn("mockCursor").when(cursorMock).getName();

// call openLedgerComplete with ledgerMock on ML factory asyncOpen
Expand Down
Loading
Loading