
Fixes regression issue with order of state truncation vs commit (#362)
It's important that the commit happens before state truncation. This regressed with the merge of the Batching system; it's unknown why. There is now test coverage for the specific behaviour. The effect should be zero duplicates upon healthy rebalancing.
astubbs authored Jul 27, 2022
1 parent 7fd051d commit c097a37
Showing 8 changed files with 142 additions and 38 deletions.
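
The heart of the change is the onPartitionsRevoked handler (third file below): the final commit for revoked partitions must run while the WorkManager still holds their tracking state, because truncating that state first discards exactly the offset data the commit needs. With the order reversed, the commit is silently lost and the partitions' next owner reprocesses already-completed records. A stripped-down sketch of the restored ordering, for orientation only (the stub class and empty method bodies are placeholders, not the real implementation):

import java.util.Collection;
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

/** Illustration only: the commit-before-truncate ordering this commit restores. */
class RevokeOrderingSketch implements ConsumerRebalanceListener {

    private final WorkManagerStub wm = new WorkManagerStub();
    private final Optional<ConsumerRebalanceListener> usersConsumerRebalanceListener = Optional.empty();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 1. commit offsets of completed work FIRST, while per-partition
        //    tracking state for the revoked partitions still exists
        commitOffsetsThatAreReady();

        // 2. only then truncate state for the revoked partitions
        wm.onPartitionsRevoked(partitions);

        // 3. finally delegate to any user-supplied rebalance listener
        usersConsumerRebalanceListener.ifPresent(l -> l.onPartitionsRevoked(partitions));
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // not relevant to the ordering bug
    }

    private void commitOffsetsThatAreReady() {
        // commit logic elided
    }

    /** Placeholder standing in for the real WorkManager. */
    static class WorkManagerStub {
        void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // state truncation elided
        }
    }
}

The diff below additionally splits the error handling so that a failure thrown by the user's rebalance listener surfaces as ErrorInUserFunctionException rather than as an internal error.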
@@ -4,11 +4,11 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.experimental.StandardException;

/**
* This exception is only used when there is an exception thrown from code provided by the user.
*/
@StandardException
public class ErrorInUserFunctionException extends ParallelConsumerException {
public ErrorInUserFunctionException(final String message, final Throwable cause) {
super(message, cause);
}
}
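
For context on the exception-class changes in this commit: Lombok's @StandardException annotation (experimental, available since Lombok 1.18.22) generates the conventional exception constructors, which is why the hand-written ones are deleted here and in the sibling exception classes below. Roughly the equivalent hand-written form (a sketch of the idea, not Lombok's exact output):

public class ErrorInUserFunctionException extends ParallelConsumerException {
    public ErrorInUserFunctionException() {
        this(null, null);
    }

    public ErrorInUserFunctionException(String message) {
        this(message, null);
    }

    public ErrorInUserFunctionException(Throwable cause) {
        // message is derived from the cause, mirroring Lombok's behaviour
        this(cause != null ? cause.getMessage() : null, cause);
    }

    public ErrorInUserFunctionException(String message, Throwable cause) {
        super(message);
        if (cause != null) initCause(cause);
    }
}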
@@ -4,12 +4,11 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.experimental.StandardException;

/**
* Generic Parallel Consumer {@link RuntimeException} parent.
*/
@StandardException
public class ParallelConsumerException extends RuntimeException {
public ParallelConsumerException(String message, Throwable cause) {
super(message, cause);
}
}
@@ -5,6 +5,7 @@
*/

import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ErrorInUserFunctionException;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
@@ -336,16 +337,23 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.debug("Partitions revoked {}, state: {}", partitions, state);
numberOfAssignedPartitions = numberOfAssignedPartitions - partitions.size();
try {
wm.onPartitionsRevoked(partitions);

// can't commit for partitions already revoked, but use opportunity as a save point
try {
// commit any offsets from revoked partitions BEFORE truncation
commitOffsetsThatAreReady();

usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsRevoked(partitions));
// truncate the revoked partitions
wm.onPartitionsRevoked(partitions);
} catch (Exception e) {
throw new InternalRuntimeError("onPartitionsRevoked event error", e);
}

//
try {
usersConsumerRebalanceListener.ifPresent(listener -> listener.onPartitionsRevoked(partitions));
} catch (Exception e) {
throw new ErrorInUserFunctionException("Error from rebalance listener function after #onPartitionsRevoked", e);
}
}

/**
@@ -363,7 +371,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}

/**
* Delegate to {@link WorkManager}
* Cannot commit any offsets for partitions that have been `lost` (as opposed to revoked). Just delegate to
* {@link WorkManager} for truncation.
*
* @see WorkManager#onPartitionsAssigned
*/
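
The lost case is the mirror image of the revoked handling above: by the time ConsumerRebalanceListener#onPartitionsLost fires, the consumer no longer owns the partitions, so a final commit would be rejected by the broker; only local truncation is safe. Note that the listener's default onPartitionsLost implementation delegates to onPartitionsRevoked, which now commits, hence the separate code path. As a hypothetical fragment extending the RevokeOrderingSketch from the top of this diff (the wm.onPartitionsLost delegate name and a matching stub method are assumed for illustration):

@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
    // ownership is already gone: skip the commit attempt entirely,
    // just truncate local tracking state for the lost partitions
    wm.onPartitionsLost(partitions); // assumed WorkManager method name
}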
@@ -566,8 +575,8 @@ private void transitionToDraining() {

/**
* Control thread can be blocked waiting for work, but is interruptible. Interrupting it can be useful to inform
* that work is available when there was none, to make tests run faster, or to move on to shutting down the {@link
* BrokerPollSystem} so that less messages are downloaded and queued.
* that work is available when there was none, to make tests run faster, or to move on to shutting down the
* {@link BrokerPollSystem} so that less messages are downloaded and queued.
*/
private void interruptControlThread() {
if (blockableControlThread != null) {
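
As the javadoc above describes, the control thread may be parked waiting for work, and a plain Thread#interrupt is enough to wake it so it can re-examine state. A minimal, self-contained illustration of the idea (names hypothetical, unrelated to the real fields):

class InterruptControlThreadSketch {
    public static void main(String[] args) throws InterruptedException {
        Thread controlThread = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE); // stands in for blocking while waiting for work
            } catch (InterruptedException e) {
                // woken early: re-check for new work or a shutdown request
            }
        });
        controlThread.start();
        controlThread.interrupt(); // signal: work is available, or we are shutting down
        controlThread.join();
    }
}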
@@ -1,23 +1,14 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/
public class InternalRuntimeError extends RuntimeException {

public InternalRuntimeError(final String message) {
super(message);
}

public InternalRuntimeError(final String message, final Throwable cause) {
super(message, cause);
}

public InternalRuntimeError(final Throwable cause) {
super(cause);
}
import lombok.experimental.StandardException;

public InternalRuntimeError(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
/**
* Generic internal runtime error
*/
@StandardException
public class InternalRuntimeError extends RuntimeException {
}
@@ -4,11 +4,11 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.experimental.StandardException;

/**
* Generic Parallel Consumer parent exception.
*/
@StandardException
public class ParallelConsumerInternalException extends Exception {
public ParallelConsumerInternalException(String msg) {
super(msg);
}
}
@@ -0,0 +1,88 @@
package io.confluent.parallelconsumer.integrationTests;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

import static io.confluent.parallelconsumer.ManagedTruth.assertThat;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.PARTITION;
import static io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils.GroupOption.RESUE_GROUP;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
import static org.testcontainers.shaded.org.hamcrest.Matchers.equalTo;
import static org.testcontainers.shaded.org.hamcrest.Matchers.is;

/**
* Tests around what should happen when rebalancing occurs
*
* @author Antony Stubbs
*/
class RebalanceTest extends BrokerIntegrationTest<String, String> {

Consumer<String, String> consumer;

ParallelEoSStreamProcessor<String, String> pc;

public static final Duration INFINITE = Duration.ofDays(1);

{
super.numPartitions = 2;
}

// todo refactor move up
@BeforeEach
void setup() {
setupTopic();
consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);

pc = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.<String, String>builder()
.consumer(consumer)
.ordering(PARTITION) // just so we don't need to use keys
.build());

pc.subscribe(UniSets.of(topic));
}

/**
* Checks that when a rebalance happens, a final commit is done first for revoked partitions (that will be assigned
* to new consumers), so that the new consumer doesn't reprocess records that are already complete.
*/
@SneakyThrows
@Test
void commitUponRevoke() {
var recordsCount = 20L;
var count = new AtomicLong();

//
kcu.produceMessages(topic, recordsCount);

// effectively disable commit
pc.setTimeBetweenCommits(INFINITE);

// consume all the messages
pc.poll(recordContexts -> count.getAndIncrement());
await().untilAtomic(count, is(equalTo(recordsCount)));

// cause rebalance
var newConsumer = kcu.createNewConsumer(RESUE_GROUP);
newConsumer.subscribe(UniLists.of(topic));
ConsumerRecords<Object, Object> poll = newConsumer.poll(Duration.ofSeconds(5));

// make sure there are no duplicates
assertThat(poll).hasCountEqualTo(0);
}

}
@@ -64,7 +64,7 @@ class PCVersion {

@Getter
private AdminClient admin;
private final String groupId = GROUP_ID_PREFIX + nextInt();
private String groupId = GROUP_ID_PREFIX + nextInt();

/**
* todo docs
@@ -134,23 +134,39 @@ public void close() {
admin.close();
}

public enum GroupOption {
RESUE_GROUP,
NEW_GROUP
}


public <K, V> KafkaConsumer<K, V> createNewConsumer(GroupOption reuseGroup) {
return createNewConsumer(reuseGroup.equals(GroupOption.NEW_GROUP));
}

public <K, V> KafkaConsumer<K, V> createNewConsumer() {
return createNewConsumer(false);
}

@Deprecated
public <K, V> KafkaConsumer<K, V> createNewConsumer(boolean newConsumerGroup) {
return createNewConsumer(newConsumerGroup, new Properties());
}

@Deprecated
public <K, V> KafkaConsumer<K, V> createNewConsumer(Properties options) {
return createNewConsumer(false, options);
}

@Deprecated
public <K, V> KafkaConsumer<K, V> createNewConsumer(boolean newConsumerGroup, Properties options) {
Properties properties = setupConsumerProps();

if (newConsumerGroup) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_PREFIX + nextInt()); // new group
// overwrite the group id with a new one
String newGroupId = GROUP_ID_PREFIX + nextInt();
this.groupId = newGroupId; // save it for reuse later
properties.put(ConsumerConfig.GROUP_ID_CONFIG, newGroupId); // new group
}

// override with custom
@@ -214,14 +230,15 @@ public List<NewTopic> createTopics(int numTopics) {
return newTopics;
}

public List<String> produceMessages(String inputName, int numberToSend) throws InterruptedException, ExecutionException {
public List<String> produceMessages(String inputName, long numberToSend) throws InterruptedException, ExecutionException {
log.info("Producing {} messages to {}", numberToSend, inputName);
final List<String> expectedKeys = new ArrayList<>();
List<Future<RecordMetadata>> sends = new ArrayList<>();
try (Producer<String, String> kafkaProducer = createNewProducer(false)) {
for (int i = 0; i < numberToSend; i++) {
String key = "key-" + i;
Future<RecordMetadata> send = kafkaProducer.send(new ProducerRecord<>(inputName, key, "value-" + i), (meta, exception) -> {
ProducerRecord<String, String> record = new ProducerRecord<>(inputName, key, "value-" + i);
Future<RecordMetadata> send = kafkaProducer.send(record, (meta, exception) -> {
if (exception != null) {
log.error("Error sending, ", exception);
}
@@ -238,7 +255,7 @@ public List<String> produceMessages(String inputName, int numberToSend) throws InterruptedException, ExecutionException {
boolean b = recordMetadata.hasOffset();
assertThat(b).isTrue();
}
assertThat(sends).hasSize(numberToSend);
assertThat(sends).hasSize(Math.toIntExact(numberToSend));
return expectedKeys;
}

@@ -41,7 +41,7 @@ public static CommitHistorySubject assertThat(final CommitHistory actual) {
return assertAbout(commitHistories()).that(actual);
}

public void atLeastOffset(int needleCommit) {
public void atLeastOffset(long needleCommit) {
Optional<Long> highestCommitOpt = this.actual.highestCommit();
check("highestCommit()").about(OptionalSubject.optionals())
.that(highestCommitOpt)
