From 4b17c0d23655873aac27806e638fe3f3bde3cc4c Mon Sep 17 00:00:00 2001 From: Ehud Lev Date: Sat, 4 Mar 2023 23:23:52 +0200 Subject: [PATCH] Adding parameters validations --- .../ParallelConsumerOptions.java | 17 +++++++ .../state/ParallelConsumerOptionsTest.java | 46 +++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ParallelConsumerOptionsTest.java diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 7c1ac01cb..9b088191a 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -419,6 +419,23 @@ public void validate() { Objects.requireNonNull(consumer, "A consumer must be supplied"); transactionsValidation(); + validateMinBatchParameters(); + } + + private void validateMinBatchParameters() { + if (isEnforceMinBatch()){ + if (minBatchSize > batchSize) + throw new IllegalArgumentException( + msg("minBatchSize can not by bigger than batchSize: {} > {}", + minBatchSize, + batchSize)); + } + if (minBatchTimeoutInMillis < 0){ + throw new IllegalArgumentException( + msg("minBatchTimeoutInMillis should be non negative: {}", + minBatchTimeoutInMillis + )); + } } private void transactionsValidation() { diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ParallelConsumerOptionsTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ParallelConsumerOptionsTest.java new file mode 100644 index 000000000..b4e04c5c6 --- /dev/null +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ParallelConsumerOptionsTest.java @@ -0,0 +1,46 @@ +package io.confluent.parallelconsumer.state; + +/*- + * Copyright (C) 2020-2023 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.assertThrows; + + +@Slf4j +public class ParallelConsumerOptionsTest { + + private final Consumer mockConsumer = Mockito.mock(Consumer.class); + + + @Test + void validateMinBatchParameters(){ + assertThrows( + IllegalArgumentException.class, + () -> ParallelConsumerOptions.builder() + .minBatchSize(10) + .minBatchTimeoutInMillis(100) + .consumer(mockConsumer) + .batchSize(5) + .build() + .validate() + ); + + assertThrows( + IllegalArgumentException.class, + () -> ParallelConsumerOptions.builder() + .minBatchSize(3) + .minBatchTimeoutInMillis(-1) + .consumer(mockConsumer) + .batchSize(5) + .build() + .validate() + ); + } +}