Skip to content

Commit

Permalink
Fix for race condition in partition state clean/dirty tracking (#666)
Browse files Browse the repository at this point in the history
* Fix for race condition in partition state clean/dirty tracking
  • Loading branch information
rkolesnev authored Nov 20, 2023
1 parent b279800 commit 7231f62
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ endif::[]
== 0.5.2.8

=== Fixes

* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)
* fix: Fix target loading computation for inflight records (#662)
* fix: Fix synchronisation logic for transactional producer commit affecting non-transactional usage (#665), resolves (#637)
* fix: Fix for race condition in partition state clean/dirty tracking (#666), resolves (#664)

== 0.5.2.7

Expand Down
2 changes: 2 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1517,9 +1517,11 @@ endif::[]
== 0.5.2.8

=== Fixes

* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)
* fix: Fix target loading computation for inflight records (#662)
* fix: Fix synchronisation logic for transactional producer commit affecting non-transactional usage (#665), resolves (#637)
* fix: Fix for race condition in partition state clean/dirty tracking (#666), resolves (#664)

== 0.5.2.7

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ public class PartitionState<K, V> {
private DistributionSummary ratioMetadataSpaceUsedDistributionSummary;
private final PCMetrics pcMetrics;

/**
* Additional flag to prevent overwriting dirty state that was updated during commit execution window - so that any
* subsequent offsets completed while commit is being performed could mark state as dirty and retain the dirty state
* on commit completion. In tight race condition - it may be set just before offset is completed and included in
* commit data collection - so it is a little bit pessimistic - that may cause an additional unnecessary commit on
* next commit cycle - but it is highly unlikely as throughput has to be high for this to occur - but with high
* throughput there will be other offsets ready to commit anyway.
*/
private boolean stateChangedSinceCommitStart = false;


public PartitionState(long newEpoch,
PCModule<K, V> pcModule,
TopicPartition topicPartition,
Expand Down Expand Up @@ -209,10 +220,13 @@ public void onOffsetCommitSuccess(OffsetAndMetadata committed) { //NOSONAR
}

private void setClean() {
setDirty(false);
if (!stateChangedSinceCommitStart) {
setDirty(false);
}
}

private void setDirty() {
stateChangedSinceCommitStart = true;
setDirty(true);
}

Expand Down Expand Up @@ -382,9 +396,13 @@ public boolean isRemoved() {
}

public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
return isDirty() ?
of(createOffsetAndMetadata()) :
empty();
if (isDirty()) {
// setting the flag so that any subsequent offset completed while commit is being performed could mark state as dirty
// and retain the dirty state on commit completion.
stateChangedSinceCommitStart = false;
return of(createOffsetAndMetadata());
}
return empty();
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import com.google.common.truth.Truth;
Expand All @@ -16,10 +16,7 @@
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -177,5 +174,30 @@ void bootstrapPollOffsetHigherDueToRetentionOrCompaction() {
assertThat(state).getAllIncompleteOffsets().containsExactlyElementsIn(expectedTruncatedIncompletes);
}

@Test
void workCompletedDuringAsyncCommitShouldKeepStateAsDirty(){
final long completedOffset = 1L;
final long incompleteOffset = 2L;

final HighestOffsetAndIncompletes offsetData = new HighestOffsetAndIncompletes(Optional.of(incompleteOffset),
new TreeSet<>(Arrays.asList(completedOffset, incompleteOffset)));
PartitionState<String, String> state = new PartitionState<>(0, mu.getModule(), tp, offsetData);
state.onSuccess(completedOffset);

// fetch committable/completed offset
OffsetAndMetadata offsetAndMetadata = state.getCommitDataIfDirty().get();

assertThat(offsetAndMetadata).getOffset().isEqualTo(completedOffset+1);

// mark incomplete work as complete
state.onSuccess(incompleteOffset);
assertThat(state).isDirty();

//mark fetched offset as committed
state.onOffsetCommitSuccess(offsetAndMetadata);

// partition should stay dirty, since the newly completed work could be committed now.
assertThat(state).isDirty();
}

}

0 comments on commit 7231f62

Please sign in to comment.