Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Rate-Limiter fails with a huge spike in a traffic, and publish/consume stuck for a longer time #24002

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,15 @@ private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolea
// calculate the token delta by subtracting the consumed tokens from the new tokens
long tokenDelta = newTokens - currentPendingConsumedTokens;
if (tokenDelta != 0 || consumeTokens != 0) {
// prevent tokens to become excessive -ve where it can't recover
long cT = tokens < 0 ? 0 : consumeTokens;
// update the tokens and return the current token value
return TOKENS_UPDATER.updateAndGet(this,
long availableTokens = TOKENS_UPDATER.updateAndGet(this,
// limit the tokens to the capacity of the bucket
currentTokens -> Math.min(currentTokens + tokenDelta, getCapacity())
currentTokens -> Math.min(currentTokens + Math.max(0, tokenDelta), getCapacity())
// subtract the consumed tokens from the capped tokens
- consumeTokens);
- cT);
return availableTokens;
} else {
return tokens;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -48,6 +49,29 @@ private void incrementMillis(long millis) {
manualClockSource.addAndGet(TimeUnit.MILLISECONDS.toNanos(millis));
}

@Test
void testAsyncTokenWithMultiCall() throws Exception {
int rate = 2000;
int resolutionTimeNano = 8;
asyncTokenBucket = AsyncTokenBucket.builder().rate(rate).ratePeriodNanos(TimeUnit.SECONDS.toNanos(1)).clock(
new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(resolutionTimeNano), System::nanoTime))
.build();

for (int i = 0; i < (1000); i++) {
for (int j = 0; j < (1000); j++) {
long token = asyncTokenBucket.getTokens();
if (token < 0) {
// sleep to allow add new more tokens
Thread.sleep(resolutionTimeNano * 5);
assertTrue(asyncTokenBucket.getTokens() > 0);
}
// calling consumeTokens iteratively to simulate calling this method multiple times from multiple
// threads
asyncTokenBucket.consumeTokens(100);
}
}
}

@Test
void shouldAddTokensWithConfiguredRate() {
asyncTokenBucket =
Expand Down
Loading