From 3edc0d17611f01bacfd2abe7fd657bf479d0088b Mon Sep 17 00:00:00 2001 From: Suyash Garg Date: Thu, 18 Oct 2018 13:50:32 +0200 Subject: [PATCH 1/2] Send allowed number of events (or less) when memory overflow happens --- .../service/subscription/state/PartitionData.java | 12 ++++++++++++ .../service/subscription/state/StreamingState.java | 5 +++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java index ec2a3b588e..8d159eeb1d 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java @@ -112,6 +112,18 @@ private List extract(final int count) { return result; } + public List extractMaxEvents(final long currentTimeMillis, final int count) { + final List result = extract(count); + if(!result.isEmpty()) { + lastSendMillis = currentTimeMillis; + } + return result; + } + + public int getNumberOfUnsentEvents() { + return this.nakadiEvents.size(); + } + int getKeepAliveInARow() { return keepAliveInARow; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index 24602e4c7d..0f3808f15b 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -252,7 +252,7 @@ private void streamToOutput(final boolean streamTimeoutReached) { } long memoryConsumed = offsets.values().stream().mapToLong(PartitionData::getBytesInMemory).sum(); - while (memoryConsumed > getContext().getStreamMemoryLimitBytes()) { + while (memoryConsumed > getContext().getStreamMemoryLimitBytes() && getMessagesAllowedToSend() > 0) { // Select heaviest guy (and on previous step we figured out that we can not send anymore full batches, // therefore we can take all the events from one partition. final Map.Entry heaviestPartition = offsets.entrySet().stream().max( @@ -260,7 +260,8 @@ private void streamToOutput(final boolean streamTimeoutReached) { ).get(); // There is always at least 1 item in list long deltaSize = heaviestPartition.getValue().getBytesInMemory(); - final List events = heaviestPartition.getValue().extractAll(currentTimeMillis); + final List events = heaviestPartition.getValue().extractMaxEvents(currentTimeMillis, + Math.min((int) getMessagesAllowedToSend(), heaviestPartition.getValue().getNumberOfUnsentEvents())); deltaSize -= heaviestPartition.getValue().getBytesInMemory(); sentSomething = true; From b19ca1f2bc83c1cb4aeeabc22dda07275d92e76c Mon Sep 17 00:00:00 2001 From: Suyash Garg Date: Fri, 2 Nov 2018 10:49:18 +0100 Subject: [PATCH 2/2] Don't find min as it is checked already --- .../nakadi/service/subscription/state/PartitionData.java | 4 ---- .../nakadi/service/subscription/state/StreamingState.java | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java index 8d159eeb1d..dda6679bf7 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java @@ -120,10 +120,6 @@ public List extractMaxEvents(final long currentTimeMillis, final return result; } - public int getNumberOfUnsentEvents() { - return this.nakadiEvents.size(); - } - int getKeepAliveInARow() { return keepAliveInARow; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index 0f3808f15b..008517c4a6 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -261,7 +261,7 @@ private void streamToOutput(final boolean streamTimeoutReached) { long deltaSize = heaviestPartition.getValue().getBytesInMemory(); final List events = heaviestPartition.getValue().extractMaxEvents(currentTimeMillis, - Math.min((int) getMessagesAllowedToSend(), heaviestPartition.getValue().getNumberOfUnsentEvents())); + (int) getMessagesAllowedToSend()); deltaSize -= heaviestPartition.getValue().getBytesInMemory(); sentSomething = true;