Skip to content

Commit

Permalink
Merge pull request #8 from zalando-incubator/fix-first-event-lost
Browse files Browse the repository at this point in the history
#6 Replace NakadiPartition.getOldestAvailableOffset() with BEGIN
  • Loading branch information
GJL authored Nov 18, 2016
2 parents e4f7b8f + a7bba84 commit 9e6e523
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,18 @@ public void rebalance(final EventTypePartitions consumerPartitions,

private Function<NakadiPartition, EventTypeCursor> getOffsetSelector(final EventType eventType) {
return
entry -> {
nakadiPartition -> {
final String offset;
if (startNewestAvailableOffset) {
offset = entry.getNewestAvailableOffset();
offset = nakadiPartition.getNewestAvailableOffset();
} else {
offset = entry.getOldestAvailableOffset();
offset = "BEGIN";

// messages will be replayed on each restart
log.warn("Using oldest available offset [{}] without persistent storage.", offset);
}

return EventTypeCursor.of(EventTypePartition.of(eventType, entry.getPartition()), offset);
return EventTypeCursor.of(EventTypePartition.of(eventType, nakadiPartition.getPartition()), offset);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ private String getNextOffset(final EventType eventType, final NakadiPartition na
}

private String nextOffset(final EventType eventType, final NakadiPartition nakadiPartition) throws Exception {
String result = startNewestAvailableOffset ? nakadiPartition.getNewestAvailableOffset()
: nakadiPartition.getOldestAvailableOffset();
String result = startNewestAvailableOffset ? nakadiPartition.getNewestAvailableOffset() : "BEGIN";

final String path = consumerOffset.getOffsetPath(eventType.getName(), nakadiPartition.getPartition());
final String zkOffset = consumerOffset.getOffset(path);
Expand Down

0 comments on commit 9e6e523

Please sign in to comment.