From 895dac0f0bfb3a3ed2f3ecbe2666fe5edb599f1d Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 26 Mar 2021 14:36:32 +1300 Subject: [PATCH] Only log slow message warnings periodically, once per sweep --- .../parallelconsumer/WorkManager.java | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkManager.java index a41fe9f4a..49fe78369 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkManager.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import static io.confluent.csid.utils.BackportUtils.toSeconds; @@ -148,6 +149,11 @@ public class WorkManager implements ConsumerRebalanceListener { */ private Map partitionsAssignmentEpochs = new HashMap<>(); + // too aggressive for some situations? make configurable? + private final Duration thresholdForTimeSpentInQueueWarning = Duration.ofSeconds(10); + + private final RateLimiter slowWarningRateLimit = new RateLimiter(5); + /** * Use a private {@link DynamicLoadFactor}, useful for testing. */ @@ -386,6 +392,8 @@ public List> maybeGetWork(int requestedMaxWorkToRetrieve) { var staleWorkToRemove = new ArrayList>(); + var slowWorkCount = 0; + // for (var shard : it) { log.trace("Looking for work on shard: {}", shard.getKey()); @@ -433,6 +441,7 @@ public List> maybeGetWork(int requestedMaxWorkToRetrieve) { continue; } + // check if work can be taken boolean hasNotSucceededAlready = !workContainer.isUserFunctionSucceeded(); boolean delayHasPassed = workContainer.hasDelayPassed(clock); if (delayHasPassed && workContainer.isNotInFlight() && hasNotSucceededAlready) { @@ -442,9 +451,9 @@ public List> maybeGetWork(int requestedMaxWorkToRetrieve) { } else { Duration timeInFlight = workContainer.getTimeInFlight(); String msg = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}."; - int secondsThresholdForTimeSpentInQueueWarning = 10; // too aggressive for some situations? make configurable? - if (toSeconds(timeInFlight) > secondsThresholdForTimeSpentInQueueWarning) { - log.warn("Work has spent over " + secondsThresholdForTimeSpentInQueueWarning + " in queue! " + if (toSeconds(timeInFlight) > thresholdForTimeSpentInQueueWarning.toSeconds()) { + slowWorkCount++; + log.trace("Work has spent over " + thresholdForTimeSpentInQueueWarning + " in queue! " + msg, workContainer, delayHasPassed, workContainer.isNotInFlight(), hasNotSucceededAlready, timeInFlight); } else { log.trace(msg, workContainer, delayHasPassed, workContainer.isNotInFlight(), hasNotSucceededAlready, timeInFlight); @@ -465,6 +474,14 @@ public List> maybeGetWork(int requestedMaxWorkToRetrieve) { work.addAll(shardWork); } + if (slowWorkCount > 0) { + final int finalSlowWorkCount = slowWorkCount; + slowWarningRateLimit.performIfNotLimited(() -> log.warn("Warning: {} records in the queue have been " + + "waiting longer than {}.", + finalSlowWorkCount, thresholdForTimeSpentInQueueWarning.toSeconds())); + + } + // remove found stale work outside of loop for (final WorkContainer kvWorkContainer : staleWorkToRemove) { removeWorkFromShard(kvWorkContainer);