Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Send allowed number of events (or less) when memory overflow happens
Browse files Browse the repository at this point in the history
  • Loading branch information
ferbncode committed Nov 2, 2018
1 parent 02bf998 commit 3edc0d1
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ private List<ConsumedEvent> extract(final int count) {
return result;
}

public List<ConsumedEvent> extractMaxEvents(final long currentTimeMillis, final int count) {
final List<ConsumedEvent> result = extract(count);
if(!result.isEmpty()) {
lastSendMillis = currentTimeMillis;
}
return result;
}

public int getNumberOfUnsentEvents() {
return this.nakadiEvents.size();
}

int getKeepAliveInARow() {
return keepAliveInARow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,16 @@ private void streamToOutput(final boolean streamTimeoutReached) {
}

long memoryConsumed = offsets.values().stream().mapToLong(PartitionData::getBytesInMemory).sum();
while (memoryConsumed > getContext().getStreamMemoryLimitBytes()) {
while (memoryConsumed > getContext().getStreamMemoryLimitBytes() && getMessagesAllowedToSend() > 0) {
// Select heaviest guy (and on previous step we figured out that we can not send anymore full batches,
// therefore we can take all the events from one partition.
final Map.Entry<EventTypePartition, PartitionData> heaviestPartition = offsets.entrySet().stream().max(
Comparator.comparing(e -> e.getValue().getBytesInMemory())
).get(); // There is always at least 1 item in list

long deltaSize = heaviestPartition.getValue().getBytesInMemory();
final List<ConsumedEvent> events = heaviestPartition.getValue().extractAll(currentTimeMillis);
final List<ConsumedEvent> events = heaviestPartition.getValue().extractMaxEvents(currentTimeMillis,
Math.min((int) getMessagesAllowedToSend(), heaviestPartition.getValue().getNumberOfUnsentEvents()));
deltaSize -= heaviestPartition.getValue().getBytesInMemory();

sentSomething = true;
Expand Down

0 comments on commit 3edc0d1

Please sign in to comment.