Skip to content

Commit

Permalink
Only log slow message warnings periodically, once per sweep
Browse files Browse the repository at this point in the history
  • Loading branch information
astubbs committed Mar 30, 2021
1 parent cf9f948 commit 97d09fc
Showing 1 changed file with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public class WorkManager<K, V> implements ConsumerRebalanceListener {
*/
private Map<TopicPartition, Integer> partitionsAssignmentEpochs = new HashMap<>();

// too aggressive for some situations? make configurable?
private final Duration thresholdForTimeSpentInQueueWarning = Duration.ofSeconds(10);

private final RateLimiter slowWarningRateLimit = new RateLimiter(5);

/**
* Use a private {@link DynamicLoadFactor}, useful for testing.
*/
Expand Down Expand Up @@ -386,6 +391,8 @@ public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {

var staleWorkToRemove = new ArrayList<WorkContainer<K, V>>();

var slowWorkCount = 0;

//
for (var shard : it) {
log.trace("Looking for work on shard: {}", shard.getKey());
Expand Down Expand Up @@ -433,6 +440,7 @@ public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {
continue;
}

// check if work can be taken
boolean hasNotSucceededAlready = !workContainer.isUserFunctionSucceeded();
boolean delayHasPassed = workContainer.hasDelayPassed(clock);
if (delayHasPassed && workContainer.isNotInFlight() && hasNotSucceededAlready) {
Expand All @@ -442,9 +450,9 @@ public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {
} else {
Duration timeInFlight = workContainer.getTimeInFlight();
String msg = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.";
int secondsThresholdForTimeSpentInQueueWarning = 10; // too aggressive for some situations? make configurable?
if (toSeconds(timeInFlight) > secondsThresholdForTimeSpentInQueueWarning) {
log.warn("Work has spent over " + secondsThresholdForTimeSpentInQueueWarning + " in queue! "
if (toSeconds(timeInFlight) > toSeconds(thresholdForTimeSpentInQueueWarning)) {
slowWorkCount++;
log.trace("Work has spent over " + thresholdForTimeSpentInQueueWarning + " in queue! "
+ msg, workContainer, delayHasPassed, workContainer.isNotInFlight(), hasNotSucceededAlready, timeInFlight);
} else {
log.trace(msg, workContainer, delayHasPassed, workContainer.isNotInFlight(), hasNotSucceededAlready, timeInFlight);
Expand All @@ -465,6 +473,14 @@ public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {
work.addAll(shardWork);
}

if (slowWorkCount > 0) {
final int finalSlowWorkCount = slowWorkCount;
slowWarningRateLimit.performIfNotLimited(() -> log.warn("Warning: {} records in the queue have been " +
"waiting longer than {}.",
finalSlowWorkCount, toSeconds(thresholdForTimeSpentInQueueWarning)));

}

// remove found stale work outside of loop
for (final WorkContainer<K, V> kvWorkContainer : staleWorkToRemove) {
removeWorkFromShard(kvWorkContainer);
Expand Down

0 comments on commit 97d09fc

Please sign in to comment.