Skip to content

Commit

Permalink
[direct-io-kafka] fix bounded read
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Mar 10, 2025
1 parent 4f06f05 commit d61ed51
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ private void submitConsumerWithObserver(
handleRebalanceInOffsetCommit(kafka, listener);
}
rethrowErrorIfPresent(name, error);
log.debug("Current endOffsets {}, polledOffsets {}", endOffsets, polledOffsets);
terminateIfConsumed(stopAtCurrent, kafka, endOffsets, polledOffsets, completed);

progressWatermarkOnEmptyPartitions(
Expand Down Expand Up @@ -581,6 +582,7 @@ private static void notifyAssignedPartitions(
KafkaConsumer<?, ?> kafka, ConsumerRebalanceListener listener) {

Set<TopicPartition> assignment = kafka.assignment();
log.debug("Assignment before notification is {}", assignment);
if (!assignment.isEmpty()) {
listener.onPartitionsRevoked(assignment);
listener.onPartitionsAssigned(assignment);
Expand Down Expand Up @@ -937,7 +939,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> parts) {
? getCommittedTopicOffsets(currentlyAssigned, c)
: getCurrentTopicOffsets(currentlyAssigned, c);
newOffsets.stream()
.filter(o -> o.getOffset() > 0)
.filter(o -> o.getOffset() >= 0)
.forEach(
o ->
polledOffsets.put(
Expand Down

0 comments on commit d61ed51

Please sign in to comment.