From 7231f62ba352a26ba67183ebce4a9ca1a60c796f Mon Sep 17 00:00:00 2001 From: Roman Kolesnev <88949424+rkolesnev@users.noreply.github.com> Date: Mon, 20 Nov 2023 15:32:35 +0000 Subject: [PATCH] Fix for race condition in partition state clean/dirty tracking (#666) * Fix for race condition in partition state clean/dirty tracking --- CHANGELOG.adoc | 2 ++ README.adoc | 2 ++ .../state/PartitionState.java | 26 ++++++++++++--- .../PartitionStateCommittedOffsetTest.java | 32 ++++++++++++++++--- 4 files changed, 53 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 3acb88a1e..9431c42c7 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -16,9 +16,11 @@ endif::[] == 0.5.2.8 === Fixes + * fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579) * fix: Fix target loading computation for inflight records (#662) * fix: Fix synchronisation logic for transactional producer commit affecting non-transactional usage (#665), resolves (#637) +* fix: Fix for race condition in partition state clean/dirty tracking (#666), resolves (#664) == 0.5.2.7 diff --git a/README.adoc b/README.adoc index eb4039719..9a49628ac 100644 --- a/README.adoc +++ b/README.adoc @@ -1517,9 +1517,11 @@ endif::[] == 0.5.2.8 === Fixes + * fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579) * fix: Fix target loading computation for inflight records (#662) * fix: Fix synchronisation logic for transactional producer commit affecting non-transactional usage (#665), resolves (#637) +* fix: Fix for race condition in partition state clean/dirty tracking (#666), resolves (#664) == 0.5.2.7 diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java index 25fa7f0d0..c42b77e7d 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java @@ -172,6 +172,17 @@ public class PartitionState { private DistributionSummary ratioMetadataSpaceUsedDistributionSummary; private final PCMetrics pcMetrics; + /** + * Additional flag to prevent overwriting dirty state that was updated during commit execution window - so that any + * subsequent offsets completed while commit is being performed could mark state as dirty and retain the dirty state + * on commit completion. In tight race condition - it may be set just before offset is completed and included in + * commit data collection - so it is a little bit pessimistic - that may cause an additional unnecessary commit on + * next commit cycle - but it is highly unlikely as throughput has to be high for this to occur - but with high + * throughput there will be other offsets ready to commit anyway. + */ + private boolean stateChangedSinceCommitStart = false; + + public PartitionState(long newEpoch, PCModule pcModule, TopicPartition topicPartition, @@ -209,10 +220,13 @@ public void onOffsetCommitSuccess(OffsetAndMetadata committed) { //NOSONAR } private void setClean() { - setDirty(false); + if (!stateChangedSinceCommitStart) { + setDirty(false); + } } private void setDirty() { + stateChangedSinceCommitStart = true; setDirty(true); } @@ -382,9 +396,13 @@ public boolean isRemoved() { } public Optional getCommitDataIfDirty() { - return isDirty() ? - of(createOffsetAndMetadata()) : - empty(); + if (isDirty()) { + // setting the flag so that any subsequent offset completed while commit is being performed could mark state as dirty + // and retain the dirty state on commit completion. + stateChangedSinceCommitStart = false; + return of(createOffsetAndMetadata()); + } + return empty(); } // visible for testing diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java index af0aeccdf..9c95aece2 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java @@ -1,7 +1,7 @@ package io.confluent.parallelconsumer.state; /*- - * Copyright (C) 2020-2022 Confluent, Inc. + * Copyright (C) 2020-2023 Confluent, Inc. */ import com.google.common.truth.Truth; @@ -16,10 +16,7 @@ import pl.tlinkowski.unij.api.UniLists; import pl.tlinkowski.unij.api.UniSets; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.TreeSet; +import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; import java.util.stream.Collectors; @@ -177,5 +174,30 @@ void bootstrapPollOffsetHigherDueToRetentionOrCompaction() { assertThat(state).getAllIncompleteOffsets().containsExactlyElementsIn(expectedTruncatedIncompletes); } + @Test + void workCompletedDuringAsyncCommitShouldKeepStateAsDirty(){ + final long completedOffset = 1L; + final long incompleteOffset = 2L; + + final HighestOffsetAndIncompletes offsetData = new HighestOffsetAndIncompletes(Optional.of(incompleteOffset), + new TreeSet<>(Arrays.asList(completedOffset, incompleteOffset))); + PartitionState state = new PartitionState<>(0, mu.getModule(), tp, offsetData); + state.onSuccess(completedOffset); + + // fetch committable/completed offset + OffsetAndMetadata offsetAndMetadata = state.getCommitDataIfDirty().get(); + + assertThat(offsetAndMetadata).getOffset().isEqualTo(completedOffset+1); + + // mark incomplete work as complete + state.onSuccess(incompleteOffset); + assertThat(state).isDirty(); + + //mark fetched offset as committed + state.onOffsetCommitSuccess(offsetAndMetadata); + + // partition should stay dirty, since the newly completed work could be committed now. + assertThat(state).isDirty(); + } } \ No newline at end of file