diff --git a/paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/partitioned/impl/SimplePartitionCoordinator.java b/paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/partitioned/impl/SimplePartitionCoordinator.java index 165fc68..f1554c1 100644 --- a/paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/partitioned/impl/SimplePartitionCoordinator.java +++ b/paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/partitioned/impl/SimplePartitionCoordinator.java @@ -59,18 +59,18 @@ public void rebalance(final EventTypePartitions consumerPartitions, private Function getOffsetSelector(final EventType eventType) { return - entry -> { + nakadiPartition -> { final String offset; if (startNewestAvailableOffset) { - offset = entry.getNewestAvailableOffset(); + offset = nakadiPartition.getNewestAvailableOffset(); } else { - offset = entry.getOldestAvailableOffset(); + offset = "BEGIN"; // messages will be replayed on each restart log.warn("Using oldest available offset [{}] without persistent storage.", offset); } - return EventTypeCursor.of(EventTypePartition.of(eventType, entry.getPartition()), offset); + return EventTypeCursor.of(EventTypePartition.of(eventType, nakadiPartition.getPartition()), offset); }; } diff --git a/paradox-nakadi-consumer-partitioned-zk/src/main/java/de/zalando/paradox/nakadi/consumer/partitioned/zk/AbstractZKConsumerPartitionCoordinator.java b/paradox-nakadi-consumer-partitioned-zk/src/main/java/de/zalando/paradox/nakadi/consumer/partitioned/zk/AbstractZKConsumerPartitionCoordinator.java index b8a6924..727cd75 100644 --- a/paradox-nakadi-consumer-partitioned-zk/src/main/java/de/zalando/paradox/nakadi/consumer/partitioned/zk/AbstractZKConsumerPartitionCoordinator.java +++ b/paradox-nakadi-consumer-partitioned-zk/src/main/java/de/zalando/paradox/nakadi/consumer/partitioned/zk/AbstractZKConsumerPartitionCoordinator.java @@ -71,8 +71,7 @@ private String getNextOffset(final EventType eventType, final NakadiPartition na } private String nextOffset(final EventType eventType, final NakadiPartition nakadiPartition) throws Exception { - String result = startNewestAvailableOffset ? nakadiPartition.getNewestAvailableOffset() - : nakadiPartition.getOldestAvailableOffset(); + String result = startNewestAvailableOffset ? nakadiPartition.getNewestAvailableOffset() : "BEGIN"; final String path = consumerOffset.getOffsetPath(eventType.getName(), nakadiPartition.getPartition()); final String zkOffset = consumerOffset.getOffset(path);