Skip to content

Commit

Permalink
Fix synchronisation logic between commits (#665)
Browse files Browse the repository at this point in the history
* Fix synchronisation logic between transactional producer commit and partition revocation commit
Fixes #637
  • Loading branch information
rkolesnev authored Nov 20, 2023
1 parent 0c9b7da commit b279800
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 25 deletions.
5 changes: 2 additions & 3 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ toc::[]
endif::[]
== 0.5.2.8

* fix: Fix target loading computation for inflight records

=== Fixes

* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)
* fix: Fix target loading computation for inflight records (#662)
* fix: Fix synchronisation logic for transactional producer commit affecting non-transactional usage (#665), resolves (#637)

== 0.5.2.7

Expand Down
3 changes: 2 additions & 1 deletion README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1517,8 +1517,9 @@ endif::[]
== 0.5.2.8

=== Fixes

* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)
* fix: Fix target loading computation for inflight records (#662)
* fix: Fix synchronisation logic for transactional producer commit affecting non-transactional usage (#665), resolves (#637)

== 0.5.2.7

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,21 +408,13 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.debug("Partitions revoked {}, state: {}", partitions, state);
isRebalanceInProgress.set(true);
while (this.producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false))
while (isTransactionCommittingInProgress())
Thread.sleep(100); //wait for the transaction to finish committing

numberOfAssignedPartitions = numberOfAssignedPartitions - partitions.size();

try {
// commit any offsets from revoked partitions BEFORE truncation
this.producerManager.ifPresent(pm -> {
try {
pm.preAcquireOffsetsToCommit();
} catch (Exception exc) {
throw new InternalRuntimeException(exc);
}
});

commitOffsetsThatAreReady();

// truncate the revoked partitions
Expand Down Expand Up @@ -616,7 +608,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
processWorkCompleteMailBox(Duration.ZERO);

//
if( Thread.currentThread().isInterrupted()) {
if (Thread.currentThread().isInterrupted()) {
log.warn("control thread interrupted - may lead to issues with transactional commit lock acquisition");
}
commitOffsetsThatAreReady();
Expand Down Expand Up @@ -1430,6 +1422,11 @@ private void clearCommitCommand() {
}
}

private boolean isTransactionCommittingInProgress() {
return options.isUsingTransactionCommitMode() &&
producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false);
}

@Override
public void pauseIfRunning() {
if (this.state == State.RUNNING) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/
package io.confluent.parallelconsumer.integrationTests;
/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.testcontainers.FilteredTestContainerSlf4jLogConsumer;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.confluent.parallelconsumer.integrationTests;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
Expand All @@ -11,13 +11,19 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Stream;

import static io.confluent.parallelconsumer.ManagedTruth.assertThat;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.PARTITION;
Expand Down Expand Up @@ -49,13 +55,27 @@ class RebalanceTest extends BrokerIntegrationTest<String, String> {
void setup() {
setupTopic();
consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);
}

pc = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.<String, String>builder()
.consumer(consumer)
.ordering(PARTITION) // just so we dont need to use keys
.build());
@AfterEach
void cleanup() {
pc.close();
}

pc.subscribe(UniSets.of(topic));
private ParallelEoSStreamProcessor<String, String> setupPC() {
return setupPC(null);
}

private ParallelEoSStreamProcessor<String, String> setupPC(Function<ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String>, ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String>> optionsCustomizer) {
ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String> optionsBuilder =
ParallelConsumerOptions.<String, String>builder()
.consumer(consumer)
.ordering(PARTITION);
if (optionsCustomizer != null) {
optionsBuilder = optionsCustomizer.apply(optionsBuilder);
}

return new ParallelEoSStreamProcessor<>(optionsBuilder.build());
}

/**
Expand All @@ -67,6 +87,8 @@ void setup() {
void commitUponRevoke() {
var numberOfRecordsToProduce = 20L;
var count = new AtomicLong();
pc = setupPC();
pc.subscribe(UniSets.of(topic));

//
getKcu().produceMessages(topic, numberOfRecordsToProduce);
Expand Down Expand Up @@ -96,4 +118,72 @@ void commitUponRevoke() {
log.debug("Test finished");
}

static Stream<Arguments> rebalanceTestCommitModes() {
return Stream.of(
Arguments.of("Consumer Async", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, null),
Arguments.of("Consumer Sync", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC, null),
Arguments.of("Consumer Async + Producer Non-transactional", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, KafkaClientUtils.ProducerMode.NOT_TRANSACTIONAL),
Arguments.of("Consumer Sync + Producer Non-transactional", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, KafkaClientUtils.ProducerMode.NOT_TRANSACTIONAL),
Arguments.of("Transactional Producer", ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER, KafkaClientUtils.ProducerMode.TRANSACTIONAL)
);
}

@SneakyThrows
@MethodSource("rebalanceTestCommitModes")
@ParameterizedTest(name = "[{index}] - {0}")
/**
* Tests that re-balance completes on partition revocation in supported commit modes and consumer / producer combinations.
* Issue was raised when producer was set, but not used - https://github.com/confluentinc/parallel-consumer/issues/637
* bug in transactional producer commit synchronisation with re-balance triggered commit was not caught by existing tests.
*/
void rebalanceCompletesForCommitModeVariations(String testName, ParallelConsumerOptions.CommitMode commitMode, KafkaClientUtils.ProducerMode producerMode) {
var numberOfRecordsToProduce = 20L;
var count = new AtomicLong();
pc = setupPC(builder -> {
builder = builder.commitMode(commitMode);
if (producerMode != null) {
builder = builder.producer(getKcu().createNewProducer(producerMode));
}
return builder;
});
pc.subscribe(UniSets.of(topic));
//
getKcu().produceMessages(topic, numberOfRecordsToProduce);

// consume all the messages
pc.poll(recordContexts -> {
count.getAndIncrement();
log.debug("PC1 - Processed record, count now {} - offset: {}", count, recordContexts.offset());
});
await().untilAtomic(count, is(equalTo(numberOfRecordsToProduce)));
log.debug("All records consumed");
Consumer<String, String> newConsumer = getKcu().createNewConsumer(REUSE_GROUP);

ParallelEoSStreamProcessor<String, String> pc2 = setupPC(builder -> builder.consumer(newConsumer));
pc2.subscribe(UniSets.of(topic));

// cause rebalance
pc2.poll(recordContexts -> {
count.getAndIncrement();
log.debug("PC2 - Processed record, count now {} - offset: {}", count, recordContexts.offset());
});

await().untilAsserted(() -> assertThat(pc2).getNumberOfAssignedPartitions().isEqualTo(1));
getKcu().produceMessages(topic, numberOfRecordsToProduce);

await().untilAtomic(count, is(equalTo(numberOfRecordsToProduce * 2)));
pc.closeDrainFirst();
await().untilAsserted(() -> assertThat(pc2).getNumberOfAssignedPartitions().isEqualTo(2));

getKcu().produceMessages(topic, numberOfRecordsToProduce);

await().untilAtomic(count, is(equalTo(numberOfRecordsToProduce * 3)));
pc2.closeDrainFirst();
await().untilAsserted(() -> {
assertThat(pc2).isClosedOrFailed();
org.assertj.core.api.Assertions.assertThat(count).hasValue(numberOfRecordsToProduce * 3);
});
log.debug("Test finished");
}

}

0 comments on commit b279800

Please sign in to comment.