From b279800bd51773e1169a6246d95a41d8944bf6a8 Mon Sep 17 00:00:00 2001 From: Roman Kolesnev <88949424+rkolesnev@users.noreply.github.com> Date: Mon, 20 Nov 2023 09:59:56 +0000 Subject: [PATCH] Fix synchronisation logic between commits (#665) * Fix synchronisation logic between transactional producer commit and partition revocation commit Fixes https://github.com/confluentinc/parallel-consumer/issues/637 --- CHANGELOG.adoc | 5 +- README.adoc | 3 +- .../AbstractParallelEoSStreamProcessor.java | 17 ++- .../BrokerIntegrationTest.java | 5 +- .../integrationTests/RebalanceTest.java | 104 ++++++++++++++++-- 5 files changed, 109 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 790e775da..3acb88a1e 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -15,11 +15,10 @@ toc::[] endif::[] == 0.5.2.8 -* fix: Fix target loading computation for inflight records - === Fixes - * fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579) +* fix: Fix target loading computation for inflight records (#662) +* fix: Fix synchronisation logic for transactional producer commit affecting non-transactional usage (#665), resolves (#637) == 0.5.2.7 diff --git a/README.adoc b/README.adoc index 81af418e0..eb4039719 100644 --- a/README.adoc +++ b/README.adoc @@ -1517,8 +1517,9 @@ endif::[] == 0.5.2.8 === Fixes - * fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579) +* fix: Fix target loading computation for inflight records (#662) +* fix: Fix synchronisation logic for transactional producer commit affecting non-transactional usage (#665), resolves (#637) == 0.5.2.7 diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index deda1074c..b0029988d 100644 --- 
a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -408,21 +408,13 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { public void onPartitionsRevoked(Collection partitions) { log.debug("Partitions revoked {}, state: {}", partitions, state); isRebalanceInProgress.set(true); - while (this.producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false)) + while (isTransactionCommittingInProgress()) Thread.sleep(100); //wait for the transaction to finish committing numberOfAssignedPartitions = numberOfAssignedPartitions - partitions.size(); try { // commit any offsets from revoked partitions BEFORE truncation - this.producerManager.ifPresent(pm -> { - try { - pm.preAcquireOffsetsToCommit(); - } catch (Exception exc) { - throw new InternalRuntimeException(exc); - } - }); - commitOffsetsThatAreReady(); // truncate the revoked partitions @@ -616,7 +608,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti processWorkCompleteMailBox(Duration.ZERO); // - if( Thread.currentThread().isInterrupted()) { + if (Thread.currentThread().isInterrupted()) { log.warn("control thread interrupted - may lead to issues with transactional commit lock acquisition"); } commitOffsetsThatAreReady(); @@ -1430,6 +1422,11 @@ private void clearCommitCommand() { } } + private boolean isTransactionCommittingInProgress() { + return options.isUsingTransactionCommitMode() && + producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false); + } + @Override public void pauseIfRunning() { if (this.state == State.RUNNING) { diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java 
b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java index aa0fc8295..da8678ea2 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java @@ -1,11 +1,8 @@ /*- - * Copyright (C) 2020-2022 Confluent, Inc. + * Copyright (C) 2020-2023 Confluent, Inc. */ package io.confluent.parallelconsumer.integrationTests; -/*- - * Copyright (C) 2020-2022 Confluent, Inc. - */ import io.confluent.csid.testcontainers.FilteredTestContainerSlf4jLogConsumer; import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils; diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/RebalanceTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/RebalanceTest.java index ec75af5c8..64e227e6f 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/RebalanceTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/RebalanceTest.java @@ -1,8 +1,8 @@ -package io.confluent.parallelconsumer.integrationTests; /*- - * Copyright (C) 2020-2022 Confluent, Inc. + * Copyright (C) 2020-2023 Confluent, Inc. 
*/ +package io.confluent.parallelconsumer.integrationTests; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; @@ -11,13 +11,19 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import pl.tlinkowski.unij.api.UniLists; import pl.tlinkowski.unij.api.UniSets; import java.time.Duration; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Stream; import static io.confluent.parallelconsumer.ManagedTruth.assertThat; import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.PARTITION; @@ -49,13 +55,27 @@ class RebalanceTest extends BrokerIntegrationTest { void setup() { setupTopic(); consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP); + } - pc = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.builder() - .consumer(consumer) - .ordering(PARTITION) // just so we dont need to use keys - .build()); + @AfterEach + void cleanup() { + pc.close(); + } - pc.subscribe(UniSets.of(topic)); + private ParallelEoSStreamProcessor setupPC() { + return setupPC(null); + } + + private ParallelEoSStreamProcessor setupPC(Function, ParallelConsumerOptions.ParallelConsumerOptionsBuilder> optionsCustomizer) { + ParallelConsumerOptions.ParallelConsumerOptionsBuilder optionsBuilder = + ParallelConsumerOptions.builder() + .consumer(consumer) + .ordering(PARTITION); + if (optionsCustomizer != null) { + optionsBuilder = optionsCustomizer.apply(optionsBuilder); + } + + return new ParallelEoSStreamProcessor<>(optionsBuilder.build()); } /** @@ 
-67,6 +87,8 @@ void setup() { void commitUponRevoke() { var numberOfRecordsToProduce = 20L; var count = new AtomicLong(); + pc = setupPC(); + pc.subscribe(UniSets.of(topic)); // getKcu().produceMessages(topic, numberOfRecordsToProduce); @@ -96,4 +118,72 @@ void commitUponRevoke() { log.debug("Test finished"); } + static Stream rebalanceTestCommitModes() { + return Stream.of( + Arguments.of("Consumer Async", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, null), + Arguments.of("Consumer Sync", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC, null), + Arguments.of("Consumer Async + Producer Non-transactional", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, KafkaClientUtils.ProducerMode.NOT_TRANSACTIONAL), + Arguments.of("Consumer Sync + Producer Non-transactional", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC, KafkaClientUtils.ProducerMode.NOT_TRANSACTIONAL), + Arguments.of("Transactional Producer", ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER, KafkaClientUtils.ProducerMode.TRANSACTIONAL) + ); + } +
+ /** + * Tests that re-balance completes on partition revocation in supported commit modes and consumer / producer combinations. + * Issue was raised when producer was set, but not used - https://github.com/confluentinc/parallel-consumer/issues/637 + * The bug in transactional producer commit synchronisation with re-balance triggered commit was not caught by existing tests. + */ + @SneakyThrows + @MethodSource("rebalanceTestCommitModes") + @ParameterizedTest(name = "[{index}] - {0}") + void rebalanceCompletesForCommitModeVariations(String testName, ParallelConsumerOptions.CommitMode commitMode, KafkaClientUtils.ProducerMode producerMode) { + var numberOfRecordsToProduce = 20L; + var count = new AtomicLong(); + pc = setupPC(builder -> { + builder = builder.commitMode(commitMode); + if (producerMode != null) { + builder = builder.producer(getKcu().createNewProducer(producerMode)); + } + return builder; + }); + pc.subscribe(UniSets.of(topic)); + // + getKcu().produceMessages(topic, numberOfRecordsToProduce); + + // consume all the messages + pc.poll(recordContexts -> { + count.getAndIncrement(); + log.debug("PC1 - Processed record, count now {} - offset: {}", count, recordContexts.offset()); + }); + await().untilAtomic(count, is(equalTo(numberOfRecordsToProduce))); + log.debug("All records consumed"); + Consumer newConsumer = getKcu().createNewConsumer(REUSE_GROUP); + + ParallelEoSStreamProcessor pc2 = setupPC(builder -> builder.consumer(newConsumer)); + pc2.subscribe(UniSets.of(topic)); + + // cause rebalance + pc2.poll(recordContexts -> { + count.getAndIncrement(); + log.debug("PC2 - Processed record, count now {} - offset: {}", count, recordContexts.offset()); + }); + + await().untilAsserted(() -> assertThat(pc2).getNumberOfAssignedPartitions().isEqualTo(1)); + getKcu().produceMessages(topic, numberOfRecordsToProduce); + + await().untilAtomic(count, is(equalTo(numberOfRecordsToProduce * 2))); + pc.closeDrainFirst(); + await().untilAsserted(() -> assertThat(pc2).getNumberOfAssignedPartitions().isEqualTo(2)); + + getKcu().produceMessages(topic, numberOfRecordsToProduce); + + await().untilAtomic(count, is(equalTo(numberOfRecordsToProduce * 3))); + pc2.closeDrainFirst(); + await().untilAsserted(() -> { + assertThat(pc2).isClosedOrFailed(); + org.assertj.core.api.Assertions.assertThat(count).hasValue(numberOfRecordsToProduce * 3); + }); + log.debug("Test finished"); + } + }