fix: runlength decoding after no-progress commit #563

Merged 1 commit on Mar 8, 2023
6 changes: 6 additions & 0 deletions CHANGELOG.adoc
@@ -14,6 +14,12 @@ ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.6

=== Fixes

* fix: RunLength offset decoding returns 0 base offset after no-progress commit - related to (#546)

== 0.5.2.5

=== Fixes
8 changes: 8 additions & 0 deletions README.adoc
@@ -1312,6 +1312,12 @@ ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.6

=== Fixes

* fix: RunLength offset decoding returns 0 base offset after no-progress commit - related to (#546)

== 0.5.2.5

=== Fixes
@@ -1363,12 +1369,14 @@ endif::[]
** build(deps-dev): bump Confluent Platform Kafka Broker to 7.2.2 (#421)
** build(deps): Upgrade to AK 3.3.0 (#309)


=== Fixes

* fixes #419: NoSuchElementException during race condition in PartitionState (#422)
* Fixes #412: ClassCastException with retryDelayProvider (#417)
* fixes ShardManager retryQueue ordering and set issues due to poor Comparator implementation (#423)


== v0.5.2.2

=== Fixes
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.offsets;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.HighestOffsetAndIncompletes;
@@ -83,7 +83,14 @@ static HighestOffsetAndIncompletes runLengthDecodeToIncompletes(OffsetEncoding e

final var incompletes = new TreeSet<Long>();

long highestSeenOffset = 0L;
/*
 Initialise highestSeenOffset to baseOffset - 1, in case the metadata doesn't actually contain any data -
 otherwise highestSeenOffset would remain at 0, which could cause spurious warnings / state truncation.
 Issue #546 - https://github.com/confluentinc/parallel-consumer/issues/546
 */
//TODO: look at offset encoding logic - maybe in those cases we should not create metadata at all?
long highestSeenOffset = (baseOffset > 0) ? (baseOffset - 1) : 0L;

Supplier<Boolean> hasRemainingTest = () -> {
return switch (encoding.version) {
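To make the change concrete, here is a minimal, self-contained sketch of run-length decoding. It is not the library's actual decoder: RunLengthDecodeSketch, its decode method, and the assumption that runs alternate succeeded/incomplete starting with a succeeded run are illustrative only. The point it demonstrates is that a no-progress commit produces an empty run-length payload, so with the old initial value of 0 the decoder reported a highest seen offset of 0, whereas baseOffset - 1 preserves the committed position.

import java.util.List;
import java.util.TreeSet;

// Illustrative sketch only - not io.confluent.parallelconsumer internals.
public class RunLengthDecodeSketch {

    record Decoded(long highestSeenOffset, TreeSet<Long> incompletes) {
    }

    // runLengths alternate: succeeded run, incomplete run, succeeded run, ...
    static Decoded decode(long baseOffset, List<Integer> runLengths) {
        var incompletes = new TreeSet<Long>();

        // The fix: an empty payload must still report baseOffset - 1 as the highest seen
        // offset; starting from 0L made it look like the consumer had never progressed.
        long highestSeenOffset = (baseOffset > 0) ? (baseOffset - 1) : 0L;

        long currentOffset = baseOffset;
        boolean currentRunSucceeded = true; // first run describes succeeded records
        for (int runLength : runLengths) {
            for (int i = 0; i < runLength; i++) {
                if (!currentRunSucceeded) {
                    incompletes.add(currentOffset);
                }
                highestSeenOffset = currentOffset;
                currentOffset++;
            }
            currentRunSucceeded = !currentRunSucceeded;
        }
        return new Decoded(highestSeenOffset, incompletes);
    }

    public static void main(String[] args) {
        // No-progress commit at offset 123: empty payload, no incompletes.
        System.out.println(decode(123L, List.of()));         // Decoded[highestSeenOffset=122, incompletes=[]]

        // Normal commit: runs of 3 succeeded, 2 incomplete, 1 succeeded from offset 123.
        System.out.println(decode(123L, List.of(3, 2, 1)));  // Decoded[highestSeenOffset=128, incompletes=[126, 127]]
    }
}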
@@ -323,11 +323,12 @@ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {

if (pollAboveExpected) {
// previously committed offset record has been removed from the topic, so we need to truncate up to it
log.warn("Truncating state - removing records lower than {} from partition {}. Offsets have been removed from the partition " +
log.warn("Truncating state - removing records lower than {} from partition {} of topic {}. Offsets have been removed from the partition " +
"by the broker or committed offset has been raised. Bootstrap polled {} but expected {} from loaded commit data. " +
"Could be caused by record retention or compaction and offset reset policy LATEST.",
bootstrapPolledOffset,
this.tp.partition(),
this.tp.topic(),
bootstrapPolledOffset,
expectedBootstrapRecordOffset);

@@ -336,10 +337,12 @@ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
incompletesToPrune.forEach(incompleteOffsets::remove);
} else if (pollBelowExpected) {
// reset to lower offset detected, so we need to reset our state to match
log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) - truncating state - all records " +
log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) for partition {} of topic {} - truncating state - all records " +
"above (including this) will be replayed. Was expecting {} but bootstrap poll was {}. " +
"Could be caused by record retention or compaction and offset reset policy EARLIEST.",
bootstrapPolledOffset,
this.tp.partition(),
this.tp.topic(),
expectedBootstrapRecordOffset,
bootstrapPolledOffset
);
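These warnings come from comparing the first offset returned by the bootstrap poll against the offset expected from the loaded commit data; a wrongly decoded highest seen offset of 0 (the bug fixed above) could make that comparison trip the truncation path spuriously. Below is a minimal sketch of the decision, with assumed names (TruncationCheckSketch and its field are illustrative, not the actual PartitionState implementation):

// Illustrative sketch of the truncate-below / truncate-above decision.
public class TruncationCheckSketch {

    // next offset we expect to poll, derived from previously committed state
    private final long expectedBootstrapRecordOffset;

    public TruncationCheckSketch(long expectedBootstrapRecordOffset) {
        this.expectedBootstrapRecordOffset = expectedBootstrapRecordOffset;
    }

    void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
        boolean pollAboveExpected = bootstrapPolledOffset > expectedBootstrapRecordOffset;
        boolean pollBelowExpected = bootstrapPolledOffset < expectedBootstrapRecordOffset;

        if (pollAboveExpected) {
            // records below the polled offset no longer exist (retention / compaction, or the
            // committed offset was raised) - tracked state below it should be dropped
            System.out.printf("truncating state below offset %d%n", bootstrapPolledOffset);
        } else if (pollBelowExpected) {
            // offsets were reset to an earlier position - records at and above the polled
            // offset will be replayed, so state above it should be reset
            System.out.printf("resetting state from offset %d onwards%n", bootstrapPolledOffset);
        }
        // otherwise the bootstrap poll matches expectations and no truncation is needed
    }
}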
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.offsets;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import com.google.common.truth.Truth;
@@ -117,6 +117,51 @@ void largeIncompleteOffsetValues(long nextExpectedOffset) {
OffsetSimultaneousEncoder.compressionForced = false;
}

/**
* Verifies that encoding / decoding returns the correct highest seen offset when nextExpectedOffset is below the
* baseOffsetToCommit
*/
@SneakyThrows
@Test
@ResourceLock(value = OffsetSimultaneousEncoder.COMPRESSION_FORCED_RESOURCE_LOCK, mode = READ_WRITE)
void verifyEncodingWithNextExpectedBelowWatermark() {
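// Scenario (see issue #546): highestSucceededOffset == baseOffsetToCommit - 1, i.e. a no-progress
// commit where nothing has succeeded at or beyond the base offset, leaving no offset range to encode.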
long baseOffsetToCommit = 123L;
long highestSucceededOffset = 122L;
var incompletes = new TreeSet<>(UniSets.of(2345L, 8765L)); // no incompletes below low watermark or next expected

OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(baseOffsetToCommit, highestSucceededOffset, incompletes);
OffsetSimultaneousEncoder.compressionForced = true;

//
encoder.invoke();
Map<OffsetEncoding, byte[]> encodingMap = encoder.getEncodingMap();

//
byte[] smallestBytes = encoder.packSmallest();
EncodedOffsetPair unwrap = EncodedOffsetPair.unwrap(smallestBytes);
OffsetMapCodecManager.HighestOffsetAndIncompletes decodedIncompletes = unwrap.getDecodedIncompletes(baseOffsetToCommit);
assertThat(decodedIncompletes.getIncompleteOffsets()).isEmpty();
assertThat(decodedIncompletes.getHighestSeenOffset().isPresent()).isTrue();
assertThat(decodedIncompletes.getHighestSeenOffset().get()).isEqualTo(highestSucceededOffset);

//
for (OffsetEncoding encodingToUse : OffsetEncoding.values()) {
log.info("Testing {}", encodingToUse);
byte[] bitsetBytes = encodingMap.get(encodingToUse);
if (bitsetBytes != null) {
EncodedOffsetPair bitsetUnwrap = EncodedOffsetPair.unwrap(encoder.packEncoding(new EncodedOffsetPair(encodingToUse, ByteBuffer.wrap(bitsetBytes))));
OffsetMapCodecManager.HighestOffsetAndIncompletes decodedBitsets = bitsetUnwrap.getDecodedIncompletes(baseOffsetToCommit);
assertThat(decodedBitsets.getIncompleteOffsets()).isEmpty();
assertThat(decodedBitsets.getHighestSeenOffset().isPresent()).isTrue();
assertThat(decodedBitsets.getHighestSeenOffset().get()).isEqualTo(highestSucceededOffset);
} else {
log.info("Encoding not performed: " + encodingToUse);
}
}

OffsetSimultaneousEncoder.compressionForced = false;
}

/**
* Test for offset encoding when there is a very large range of offsets, and where the offsets aren't sequential.
* <p>