From f78813dba5d0aa375214ad37ed2f18e6e693173f Mon Sep 17 00:00:00 2001 From: nscuro Date: Fri, 2 Feb 2024 17:07:29 +0100 Subject: [PATCH] Implement foundational API for parallel-consumer based Kafka processors Decoupled from https://github.com/DependencyTrack/hyades-apiserver/pull/509 This merely adds an API on top of which processors can be implemented. We can migrate processors one-by-one from Kafka Streams to this API. Majority of this work was already done in https://github.com/DependencyTrack/hyades-apiserver/pull/509, but got out of date due to changed priorities. At the very least said PR is good to take inspiration from. Relates to https://github.com/DependencyTrack/hyades/issues/346 Relates to https://github.com/DependencyTrack/hyades/issues/901 Relates to https://github.com/DependencyTrack/hyades/issues/907 Signed-off-by: nscuro --- pom.xml | 8 + .../org/dependencytrack/common/MdcKeys.java | 16 + .../kafka/processor/ProcessorInitializer.java | 30 ++ .../processor/ProcessorsHealthCheck.java | 17 + .../api/AbstractProcessingStrategy.java | 87 ++++ .../api/BatchProcessingStrategy.java | 69 ++++ .../kafka/processor/api/BatchProcessor.java | 26 ++ .../processor/api/ProcessingStrategy.java | 16 + .../event/kafka/processor/api/Processor.java | 24 ++ .../kafka/processor/api/ProcessorManager.java | 374 ++++++++++++++++++ .../processor/api/ProcessorProperties.java | 25 ++ .../api/SingleRecordProcessingStrategy.java | 81 ++++ .../exception/ProcessingException.java | 29 ++ .../RetryableProcessingException.java | 29 ++ src/main/resources/application.properties | 10 + src/main/webapp/WEB-INF/web.xml | 3 + .../processor/AbstractProcessorTest.java | 59 +++ .../processor/api/ProcessorManagerTest.java | 256 ++++++++++++ 18 files changed, 1159 insertions(+) create mode 100644 src/main/java/org/dependencytrack/common/MdcKeys.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/ProcessorsHealthCheck.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/api/AbstractProcessingStrategy.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/api/BatchProcessingStrategy.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/api/BatchProcessor.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessingStrategy.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/api/Processor.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessorManager.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessorProperties.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/api/SingleRecordProcessingStrategy.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/exception/ProcessingException.java create mode 100644 src/main/java/org/dependencytrack/event/kafka/processor/exception/RetryableProcessingException.java create mode 100644 src/test/java/org/dependencytrack/event/kafka/processor/AbstractProcessorTest.java create mode 100644 src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java diff --git a/pom.xml b/pom.xml index b1d6df59f..0ec1bf230 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ 3.6.0 0.2.2 1.4.1 + 0.5.2.8 3.2.0 3.25.2 1.18.3 @@ -299,6 +300,13 @@ packageurl-java ${lib.packageurl.version} + + 
+ io.confluent.parallelconsumer + parallel-consumer-core + ${lib.parallel-consumer.version} + + org.apache.kafka kafka-clients diff --git a/src/main/java/org/dependencytrack/common/MdcKeys.java b/src/main/java/org/dependencytrack/common/MdcKeys.java new file mode 100644 index 000000000..7fcf1077c --- /dev/null +++ b/src/main/java/org/dependencytrack/common/MdcKeys.java @@ -0,0 +1,16 @@ +package org.dependencytrack.common; + +/** + * Common fields for use with SLF4J's {@link org.slf4j.MDC}. + */ +public final class MdcKeys { + + public static final String MDC_KAFKA_RECORD_TOPIC = "kafkaRecordTopic"; + public static final String MDC_KAFKA_RECORD_PARTITION = "kafkaRecordPartition"; + public static final String MDC_KAFKA_RECORD_OFFSET = "kafkaRecordOffset"; + public static final String MDC_KAFKA_RECORD_KEY = "kafkaRecordKey"; + + private MdcKeys() { + } + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java new file mode 100644 index 000000000..81aa599cf --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java @@ -0,0 +1,30 @@ +package org.dependencytrack.event.kafka.processor; + +import alpine.common.logging.Logger; +import org.dependencytrack.event.kafka.processor.api.ProcessorManager; + +import javax.servlet.ServletContextEvent; +import javax.servlet.ServletContextListener; + +public class ProcessorInitializer implements ServletContextListener { + + private static final Logger LOGGER = Logger.getLogger(ProcessorInitializer.class); + + static final ProcessorManager PROCESSOR_MANAGER = new ProcessorManager(); + + @Override + public void contextInitialized(final ServletContextEvent event) { + LOGGER.info("Initializing processors"); + + // TODO: Register processor here! 
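To give a sense of intended usage, the TODO above is expected to eventually be replaced by a registration call roughly like the following; the processor name, topic, and ExampleProcessor class are hypothetical placeholders rather than part of this patch:

    // Hypothetical sketch only -- this patch intentionally registers no processors yet.
    PROCESSOR_MANAGER.registerProcessor(
            "example",                                                        // must match ^[a-z.]+$
            new Topic<>("dtrack.example", Serdes.String(), Serdes.String()),  // or a KafkaTopics constant
            new ExampleProcessor());                                          // implements Processor<String, String>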
+ + PROCESSOR_MANAGER.startAll(); + } + + @Override + public void contextDestroyed(final ServletContextEvent event) { + LOGGER.info("Stopping processors"); + PROCESSOR_MANAGER.close(); + } + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorsHealthCheck.java b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorsHealthCheck.java new file mode 100644 index 000000000..ebeabf037 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorsHealthCheck.java @@ -0,0 +1,17 @@ +package org.dependencytrack.event.kafka.processor; + +import org.eclipse.microprofile.health.HealthCheck; +import org.eclipse.microprofile.health.HealthCheckResponse; +import org.eclipse.microprofile.health.Liveness; + +import static org.dependencytrack.event.kafka.processor.ProcessorInitializer.PROCESSOR_MANAGER; + +@Liveness +public class ProcessorsHealthCheck implements HealthCheck { + + @Override + public HealthCheckResponse call() { + return PROCESSOR_MANAGER.probeHealth(); + } + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/api/AbstractProcessingStrategy.java b/src/main/java/org/dependencytrack/event/kafka/processor/api/AbstractProcessingStrategy.java new file mode 100644 index 000000000..684e7c9a2 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/api/AbstractProcessingStrategy.java @@ -0,0 +1,87 @@ +package org.dependencytrack.event.kafka.processor.api; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serde; +import org.datanucleus.api.jdo.exceptions.ConnectionInUseException; +import org.datanucleus.store.query.QueryInterruptedException; +import org.dependencytrack.event.kafka.processor.exception.RetryableProcessingException; +import org.postgresql.util.PSQLState; + +import javax.jdo.JDOOptimisticVerificationException; +import java.net.SocketTimeoutException; +import java.sql.SQLException; +import java.sql.SQLTransientConnectionException; +import java.sql.SQLTransientException; +import java.util.List; +import java.util.concurrent.TimeoutException; + +/** + * An abstract {@link ProcessingStrategy} that provides various shared functionality. 
+ * + * @param Type of the {@link ConsumerRecord} key + * @param Type of the {@link ConsumerRecord} value + */ +abstract class AbstractProcessingStrategy implements ProcessingStrategy { + + private final Serde keySerde; + private final Serde valueSerde; + + AbstractProcessingStrategy(final Serde keySerde, final Serde valueSerde) { + this.keySerde = keySerde; + this.valueSerde = valueSerde; + } + + /** + * @param record The {@link ConsumerRecord} to deserialize key and value of + * @return A {@link ConsumerRecord} with deserialized key and value + * @throws SerializationException When deserializing the {@link ConsumerRecord} failed + */ + ConsumerRecord deserialize(final ConsumerRecord record) { + final K deserializedKey; + final V deserializedValue; + try { + deserializedKey = keySerde.deserializer().deserialize(record.topic(), record.key()); + deserializedValue = valueSerde.deserializer().deserialize(record.topic(), record.value()); + } catch (RuntimeException e) { + if (e instanceof SerializationException) { + throw e; + } + + throw new SerializationException(e); + } + + return new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), + record.timestamp(), record.timestampType(), record.serializedKeySize(), record.serializedValueSize(), + deserializedKey, deserializedValue, record.headers(), record.leaderEpoch()); + } + + private static final List> KNOWN_TRANSIENT_EXCEPTIONS = List.of( + ConnectTimeoutException.class, + ConnectionInUseException.class, + JDOOptimisticVerificationException.class, + QueryInterruptedException.class, + SocketTimeoutException.class, + SQLTransientException.class, + SQLTransientConnectionException.class, + TimeoutException.class + ); + + boolean isRetryableException(final Throwable throwable) { + if (throwable instanceof RetryableProcessingException) { + return true; + } + + final boolean isKnownTransientException = ExceptionUtils.getThrowableList(throwable).stream() + .anyMatch(cause -> KNOWN_TRANSIENT_EXCEPTIONS.contains(cause.getClass())); + if (isKnownTransientException) { + return true; + } + + return ExceptionUtils.getRootCause(throwable) instanceof final SQLException se + && PSQLState.isConnectionError(se.getSQLState()); + } + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/api/BatchProcessingStrategy.java b/src/main/java/org/dependencytrack/event/kafka/processor/api/BatchProcessingStrategy.java new file mode 100644 index 000000000..a292faa6e --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/api/BatchProcessingStrategy.java @@ -0,0 +1,69 @@ +package org.dependencytrack.event.kafka.processor.api; + +import alpine.common.logging.Logger; +import io.confluent.parallelconsumer.PCRetriableException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serde; +import org.dependencytrack.common.MdcKeys; +import org.dependencytrack.event.kafka.processor.exception.ProcessingException; +import org.slf4j.MDC; + +import java.util.ArrayList; +import java.util.List; + +/** + * A {@link ProcessingStrategy} that processes records in batches. 
+ * + * @param Type of the {@link ConsumerRecord} key + * @param Type of the {@link ConsumerRecord} value + */ +class BatchProcessingStrategy extends AbstractProcessingStrategy { + + private static final Logger LOGGER = Logger.getLogger(BatchProcessingStrategy.class); + + private final BatchProcessor batchProcessor; + + BatchProcessingStrategy(final BatchProcessor batchProcessor, + final Serde keySerde, final Serde valueSerde) { + super(keySerde, valueSerde); + this.batchProcessor = batchProcessor; + } + + /** + * {@inheritDoc} + */ + @Override + public void processRecords(final List> records) { + final var deserializedRecords = new ArrayList>(records.size()); + for (final ConsumerRecord record : records) { + try (var ignoredMdcKafkaRecordTopic = MDC.putCloseable(MdcKeys.MDC_KAFKA_RECORD_TOPIC, record.topic()); + var ignoredMdcKafkaRecordPartition = MDC.putCloseable(MdcKeys.MDC_KAFKA_RECORD_PARTITION, String.valueOf(record.partition())); + var ignoredMdcKafkaRecordOffset = MDC.putCloseable(MdcKeys.MDC_KAFKA_RECORD_OFFSET, String.valueOf(record.offset()))) { + deserializedRecords.add(deserialize(record)); + } catch (SerializationException e) { + // TODO: Consider supporting error handlers, e.g. to send record to DLT. + LOGGER.error("Failed to deserialize record; Skipping", e); + } + } + + if (deserializedRecords.isEmpty()) { + LOGGER.warn("All of the %d records in this batch failed to be deserialized".formatted(records.size())); + return; + } + + try { + batchProcessor.process(deserializedRecords); + } catch (ProcessingException | RuntimeException e) { + if (isRetryableException(e)) { + LOGGER.warn("Encountered retryable exception while processing %d records".formatted(deserializedRecords.size()), e); + throw new PCRetriableException(e); + } + + LOGGER.error("Encountered non-retryable exception while processing %d records; Skipping".formatted(deserializedRecords.size()), e); + // TODO: Consider supporting error handlers, e.g. to send records to DLT. + // Skip records to avoid poison-pill scenario. + } + } + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/api/BatchProcessor.java b/src/main/java/org/dependencytrack/event/kafka/processor/api/BatchProcessor.java new file mode 100644 index 000000000..5e1b6ff03 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/api/BatchProcessor.java @@ -0,0 +1,26 @@ +package org.dependencytrack.event.kafka.processor.api; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.dependencytrack.event.kafka.processor.exception.ProcessingException; + +import java.util.List; + +/** + * A processor of {@link ConsumerRecord} batches. + * + * @param Type of the {@link ConsumerRecord} key + * @param Type of the {@link ConsumerRecord} value + */ +public interface BatchProcessor { + + /** + * Process a batch of {@link ConsumerRecord}s. + *
+ * <p>
+ * This method may be called by multiple threads concurrently and thus MUST be thread safe! + * + * @param records Batch of {@link ConsumerRecord}s to process + * @throws ProcessingException When consuming the batch of {@link ConsumerRecord}s failed + */ + void process(final List> records) throws ProcessingException; + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessingStrategy.java b/src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessingStrategy.java new file mode 100644 index 000000000..9809fbff6 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessingStrategy.java @@ -0,0 +1,16 @@ +package org.dependencytrack.event.kafka.processor.api; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.List; + +interface ProcessingStrategy { + + /** + * Process zero or more {@link ConsumerRecord}s. + * + * @param records The {@link ConsumerRecord}s to process + */ + void processRecords(final List> records); + +} \ No newline at end of file diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/api/Processor.java b/src/main/java/org/dependencytrack/event/kafka/processor/api/Processor.java new file mode 100644 index 000000000..e905a7937 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/api/Processor.java @@ -0,0 +1,24 @@ +package org.dependencytrack.event.kafka.processor.api; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.dependencytrack.event.kafka.processor.exception.ProcessingException; + +/** + * A processor of individual {@link ConsumerRecord}s. + * + * @param Type of the {@link ConsumerRecord} key + * @param Type of the {@link ConsumerRecord} value + */ +public interface Processor { + + /** + * Process a {@link ConsumerRecord}. + *
+ * <p>
+ * This method may be called by multiple threads concurrently and thus MUST be thread safe! + * + * @param record The {@link ConsumerRecord} to process + * @throws ProcessingException When processing the {@link ConsumerRecord} failed + */ + void process(final ConsumerRecord record) throws ProcessingException; + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessorManager.java b/src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessorManager.java new file mode 100644 index 000000000..bf408ac4c --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessorManager.java @@ -0,0 +1,374 @@ +package org.dependencytrack.event.kafka.processor.api; + +import alpine.Config; +import alpine.common.logging.Logger; +import alpine.common.metrics.Metrics; +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; +import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; +import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.github.resilience4j.core.IntervalFunction; +import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.dependencytrack.common.ConfigKey; +import org.dependencytrack.event.kafka.KafkaTopics.Topic; +import org.eclipse.microprofile.health.HealthCheckResponse; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; +import static org.dependencytrack.common.ConfigKey.KAFKA_BOOTSTRAP_SERVERS; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_MAX_BATCH_SIZE; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_MAX_BATCH_SIZE_DEFAULT; +import static 
org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_MAX_CONCURRENCY; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_MAX_CONCURRENCY_DEFAULT; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_PROCESSING_ORDER; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_PROCESSING_ORDER_DEFAULT; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_INITIAL_DELAY_MS; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_INITIAL_DELAY_MS_DEFAULT; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_MAX_DELAY_MS; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_MAX_DELAY_MS_DEFAULT; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_MULTIPLIER; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_MULTIPLIER_DEFAULT; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_RANDOMIZATION_FACTOR; +import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_RANDOMIZATION_FACTOR_DEFAULT; + +public class ProcessorManager implements AutoCloseable { + + private static final Logger LOGGER = Logger.getLogger(ProcessorManager.class); + private static final Pattern PROCESSOR_NAME_PATTERN = Pattern.compile("^[a-z.]+$"); + + private final Map managedProcessors = new LinkedHashMap<>(); + private final UUID instanceId; + private final Config config; + + public ProcessorManager() { + this(UUID.randomUUID(), Config.getInstance()); + } + + public ProcessorManager(final UUID instanceId, final Config config) { + this.instanceId = instanceId; + this.config = config; + } + + /** + * Register a new {@link Processor}. + * + * @param name Name of the processor to register + * @param processor The processor to register + * @param topic The topic to have the processor subscribe to + * @param Type of record keys in the topic + * @param Type of record values in the topic + */ + public void registerProcessor(final String name, final Topic topic, final Processor processor) { + requireValidProcessorName(name); + final var processingStrategy = new SingleRecordProcessingStrategy<>(processor, topic.keySerde(), topic.valueSerde()); + final ParallelStreamProcessor parallelConsumer = createParallelConsumer(name, false); + managedProcessors.put(name, new ManagedProcessor(parallelConsumer, processingStrategy, topic.name())); + } + + /** + * Register a new {@link BatchProcessor}. 
+ * + * @param name Name of the processor to register + * @param processor The processor to register + * @param topic The topic to have the processor subscribe to + * @param Type of record keys in the topic + * @param Type of record values in the topic + */ + public void registerBatchProcessor(final String name, final Topic topic, final BatchProcessor processor) { + requireValidProcessorName(name); + final var processingStrategy = new BatchProcessingStrategy<>(processor, topic.keySerde(), topic.valueSerde()); + final ParallelStreamProcessor parallelConsumer = createParallelConsumer(name, true); + managedProcessors.put(name, new ManagedProcessor(parallelConsumer, processingStrategy, topic.name())); + } + + @SuppressWarnings("resource") + public void startAll() { + try (final AdminClient adminClient = createAdminClient()) { + ensureTopicsExist(adminClient); + } + + for (final Map.Entry entry : managedProcessors.entrySet()) { + final String processorName = entry.getKey(); + final ManagedProcessor managedProcessor = entry.getValue(); + + LOGGER.info("Starting processor %s to consume from topic %s".formatted(processorName, managedProcessor.topic())); + managedProcessor.parallelConsumer().subscribe(List.of(managedProcessor.topic())); + managedProcessor.parallelConsumer().poll(pollCtx -> { + // NB: Unless batching is enabled, the below list only ever contains a single record. + final List> polledRecords = pollCtx.getConsumerRecordsFlattened(); + managedProcessor.processingStrategy().processRecords(polledRecords); + }); + } + } + + public HealthCheckResponse probeHealth() { + final var responseBuilder = HealthCheckResponse.named("kafka-processors"); + + boolean isUp = true; + for (final Map.Entry entry : managedProcessors.entrySet()) { + final String processorName = entry.getKey(); + final ParallelStreamProcessor parallelConsumer = entry.getValue().parallelConsumer(); + final boolean isProcessorUp = !parallelConsumer.isClosedOrFailed(); + + responseBuilder.withData(processorName, isProcessorUp + ? 
HealthCheckResponse.Status.UP.name() + : HealthCheckResponse.Status.DOWN.name()); + if (isProcessorUp + && parallelConsumer instanceof final ParallelEoSStreamProcessor concreteParallelConsumer + && concreteParallelConsumer.getFailureCause() != null) { + responseBuilder.withData("%s_failure_reason".formatted(processorName), + concreteParallelConsumer.getFailureCause().getMessage()); + } + + isUp &= isProcessorUp; + } + + return responseBuilder.status(isUp).build(); + } + + @Override + @SuppressWarnings("resource") + public void close() { + for (final Map.Entry entry : managedProcessors.entrySet()) { + final String processorName = entry.getKey(); + final ManagedProcessor managedProcessor = entry.getValue(); + + LOGGER.info("Stopping processor %s".formatted(processorName)); + managedProcessor.parallelConsumer().closeDontDrainFirst(); + } + } + + private void ensureTopicsExist(final AdminClient adminClient) { + final List topicNames = managedProcessors.values().stream().map(ManagedProcessor::topic).toList(); + final DescribeTopicsResult topicsResult = adminClient.describeTopics(topicNames); + + final var exceptionsByTopicName = new HashMap(); + for (final Map.Entry> entry : topicsResult.topicNameValues().entrySet()) { + final String topicName = entry.getKey(); + try { + entry.getValue().get(); + } catch (ExecutionException e) { + exceptionsByTopicName.put(topicName, e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(""" + Thread was interrupted while waiting for broker response. \ + The existence of topic %s can not be determined.""".formatted(topicName), e); + } + } + + if (!exceptionsByTopicName.isEmpty()) { + final String exceptionSummary = exceptionsByTopicName.entrySet().stream() + .map(entry -> "{topic=%s, error=%s}".formatted(entry.getKey(), entry.getValue())) + .collect(Collectors.joining(", ", "[", "]")); + throw new IllegalStateException("Existence of %d topic(s) could not be verified: %s" + .formatted(exceptionsByTopicName.size(), exceptionSummary)); + } + } + + private ParallelStreamProcessor createParallelConsumer(final String processorName, final boolean isBatch) { + final var optionsBuilder = ParallelConsumerOptions.builder() + .consumer(createConsumer(processorName)); + + final Map properties = getPassThroughProperties(processorName.toLowerCase()); + + final ProcessingOrder processingOrder = Optional.ofNullable(properties.get(PROPERTY_PROCESSING_ORDER)) + .map(String::toUpperCase) + .map(ProcessingOrder::valueOf) + .orElse(PROPERTY_PROCESSING_ORDER_DEFAULT); + optionsBuilder.ordering(processingOrder); + + final int maxConcurrency = Optional.ofNullable(properties.get(PROPERTY_MAX_CONCURRENCY)) + .map(Integer::parseInt) + .orElse(PROPERTY_MAX_CONCURRENCY_DEFAULT); + optionsBuilder.maxConcurrency(maxConcurrency); + + final Optional optionalMaxBatchSizeProperty = Optional.ofNullable(properties.get(PROPERTY_MAX_BATCH_SIZE)); + if (isBatch) { + if (processingOrder == ProcessingOrder.PARTITION) { + LOGGER.warn(""" + Processor %s is configured to use batching with processing order %s; \ + Batch sizes are limited by the number of partitions in the topic, \ + and may thus not yield the desired effect \ + (https://github.com/confluentinc/parallel-consumer/issues/551)\ + """.formatted(processorName, processingOrder)); + } + + final int maxBatchSize = optionalMaxBatchSizeProperty + .map(Integer::parseInt) + .orElse(PROPERTY_MAX_BATCH_SIZE_DEFAULT); + optionsBuilder.batchSize(maxBatchSize); + } else if 
(optionalMaxBatchSizeProperty.isPresent()) { + LOGGER.warn("Processor %s is configured with %s, but it is not a batch processor; Ignoring property" + .formatted(processorName, PROPERTY_MAX_BATCH_SIZE)); + } + + final IntervalFunction retryIntervalFunction = getRetryIntervalFunction(properties); + optionsBuilder.retryDelayProvider(recordCtx -> { + final long delayMillis = retryIntervalFunction.apply(recordCtx.getNumberOfFailedAttempts()); + return Duration.ofMillis(delayMillis); + }); + + if (Config.getInstance().getPropertyAsBoolean(Config.AlpineKey.METRICS_ENABLED)) { + optionsBuilder + .meterRegistry(Metrics.getRegistry()) + .pcInstanceTag(processorName); + } + + final ParallelConsumerOptions options = optionsBuilder.build(); + LOGGER.debug("Creating parallel consumer for processor %s with options %s".formatted(processorName, options)); + return ParallelStreamProcessor.createEosStreamProcessor(options); + } + + private Consumer createConsumer(final String processorName) { + final var consumerConfig = new HashMap(); + consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, config.getProperty(KAFKA_BOOTSTRAP_SERVERS)); + consumerConfig.put(CLIENT_ID_CONFIG, "%s-%s-consumer".formatted(instanceId, processorName)); + consumerConfig.put(GROUP_ID_CONFIG, processorName); + consumerConfig.putAll(getGlobalTlsConfig()); + + final String propertyPrefix = "%s.consumer".formatted(processorName.toLowerCase()); + final Map properties = getPassThroughProperties(propertyPrefix); + for (final Map.Entry property : properties.entrySet()) { + if (!ConsumerConfig.configNames().contains(property.getKey())) { + LOGGER.warn("Consumer property %s was set for processor %s, but is unknown; Ignoring" + .formatted(property.getKey(), processorName)); + continue; + } + + consumerConfig.put(property.getKey(), property.getValue()); + } + + // Properties that MUST NOT be overwritten under any circumstance have to be applied + // AFTER pass-through properties. 
+ consumerConfig.put(KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + consumerConfig.put(ENABLE_AUTO_COMMIT_CONFIG, false); // Commits are managed by parallel consumer + + LOGGER.debug("Creating consumer for processor %s with options %s".formatted(processorName, consumerConfig)); + final var consumer = new KafkaConsumer(consumerConfig); + + if (config.getPropertyAsBoolean(Config.AlpineKey.METRICS_ENABLED)) { + new KafkaClientMetrics(consumer).bindTo(Metrics.getRegistry()); + } + + return consumer; + } + + private AdminClient createAdminClient() { + final var adminClientConfig = new HashMap(); + adminClientConfig.put(BOOTSTRAP_SERVERS_CONFIG, config.getProperty(KAFKA_BOOTSTRAP_SERVERS)); + adminClientConfig.put(CLIENT_ID_CONFIG, "%s-admin-client".formatted(instanceId)); + adminClientConfig.putAll(getGlobalTlsConfig()); + + LOGGER.debug("Creating admin client with options %s".formatted(adminClientConfig)); + return AdminClient.create(adminClientConfig); + } + + private Map getGlobalTlsConfig() { + if (!config.getPropertyAsBoolean(ConfigKey.KAFKA_TLS_ENABLED)) { + return Collections.emptyMap(); + } + + final var tlsConfig = new HashMap(); + tlsConfig.put(SECURITY_PROTOCOL_CONFIG, config.getProperty(ConfigKey.KAFKA_TLS_PROTOCOL)); + tlsConfig.put(SSL_TRUSTSTORE_LOCATION_CONFIG, config.getProperty(ConfigKey.TRUST_STORE_PATH)); + tlsConfig.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, config.getProperty(ConfigKey.TRUST_STORE_PASSWORD)); + + if (config.getPropertyAsBoolean(ConfigKey.KAFKA_MTLS_ENABLED)) { + tlsConfig.put(SSL_KEYSTORE_LOCATION_CONFIG, config.getProperty(ConfigKey.KEY_STORE_PATH)); + tlsConfig.put(SSL_KEYSTORE_PASSWORD_CONFIG, config.getProperty(ConfigKey.KEY_STORE_PASSWORD)); + } + + return tlsConfig; + } + + private Map getPassThroughProperties(final String prefix) { + final String fullPrefix = "kafka.processor.%s".formatted(prefix); + final Pattern fullPrefixPattern = Pattern.compile(Pattern.quote("%s.".formatted(fullPrefix))); + + final Map properties = config.getPassThroughProperties(fullPrefix); + if (properties.isEmpty()) { + return properties; + } + + final var trimmedProperties = new HashMap(properties.size()); + for (final Map.Entry property : properties.entrySet()) { + final String trimmedKey = fullPrefixPattern.matcher(property.getKey()).replaceFirst(""); + trimmedProperties.put(trimmedKey, property.getValue()); + } + + return trimmedProperties; + } + + /** + * Validate a given {@link Processor} name. + *
+ * <p>
+ * Due to how Alpine's {@link Config} is resolved, {@link Processor} names must have a specific + * format in order to be able to resolve properties for them. + * + * @param name The {@link Processor} name to validate. + */ + private static void requireValidProcessorName(final String name) { + if (name == null) { + throw new IllegalArgumentException("name must not be null"); + } + if (!PROCESSOR_NAME_PATTERN.matcher(name).matches()) { + throw new IllegalArgumentException("name is invalid; names must match the regular expression %s" + .formatted(PROCESSOR_NAME_PATTERN.pattern())); + } + } + + private static IntervalFunction getRetryIntervalFunction(final Map properties) { + final long initialDelayMs = Optional.ofNullable(properties.get(PROPERTY_RETRY_INITIAL_DELAY_MS)) + .map(Long::parseLong) + .orElse(PROPERTY_RETRY_INITIAL_DELAY_MS_DEFAULT); + final long maxDelayMs = Optional.ofNullable(properties.get(PROPERTY_RETRY_MAX_DELAY_MS)) + .map(Long::parseLong) + .orElse(PROPERTY_RETRY_MAX_DELAY_MS_DEFAULT); + final int multiplier = Optional.ofNullable(properties.get(PROPERTY_RETRY_MULTIPLIER)) + .map(Integer::parseInt) + .orElse(PROPERTY_RETRY_MULTIPLIER_DEFAULT); + final double randomizationFactor = Optional.ofNullable(properties.get(PROPERTY_RETRY_RANDOMIZATION_FACTOR)) + .map(Double::parseDouble) + .orElse(PROPERTY_RETRY_RANDOMIZATION_FACTOR_DEFAULT); + + return IntervalFunction.ofExponentialRandomBackoff(Duration.ofMillis(initialDelayMs), + multiplier, randomizationFactor, Duration.ofMillis(maxDelayMs)); + } + + private record ManagedProcessor(ParallelStreamProcessor parallelConsumer, + ProcessingStrategy processingStrategy, String topic) { + } + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessorProperties.java b/src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessorProperties.java new file mode 100644 index 000000000..e7c1612c4 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessorProperties.java @@ -0,0 +1,25 @@ +package org.dependencytrack.event.kafka.processor.api; + +import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; + +final class ProcessorProperties { + + static final String PROPERTY_MAX_BATCH_SIZE = "max.batch.size"; + static final int PROPERTY_MAX_BATCH_SIZE_DEFAULT = 10; + static final String PROPERTY_MAX_CONCURRENCY = "max.concurrency"; + static final int PROPERTY_MAX_CONCURRENCY_DEFAULT = 1; + static final String PROPERTY_PROCESSING_ORDER = "processing.order"; + static final ProcessingOrder PROPERTY_PROCESSING_ORDER_DEFAULT = ProcessingOrder.PARTITION; + static final String PROPERTY_RETRY_INITIAL_DELAY_MS = "retry.initial.delay.ms"; + static final long PROPERTY_RETRY_INITIAL_DELAY_MS_DEFAULT = 1000; // 1s + static final String PROPERTY_RETRY_MULTIPLIER = "retry.multiplier"; + static final int PROPERTY_RETRY_MULTIPLIER_DEFAULT = 1; + static final String PROPERTY_RETRY_RANDOMIZATION_FACTOR = "retry.randomization.factor"; + static final double PROPERTY_RETRY_RANDOMIZATION_FACTOR_DEFAULT = 0.3; + static final String PROPERTY_RETRY_MAX_DELAY_MS = "retry.max.delay.ms"; + static final long PROPERTY_RETRY_MAX_DELAY_MS_DEFAULT = 60 * 1000; // 60s + + private ProcessorProperties() { + } + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/api/SingleRecordProcessingStrategy.java b/src/main/java/org/dependencytrack/event/kafka/processor/api/SingleRecordProcessingStrategy.java new file mode 100644 index 000000000..d085b4404 --- /dev/null +++ 
b/src/main/java/org/dependencytrack/event/kafka/processor/api/SingleRecordProcessingStrategy.java @@ -0,0 +1,81 @@ +package org.dependencytrack.event.kafka.processor.api; + +import alpine.common.logging.Logger; +import io.confluent.parallelconsumer.PCRetriableException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serde; +import org.dependencytrack.event.kafka.processor.exception.ProcessingException; +import org.slf4j.MDC; + +import java.util.List; + +import static org.dependencytrack.common.MdcKeys.MDC_KAFKA_RECORD_KEY; +import static org.dependencytrack.common.MdcKeys.MDC_KAFKA_RECORD_OFFSET; +import static org.dependencytrack.common.MdcKeys.MDC_KAFKA_RECORD_PARTITION; +import static org.dependencytrack.common.MdcKeys.MDC_KAFKA_RECORD_TOPIC; + +/** + * A {@link ProcessingStrategy} that processes records individually. + * + * @param Type of the {@link ConsumerRecord} key + * @param Type of the {@link ConsumerRecord} value + */ +class SingleRecordProcessingStrategy extends AbstractProcessingStrategy { + + private static final Logger LOGGER = Logger.getLogger(SingleRecordProcessingStrategy.class); + + private final Processor processor; + + SingleRecordProcessingStrategy(final Processor processor, + final Serde keySerde, final Serde valueSerde) { + super(keySerde, valueSerde); + this.processor = processor; + } + + /** + * {@inheritDoc} + */ + @Override + public void processRecords(final List> records) { + if (records.isEmpty()) { + return; + } + if (records.size() > 1) { + throw new IllegalArgumentException("Expected at most one record, but received %d".formatted(records.size())); + } + + final ConsumerRecord record = records.get(0); + + try (var ignoredMdcKafkaRecordTopic = MDC.putCloseable(MDC_KAFKA_RECORD_TOPIC, record.topic()); + var ignoredMdcKafkaRecordPartition = MDC.putCloseable(MDC_KAFKA_RECORD_PARTITION, String.valueOf(record.partition())); + var ignoredMdcKafkaRecordOffset = MDC.putCloseable(MDC_KAFKA_RECORD_OFFSET, String.valueOf(record.offset()))) { + processRecord(record); + } + } + + private void processRecord(final ConsumerRecord record) { + final ConsumerRecord deserializedRecord; + try { + deserializedRecord = deserialize(record); + } catch (SerializationException e) { + LOGGER.error("Failed to deserialize consumer record %s; Skipping", e); + // TODO: Consider supporting error handlers, e.g. to send record to DLT. + return; // Skip record to avoid poison-pill scenario. + } + + try (var ignoredMdcKafkaRecordKey = MDC.putCloseable(MDC_KAFKA_RECORD_KEY, String.valueOf(deserializedRecord.key()))) { + processor.process(deserializedRecord); + } catch (ProcessingException | RuntimeException e) { + if (isRetryableException(e)) { + LOGGER.warn("Encountered retryable exception while processing record", e); + throw new PCRetriableException(e); + } + + LOGGER.error("Encountered non-retryable exception while processing record; Skipping", e); + // TODO: Consider supporting error handlers, e.g. to send record to DLT. + // Skip record to avoid poison-pill scenario. 
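Purely to illustrate the error-handler TODO above (nothing here is implemented by this patch; dltProducer and the ".dlt" topic suffix are hypothetical placeholders):

    // A future error handler could, for example, forward the still-serialized record
    // to a dead letter topic via a plain KafkaProducer<byte[], byte[]>:
    //
    //   dltProducer.send(new ProducerRecord<>(record.topic() + ".dlt",
    //           record.key(), record.value()));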
+ } + } + +} \ No newline at end of file diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/exception/ProcessingException.java b/src/main/java/org/dependencytrack/event/kafka/processor/exception/ProcessingException.java new file mode 100644 index 000000000..25afeb469 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/exception/ProcessingException.java @@ -0,0 +1,29 @@ +package org.dependencytrack.event.kafka.processor.exception; + +/** + * An {@link Exception} indicating an error during record processing. + */ +public class ProcessingException extends Exception { + + /** + * {@inheritDoc} + */ + public ProcessingException(final String message) { + super(message); + } + + /** + * {@inheritDoc} + */ + public ProcessingException(final String message, final Throwable cause) { + super(message, cause); + } + + /** + * {@inheritDoc} + */ + public ProcessingException(final Throwable cause) { + super(cause); + } + +} \ No newline at end of file diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/exception/RetryableProcessingException.java b/src/main/java/org/dependencytrack/event/kafka/processor/exception/RetryableProcessingException.java new file mode 100644 index 000000000..1dd51fb75 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/exception/RetryableProcessingException.java @@ -0,0 +1,29 @@ +package org.dependencytrack.event.kafka.processor.exception; + +/** + * A {@link ProcessingException} indicating a retryable error. + */ +public class RetryableProcessingException extends ProcessingException { + + /** + * {@inheritDoc} + */ + public RetryableProcessingException(final String message) { + super(message); + } + + /** + * {@inheritDoc} + */ + public RetryableProcessingException(final String message, final Throwable cause) { + super(message, cause); + } + + /** + * {@inheritDoc} + */ + public RetryableProcessingException(final Throwable cause) { + super(cause); + } + +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 02213600c..ac06834f6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -434,6 +434,16 @@ kafka.streams.production.exception.threshold.interval=PT30M kafka.streams.transient.processing.exception.threshold.count=50 kafka.streams.transient.processing.exception.threshold.interval=PT30M +# Optional +# alpine.kafka.processor..processing.order=partition +# alpine.kafka.processor..max.batch.size=10 +# alpine.kafka.processor..max.concurrency=1 +# alpine.kafka.processor..retry.initial.delay.ms=1000 +# alpine.kafka.processor..retry.multiplier=1 +# alpine.kafka.processor..retry.randomization.factor=0.3 +# alpine.kafka.processor..retry.max.delay.ms=60000 +# alpine.kafka.processor..consumer.= + # Scheduling tasks after 3 minutes (3*60*1000) of starting application task.scheduler.initial.delay=180000 diff --git a/src/main/webapp/WEB-INF/web.xml b/src/main/webapp/WEB-INF/web.xml index f7f1c5d63..374340174 100644 --- a/src/main/webapp/WEB-INF/web.xml +++ b/src/main/webapp/WEB-INF/web.xml @@ -47,6 +47,9 @@ org.dependencytrack.event.EventSubsystemInitializer + + org.dependencytrack.event.kafka.processor.ProcessorInitializer + org.dependencytrack.event.kafka.streams.KafkaStreamsInitializer diff --git a/src/test/java/org/dependencytrack/event/kafka/processor/AbstractProcessorTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/AbstractProcessorTest.java new file mode 100644 index 
000000000..f344651e4 --- /dev/null +++ b/src/test/java/org/dependencytrack/event/kafka/processor/AbstractProcessorTest.java @@ -0,0 +1,59 @@ +package org.dependencytrack.event.kafka.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.dependencytrack.AbstractPostgresEnabledTest; + +import java.time.Instant; +import java.util.Optional; + +import static java.util.Objects.requireNonNullElseGet; + +abstract class AbstractProcessorTest extends AbstractPostgresEnabledTest { + + static ConsumerRecordBuilder aConsumerRecord(final K key, final V value) { + return new ConsumerRecordBuilder<>(key, value); + } + + static final class ConsumerRecordBuilder { + + private final K key; + private final V value; + private Instant timestamp; + private Headers headers; + + private ConsumerRecordBuilder(final K key, final V value) { + this.key = key; + this.value = value; + } + + ConsumerRecordBuilder withTimestamp(final Instant timestamp) { + this.timestamp = timestamp; + return this; + } + + ConsumerRecordBuilder withHeaders(final Headers headers) { + this.headers = headers; + return this; + } + + ConsumerRecord build() { + final Instant timestamp = requireNonNullElseGet(this.timestamp, Instant::now); + final Headers headers = requireNonNullElseGet(this.headers, RecordHeaders::new); + return new ConsumerRecord<>( + "topicName", + /* partition */ 0, + /* offset */ 1, + timestamp.toEpochMilli(), TimestampType.CREATE_TIME, + /* serializedKeySize */ -1, + /* serializedValueSize */ -1, + this.key, this.value, + headers, + /* leaderEpoch */ Optional.empty()); + } + + } + +} \ No newline at end of file diff --git a/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java new file mode 100644 index 000000000..e98a420f8 --- /dev/null +++ b/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java @@ -0,0 +1,256 @@ +package org.dependencytrack.event.kafka.processor.api; + +import alpine.Config; +import net.mguenther.kafka.junit.ExternalKafkaCluster; +import net.mguenther.kafka.junit.KeyValue; +import net.mguenther.kafka.junit.SendKeyValues; +import net.mguenther.kafka.junit.TopicConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.dependencytrack.common.ConfigKey; +import org.dependencytrack.event.kafka.KafkaTopics.Topic; +import org.eclipse.microprofile.health.HealthCheckResponse; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.testcontainers.redpanda.RedpandaContainer; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; 
+import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class ProcessorManagerTest { + + @Rule + public RedpandaContainer kafkaContainer = new RedpandaContainer(DockerImageName + .parse("docker.redpanda.com/vectorized/redpanda:v23.3.3")); + + private ExternalKafkaCluster kafka; + private Config configMock; + + @Before + public void setUp() { + kafka = ExternalKafkaCluster.at(kafkaContainer.getBootstrapServers()); + + configMock = mock(Config.class); + when(configMock.getProperty(eq(ConfigKey.KAFKA_BOOTSTRAP_SERVERS))) + .thenReturn(kafkaContainer.getBootstrapServers()); + } + + @Test + public void testSingleRecordProcessor() throws Exception { + final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String()); + kafka.createTopic(TopicConfig.withName(inputTopic.name()).withNumberOfPartitions(3)); + + final var recordsProcessed = new AtomicInteger(0); + + when(configMock.getPassThroughProperties(eq("kafka.processor.foo.consumer"))) + .thenReturn(Map.of( + "kafka.processor.foo.processing.order", "key", + "kafka.processor.foo.max.concurrency", "5", + "kafka.processor.foo.consumer.auto.offset.reset", "earliest" + )); + + final Processor processor = + record -> recordsProcessed.incrementAndGet(); + + try (final var processorManager = new ProcessorManager(UUID.randomUUID(), configMock)) { + processorManager.registerProcessor("foo", inputTopic, processor); + + for (int i = 0; i < 100; i++) { + kafka.send(SendKeyValues.to("input", List.of(new KeyValue<>("foo" + i, "bar" + i))) + .with(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) + .with(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); + } + + processorManager.startAll(); + + await("Record Processing") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(recordsProcessed).hasValue(100)); + } + } + + @Test + public void testSingleRecordProcessorRetry() throws Exception { + final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String()); + kafka.createTopic(TopicConfig.withName(inputTopic.name()).withNumberOfPartitions(3)); + + final var attemptsCounter = new AtomicInteger(0); + + final var objectSpy = spy(new Object()); + when(objectSpy.toString()) + .thenThrow(new RuntimeException(new TimeoutException())) + .thenThrow(new RuntimeException(new TimeoutException())) + .thenThrow(new RuntimeException(new TimeoutException())) + .thenReturn("done"); + + final Processor processor = record -> { + attemptsCounter.incrementAndGet(); + var ignored = objectSpy.toString(); + }; + + when(configMock.getPassThroughProperties(eq("kafka.processor.foo"))) + .thenReturn(Map.of( + "kafka.processor.foo.retry.initial.delay.ms", "5", + "kafka.processor.foo.retry.multiplier", "1", + "kafka.processor.foo.retry.max.delay.ms", "10" + )); + when(configMock.getPassThroughProperties(eq("kafka.processor.foo.consumer"))) + .thenReturn(Map.of( + "kafka.processor.foo.consumer.auto.offset.reset", "earliest" + )); + + try (final var processorManager = new ProcessorManager(UUID.randomUUID(), configMock)) { + processorManager.registerProcessor("foo", inputTopic, processor); + + kafka.send(SendKeyValues.to("input", List.of(new KeyValue<>("foo", "bar"))) + .with(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) + .with(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); + + processorManager.startAll(); + + 
await("Record Processing") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(attemptsCounter).hasValue(4)); + } + } + + @Test + public void testBatchProcessor() throws Exception { + final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String()); + kafka.createTopic(TopicConfig.withName(inputTopic.name()).withNumberOfPartitions(3)); + + final var recordsProcessed = new AtomicInteger(0); + final var actualBatchSizes = new ConcurrentLinkedQueue<>(); + + when(configMock.getPassThroughProperties(eq("kafka.processor.foo"))) + .thenReturn(Map.of( + "kafka.processor.foo.processing.order", "key", + "kafka.processor.foo.max.batch.size", "100" + )); + when(configMock.getPassThroughProperties(eq("kafka.processor.foo.consumer"))) + .thenReturn(Map.of( + "kafka.processor.foo.consumer.auto.offset.reset", "earliest" + )); + + final BatchProcessor recordProcessor = records -> { + recordsProcessed.addAndGet(records.size()); + actualBatchSizes.add(records.size()); + }; + + try (final var processorManager = new ProcessorManager(UUID.randomUUID(), configMock)) { + processorManager.registerBatchProcessor("foo", inputTopic, recordProcessor); + + for (int i = 0; i < 1_000; i++) { + kafka.send(SendKeyValues.to("input", List.of(new KeyValue<>("foo" + i, "bar" + i))) + .with(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) + .with(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); + } + + processorManager.startAll(); + + await("Record Processing") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(recordsProcessed).hasValue(1_000)); + + assertThat(actualBatchSizes).containsOnly(100); + } + } + + @Test + public void testStartAllWithMissingTopics() { + final var inputTopicA = new Topic<>("input-a", Serdes.String(), Serdes.String()); + final var inputTopicB = new Topic<>("input-b", Serdes.String(), Serdes.String()); + + kafka.createTopic(TopicConfig.withName(inputTopicA.name()).withNumberOfPartitions(3)); + + final Processor processor = record -> { + }; + + try (final var processorManager = new ProcessorManager(UUID.randomUUID(), configMock)) { + processorManager.registerProcessor("a", inputTopicA, processor); + processorManager.registerProcessor("b", inputTopicB, processor); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(processorManager::startAll) + .withMessage(""" + Existence of 1 topic(s) could not be verified: \ + [{topic=input-b, error=org.apache.kafka.common.errors.UnknownTopicOrPartitionException: \ + This server does not host this topic-partition.}]"""); + } + } + + @Test + public void testProbeHealth() { + final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String()); + kafka.createTopic(TopicConfig.withName(inputTopic.name()).withNumberOfPartitions(3)); + + final Processor processor = record -> { + }; + + try (final var processorManager = new ProcessorManager(UUID.randomUUID(), configMock)) { + processorManager.registerProcessor("foo", inputTopic, processor); + + { + final HealthCheckResponse healthCheckResponse = processorManager.probeHealth(); + assertThat(healthCheckResponse.getName()).isEqualTo("kafka-processors"); + assertThat(healthCheckResponse.getStatus()).isEqualTo(HealthCheckResponse.Status.UP); + assertThat(healthCheckResponse.getData()).isPresent(); + assertThatJson(healthCheckResponse.getData().get()) + .isEqualTo(""" + { + "foo": "UP" + } + """); + } + + processorManager.startAll(); + + { + final HealthCheckResponse healthCheckResponse = processorManager.probeHealth(); + 
assertThat(healthCheckResponse.getName()).isEqualTo("kafka-processors"); + assertThat(healthCheckResponse.getStatus()).isEqualTo(HealthCheckResponse.Status.UP); + assertThat(healthCheckResponse.getData()).isPresent(); + assertThatJson(healthCheckResponse.getData().get()) + .isEqualTo(""" + { + "foo": "UP" + } + """); + } + + processorManager.close(); + + { + final HealthCheckResponse healthCheckResponse = processorManager.probeHealth(); + assertThat(healthCheckResponse.getName()).isEqualTo("kafka-processors"); + assertThat(healthCheckResponse.getStatus()).isEqualTo(HealthCheckResponse.Status.DOWN); + assertThat(healthCheckResponse.getData()).isPresent(); + assertThatJson(healthCheckResponse.getData().get()) + .isEqualTo(""" + { + "foo": "DOWN" + } + """); + } + } + } + +} \ No newline at end of file
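For reviewers, a minimal end-to-end sketch of how the new API is meant to be consumed once the first processor is migrated off Kafka Streams. Everything below is illustrative only: the ExampleProcessor class, the processor name "example", and the shown property values are hypothetical placeholders and not part of this patch.

    package org.dependencytrack.event.kafka.processor;

    import alpine.common.logging.Logger;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.dependencytrack.event.kafka.processor.api.Processor;
    import org.dependencytrack.event.kafka.processor.exception.ProcessingException;
    import org.dependencytrack.event.kafka.processor.exception.RetryableProcessingException;

    /**
     * Hypothetical single-record processor. Key and value types must match the
     * serdes of the topic the processor is registered for.
     */
    class ExampleProcessor implements Processor<String, String> {

        private static final Logger LOGGER = Logger.getLogger(ExampleProcessor.class);

        @Override
        public void process(final ConsumerRecord<String, String> record) throws ProcessingException {
            try {
                // Business logic goes here; the record is already deserialized by the
                // SingleRecordProcessingStrategy before this method is invoked.
                LOGGER.info("Processing record with key %s".formatted(record.key()));
            } catch (RuntimeException e) {
                // Wrapping in RetryableProcessingException requests a retry with the
                // configured backoff; a plain ProcessingException marks the failure permanent.
                throw new RetryableProcessingException(e);
            }
        }

    }

    // Registration would replace the TODO in ProcessorInitializer#contextInitialized,
    // and tuning happens via pass-through properties, e.g. in application.properties:
    //
    //   alpine.kafka.processor.example.processing.order=key
    //   alpine.kafka.processor.example.max.concurrency=3
    //   alpine.kafka.processor.example.consumer.auto.offset.reset=earliest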