diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc
index 02871d16d..e9eb0810c 100644
--- a/CHANGELOG.adoc
+++ b/CHANGELOG.adoc
@@ -14,6 +14,12 @@ ifndef::github_name[]
 toc::[]
 endif::[]
 
+== 0.5.2.6
+
+=== Fixes
+
+* fix: RunLength offset decoding returns 0 base offset after no-progress commit (related to #546)
+
 == 0.5.2.5
 
 === Fixes
diff --git a/README.adoc b/README.adoc
index 606eb5975..cc3c19d41 100644
--- a/README.adoc
+++ b/README.adoc
@@ -1312,6 +1312,12 @@ ifndef::github_name[]
 toc::[]
 endif::[]
 
+== 0.5.2.6
+
+=== Fixes
+
+* fix: RunLength offset decoding returns 0 base offset after no-progress commit (related to #546)
+
 == 0.5.2.5
 
 === Fixes
@@ -1363,12 +1369,14 @@ endif::[]
 ** build(deps-dev): bump Confluent Platform Kafka Broker to 7.2.2 (#421)
 ** build(deps): Upgrade to AK 3.3.0 (#309)
 
+
 === Fixes
 
 * fixes #419: NoSuchElementException during race condition in PartitionState (#422)
 * Fixes #412: ClassCastException with retryDelayProvider (#417)
 * fixes ShardManager retryQueue ordering and set issues due to poor Comparator implementation (#423)
 
+
 == v0.5.2.2
 
 === Fixes
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetRunLength.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetRunLength.java
index 71199b727..fef1f2ce1 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetRunLength.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetRunLength.java
@@ -1,7 +1,7 @@
 package io.confluent.parallelconsumer.offsets;
 
 /*-
- * Copyright (C) 2020-2022 Confluent, Inc.
+ * Copyright (C) 2020-2023 Confluent, Inc.
  */
 
 import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.HighestOffsetAndIncompletes;
@@ -83,7 +83,14 @@ static HighestOffsetAndIncompletes runLengthDecodeToIncompletes(OffsetEncoding e
 
         final var incompletes = new TreeSet<Long>();
 
-        long highestSeenOffset = 0L;
+        /*
+         Set highestSeenOffset to baseOffset - 1 initially, in case the metadata doesn't actually contain any data
+         and highestSeenOffset would otherwise remain at 0.
+         That could cause a spurious warning / state truncation.
+         See issue #546 - https://github.com/confluentinc/parallel-consumer/issues/546
+        */
+        // TODO: look at offset encoding logic - maybe in those cases we should not create metadata at all?
+        long highestSeenOffset = (baseOffset > 0) ? (baseOffset - 1) : 0L;
 
         Supplier<Boolean> hasRemainingTest = () -> {
             return switch (encoding.version) {
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index 8b8dbab8a..65057251e 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -323,11 +323,12 @@ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
 
         if (pollAboveExpected) {
             // previously committed offset record has been removed from the topic, so we need to truncate up to it
-            log.warn("Truncating state - removing records lower than {} from partition {}. Offsets have been removed from the partition " +
+            log.warn("Truncating state - removing records lower than {} from partition {} of topic {}. Offsets have been removed from the partition " +
                             "by the broker or committed offset has been raised. Bootstrap polled {} but expected {} from loaded commit data. " +
                             "Could be caused by record retention or compaction and offset reset policy LATEST.",
                     bootstrapPolledOffset,
                     this.tp.partition(),
+                    this.tp.topic(),
                     bootstrapPolledOffset,
                     expectedBootstrapRecordOffset);
@@ -336,10 +337,12 @@ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
             incompletesToPrune.forEach(incompleteOffsets::remove);
         } else if (pollBelowExpected) {
             // reset to lower offset detected, so we need to reset our state to match
-            log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) - truncating state - all records " +
+            log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) for partition {} of topic {} - truncating state - all records " +
                             "above (including this) will be replayed. Was expecting {} but bootstrap poll was {}. " +
                             "Could be caused by record retention or compaction and offset reset policy EARLIEST.",
                     bootstrapPolledOffset,
+                    this.tp.partition(),
+                    this.tp.topic(),
                     expectedBootstrapRecordOffset,
                     bootstrapPolledOffset
             );
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingTests.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingTests.java
index 81fb1f0ca..c3e150ce1 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingTests.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingTests.java
@@ -1,7 +1,7 @@
 package io.confluent.parallelconsumer.offsets;
 
 /*-
- * Copyright (C) 2020-2022 Confluent, Inc.
+ * Copyright (C) 2020-2023 Confluent, Inc.
  */
 
 import com.google.common.truth.Truth;
@@ -117,6 +117,51 @@ void largeIncompleteOffsetValues(long nextExpectedOffset) {
         OffsetSimultaneousEncoder.compressionForced = false;
     }
 
+    /**
+     * Verifies that encoding / decoding returns the correct highest seen offset when nextExpectedOffset is below the
+     * baseOffsetToCommit.
+     */
+    @SneakyThrows
+    @Test
+    @ResourceLock(value = OffsetSimultaneousEncoder.COMPRESSION_FORCED_RESOURCE_LOCK, mode = READ_WRITE)
+    void verifyEncodingWithNextExpectedBelowWatermark() {
+        long baseOffsetToCommit = 123L;
+        long highestSucceededOffset = 122L;
+        var incompletes = new TreeSet<>(UniSets.of(2345L, 8765L)); // no incompletes below low watermark or next expected
+
+        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(baseOffsetToCommit, highestSucceededOffset, incompletes);
+        OffsetSimultaneousEncoder.compressionForced = true;
+
+        // encode
+        encoder.invoke();
+        Map<OffsetEncoding, byte[]> encodingMap = encoder.getEncodingMap();
+
+        // decode the packed "smallest" encoding
+        byte[] smallestBytes = encoder.packSmallest();
+        EncodedOffsetPair unwrap = EncodedOffsetPair.unwrap(smallestBytes);
+        OffsetMapCodecManager.HighestOffsetAndIncompletes decodedIncompletes = unwrap.getDecodedIncompletes(baseOffsetToCommit);
+        assertThat(decodedIncompletes.getIncompleteOffsets()).isEmpty();
+        assertThat(decodedIncompletes.getHighestSeenOffset().isPresent()).isTrue();
+        assertThat(decodedIncompletes.getHighestSeenOffset().get()).isEqualTo(highestSucceededOffset);
+
+        // decode every encoding that was produced
+        for (OffsetEncoding encodingToUse : OffsetEncoding.values()) {
+            log.info("Testing {}", encodingToUse);
+            byte[] bitsetBytes = encodingMap.get(encodingToUse);
+            if (bitsetBytes != null) {
+                EncodedOffsetPair bitsetUnwrap = EncodedOffsetPair.unwrap(encoder.packEncoding(new EncodedOffsetPair(encodingToUse, ByteBuffer.wrap(bitsetBytes))));
+                OffsetMapCodecManager.HighestOffsetAndIncompletes decodedBitsets = bitsetUnwrap.getDecodedIncompletes(baseOffsetToCommit);
+                assertThat(decodedBitsets.getIncompleteOffsets()).isEmpty();
+                assertThat(decodedBitsets.getHighestSeenOffset().isPresent()).isTrue();
+                assertThat(decodedBitsets.getHighestSeenOffset().get()).isEqualTo(highestSucceededOffset);
+            } else {
+                log.info("Encoding not performed: " + encodingToUse);
+            }
+        }
+
+        OffsetSimultaneousEncoder.compressionForced = false;
+    }
+
     /**
      * Test for offset encoding when there is a very large range of offsets, and where the offsets aren't sequential.
      *
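Taken together, the hunks above address one scenario: a "no-progress" commit - for example a base offset to commit of 123 while the highest succeeded offset is still 122, as in the new test above - produces run-length metadata with no runs, so the decoder used to fall back to a highest seen offset of 0. On the next bootstrap, PartitionState then compared the broker's position against an expectation derived from that bogus 0 and concluded the partition had been truncated or reset. A rough, hypothetical sketch of that bootstrap check (simplified names and signature; the real logic lives in `PartitionState#maybeTruncateBelowOrAbove` and is not shown in full in this diff):

```java
// Hypothetical sketch only - simplified names, not the project's actual implementation.
class BootstrapTruncationSketch {

    void maybeTruncateBelowOrAbove(long bootstrapPolledOffset, long expectedBootstrapRecordOffset) {
        boolean pollAboveExpected = bootstrapPolledOffset > expectedBootstrapRecordOffset;
        boolean pollBelowExpected = bootstrapPolledOffset < expectedBootstrapRecordOffset;

        if (pollAboveExpected) {
            // Records below the polled offset are gone (retention/compaction, or the committed offset
            // was raised): prune tracked state below bootstrapPolledOffset and log the first warning above.
        } else if (pollBelowExpected) {
            // Offsets were reset to an earlier position: everything from bootstrapPolledOffset onwards
            // will be replayed; log the second warning above.
        }
        // Issue #546: a highest seen offset wrongly decoded as 0 dragged expectedBootstrapRecordOffset down,
        // so a perfectly healthy bootstrap could land in the pollAboveExpected branch and truncate state.
    }
}
```

With the decoder now starting from `baseOffset - 1` for empty metadata, the expected bootstrap offset derived from loaded commit data should line up with the broker's position again, so neither warning fires for a healthy partition.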