Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
CoalescingBatchingEventHandler uses Map<T, Set<DisruptorFuture<R>>> (#…
Browse files Browse the repository at this point in the history
…6194)

CoalescingBatchingEventHandler uses Map<T, Set<DisruptorFuture<R>>>

Flush avoids expensive AbstractMapBasedMultimap.clear() that iterates
and clears each value collection.
  • Loading branch information
schlosna authored Aug 25, 2022
1 parent 94202ec commit 7eaaacf
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,35 @@

package com.palantir.atlasdb.autobatch;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.lmax.disruptor.EventHandler;
import com.palantir.atlasdb.autobatch.DisruptorAutobatcher.DisruptorFuture;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.Map;
import java.util.Set;

final class CoalescingBatchingEventHandler<T, R> implements EventHandler<BatchElement<T, R>> {

private static final SafeLogger log = SafeLoggerFactory.get(CoalescingBatchingEventHandler.class);

private final CoalescingRequestFunction<T, R> function;
private final SetMultimap<T, DisruptorAutobatcher.DisruptorFuture<R>> pending;

// explicitly not using Multimap to avoid expensive com.google.common.collect.AbstractMapBasedMultimap.clear()
// that iterates and clears each value collection.
private final Map<T, Set<DisruptorFuture<R>>> pending;

CoalescingBatchingEventHandler(CoalescingRequestFunction<T, R> function, int bufferSize) {
this.function = function;
this.pending = HashMultimap.create(bufferSize, 5);
this.pending = Maps.newHashMapWithExpectedSize(bufferSize);
}

@Override
public void onEvent(BatchElement<T, R> event, long sequence, boolean endOfBatch) {
pending.put(event.argument(), event.result());
pending.computeIfAbsent(event.argument(), _key -> Sets.newHashSetWithExpectedSize(5))
.add(event.result());
if (endOfBatch) {
flush();
}
Expand All @@ -47,18 +53,22 @@ public void onEvent(BatchElement<T, R> event, long sequence, boolean endOfBatch)
private void flush() {
try {
Map<T, R> results = function.apply(pending.keySet());
pending.forEach((argument, future) -> {
if (results.containsKey(argument)) {
future.set(results.get(argument));
} else {
log.warn(
"Coalescing function has violated coalescing function postcondition",
SafeArg.of("functionClass", function.getClass().getCanonicalName()));
future.setException(new PostconditionFailedException(function.getClass()));
pending.forEach((argument, futures) -> {
R result = results.get(argument);
boolean hasResult = result != null || results.containsKey(argument);
for (DisruptorFuture<R> future : futures) {
if (hasResult) {
future.set(result);
} else {
log.warn(
"Coalescing function has violated coalescing function postcondition",
SafeArg.of("functionClass", function.getClass().getCanonicalName()));
future.setException(new PostconditionFailedException(function.getClass()));
}
}
});
} catch (Throwable t) {
pending.forEach((unused, future) -> future.setException(t));
pending.forEach((argument, futures) -> futures.forEach(future -> future.setException(t)));
}
pending.clear();
}
Expand Down
9 changes: 9 additions & 0 deletions changelog/@unreleased/pr-6194.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type: improvement
improvement:
description: |-
CoalescingBatchingEventHandler uses Map<T, Set<DisruptorFuture<R>>>
Flush avoids expensive AbstractMapBasedMultimap.clear() that iterates
and clears each value collection.
links:
- https://github.com/palantir/atlasdb/pull/6194

0 comments on commit 7eaaacf

Please sign in to comment.