Skip to content

Commit

Permalink
Implement foundational API for parallel-consumer based Kafka processors
Browse files Browse the repository at this point in the history
Decoupled from #509

This merely adds an API on top of which processors can be implemented. We can migrate processors one-by-one from Kafka Streams to this API. Majority of this work was already done in #509, but got out of date due to changed priorities. At the very least said PR is good to take inspiration from.

Relates to DependencyTrack/hyades#346
Relates to DependencyTrack/hyades#901
Relates to DependencyTrack/hyades#907

Signed-off-by: nscuro <[email protected]>
  • Loading branch information
nscuro committed Feb 5, 2024
1 parent f906f65 commit 30cdcaa
Show file tree
Hide file tree
Showing 16 changed files with 1,108 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
<lib.kafka-junit.version>3.6.0</lib.kafka-junit.version>
<lib.micrometer-jvm-extras.version>0.2.2</lib.micrometer-jvm-extras.version>
<lib.packageurl.version>1.4.1</lib.packageurl.version>
<lib.parallel-consumer.version>0.5.2.8</lib.parallel-consumer.version>
<lib.pebble.version>3.2.0</lib.pebble.version>
<lib.protobuf-java.version>3.25.2</lib.protobuf-java.version>
<lib.testcontainers.version>1.18.3</lib.testcontainers.version>
Expand Down Expand Up @@ -299,6 +300,13 @@
<artifactId>packageurl-java</artifactId>
<version>${lib.packageurl.version}</version>
</dependency>

<dependency>
<groupId>io.confluent.parallelconsumer</groupId>
<artifactId>parallel-consumer-core</artifactId>
<version>${lib.parallel-consumer.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/dependencytrack/common/MdcKeys.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.dependencytrack.common;

/**
* Common fields for use with SLF4J's {@link org.slf4j.MDC}.
*/
public final class MdcKeys {

public static final String MDC_KAFKA_RECORD_TOPIC = "kafkaRecordTopic";
public static final String MDC_KAFKA_RECORD_PARTITION = "kafkaRecordPartition";
public static final String MDC_KAFKA_RECORD_OFFSET = "kafkaRecordOffset";
public static final String MDC_KAFKA_RECORD_KEY = "kafkaRecordKey";

private MdcKeys() {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.dependencytrack.event.kafka.processor;

import alpine.common.logging.Logger;
import org.dependencytrack.event.kafka.processor.api.ProcessorManager;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

public class ProcessorInitializer implements ServletContextListener {

private static final Logger LOGGER = Logger.getLogger(ProcessorInitializer.class);

static final ProcessorManager PROCESSOR_MANAGER = new ProcessorManager();

@Override
public void contextInitialized(final ServletContextEvent event) {
LOGGER.info("Initializing processors");

// PROCESSOR_MANAGER.registerProcessor(MirroredVulnerabilityProcessor.PROCESSOR_NAME,
// new MirroredVulnerabilityProcessor(), KafkaTopics.NEW_VULNERABILITY);
// PROCESSOR_MANAGER.registerProcessor(RepositoryMetaResultProcessor.PROCESSOR_NAME,
// new RepositoryMetaResultProcessor(), KafkaTopics.REPO_META_ANALYSIS_RESULT);
// PROCESSOR_MANAGER.registerProcessor(VulnerabilityScanResultProcessor.PROCESSOR_NAME,
// new VulnerabilityScanResultProcessor(), KafkaTopics.VULN_ANALYSIS_RESULT);
// PROCESSOR_MANAGER.registerBatchProcessor(ProcessedVulnerabilityScanResultProcessor.PROCESSOR_NAME,
// new ProcessedVulnerabilityScanResultProcessor(), KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED);
// if (Config.getInstance().getPropertyAsBoolean(ConfigKey.TMP_DELAY_BOM_PROCESSED_NOTIFICATION)) {
// PROCESSOR_MANAGER.registerBatchProcessor(DelayedBomProcessedNotificationProcessor.PROCESSOR_NAME,
// new DelayedBomProcessedNotificationProcessor(), KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE);
// }

PROCESSOR_MANAGER.startAll();
}

@Override
public void contextDestroyed(final ServletContextEvent event) {
LOGGER.info("Stopping processors");
PROCESSOR_MANAGER.close();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.dependencytrack.event.kafka.processor;

import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.Liveness;

import static org.dependencytrack.event.kafka.processor.ProcessorInitializer.PROCESSOR_MANAGER;

@Liveness
public class ProcessorsHealthCheck implements HealthCheck {

@Override
public HealthCheckResponse call() {
return PROCESSOR_MANAGER.probeHealth();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.dependencytrack.event.kafka.processor.api;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.datanucleus.api.jdo.exceptions.ConnectionInUseException;
import org.datanucleus.store.query.QueryInterruptedException;
import org.dependencytrack.event.kafka.processor.exception.RetryableProcessingException;
import org.postgresql.util.PSQLState;

import javax.jdo.JDOOptimisticVerificationException;
import java.net.SocketTimeoutException;
import java.sql.SQLException;
import java.sql.SQLTransientConnectionException;
import java.sql.SQLTransientException;
import java.util.List;
import java.util.concurrent.TimeoutException;

/**
* An abstract {@link ProcessingStrategy} that provides various shared functionality.
*
* @param <K> Type of the {@link ConsumerRecord} key
* @param <V> Type of the {@link ConsumerRecord} value
*/
abstract class AbstractProcessingStrategy<K, V> implements ProcessingStrategy {

private final Serde<K> keySerde;
private final Serde<V> valueSerde;

AbstractProcessingStrategy(final Serde<K> keySerde, final Serde<V> valueSerde) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
}

/**
* @param record The {@link ConsumerRecord} to deserialize key and value of
* @return A {@link ConsumerRecord} with deserialized key and value
* @throws SerializationException When deserializing the {@link ConsumerRecord} failed
*/
ConsumerRecord<K, V> deserialize(final ConsumerRecord<byte[], byte[]> record) {
final K deserializedKey;
final V deserializedValue;
try {
deserializedKey = keySerde.deserializer().deserialize(record.topic(), record.key());
deserializedValue = valueSerde.deserializer().deserialize(record.topic(), record.value());
} catch (RuntimeException e) {
if (e instanceof SerializationException) {
throw e;
}

throw new SerializationException(e);
}

return new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
record.timestamp(), record.timestampType(), record.serializedKeySize(), record.serializedValueSize(),
deserializedKey, deserializedValue, record.headers(), record.leaderEpoch());
}

private static final List<Class<? extends Exception>> KNOWN_TRANSIENT_EXCEPTIONS = List.of(
ConnectTimeoutException.class,
ConnectionInUseException.class,
JDOOptimisticVerificationException.class,
QueryInterruptedException.class,
SocketTimeoutException.class,
SQLTransientException.class,
SQLTransientConnectionException.class,
TimeoutException.class
);

boolean isRetryableException(final Throwable throwable) {
if (throwable instanceof RetryableProcessingException) {
return true;
}

final boolean isKnownTransientException = ExceptionUtils.getThrowableList(throwable).stream()
.anyMatch(cause -> KNOWN_TRANSIENT_EXCEPTIONS.contains(cause.getClass()));
if (isKnownTransientException) {
return true;
}

return ExceptionUtils.getRootCause(throwable) instanceof final SQLException se
&& PSQLState.isConnectionError(se.getSQLState());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.dependencytrack.event.kafka.processor.api;

import alpine.common.logging.Logger;
import io.confluent.parallelconsumer.PCRetriableException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.dependencytrack.common.MdcKeys;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;
import org.slf4j.MDC;

import java.util.ArrayList;
import java.util.List;

/**
* A {@link ProcessingStrategy} that processes records in batches.
*
* @param <K> Type of the {@link ConsumerRecord} key
* @param <V> Type of the {@link ConsumerRecord} value
*/
class BatchProcessingStrategy<K, V> extends AbstractProcessingStrategy<K, V> {

private static final Logger LOGGER = Logger.getLogger(BatchProcessingStrategy.class);

private final BatchProcessor<K, V> batchProcessor;

BatchProcessingStrategy(final BatchProcessor<K, V> batchProcessor,
final Serde<K> keySerde, final Serde<V> valueSerde) {
super(keySerde, valueSerde);
this.batchProcessor = batchProcessor;
}

/**
* {@inheritDoc}
*/
@Override
public void processRecords(final List<ConsumerRecord<byte[], byte[]>> records) {
final var deserializedRecords = new ArrayList<ConsumerRecord<K, V>>(records.size());
for (final ConsumerRecord<byte[], byte[]> record : records) {
try (var ignoredMdcKafkaRecordTopic = MDC.putCloseable(MdcKeys.MDC_KAFKA_RECORD_TOPIC, record.topic());
var ignoredMdcKafkaRecordPartition = MDC.putCloseable(MdcKeys.MDC_KAFKA_RECORD_PARTITION, String.valueOf(record.partition()));
var ignoredMdcKafkaRecordOffset = MDC.putCloseable(MdcKeys.MDC_KAFKA_RECORD_OFFSET, String.valueOf(record.offset()))) {
deserializedRecords.add(deserialize(record));
} catch (SerializationException e) {
// TODO: Consider supporting error handlers, e.g. to send record to DLT.
LOGGER.error("Failed to deserialize record; Skipping", e);
}
}

if (deserializedRecords.isEmpty()) {
LOGGER.warn("All of the %d records in this batch failed to be deserialized".formatted(records.size()));
return;
}

try {
batchProcessor.process(deserializedRecords);
} catch (ProcessingException | RuntimeException e) {
if (isRetryableException(e)) {
LOGGER.warn("Encountered retryable exception while processing %d records".formatted(deserializedRecords.size()), e);
throw new PCRetriableException(e);
}

LOGGER.error("Encountered non-retryable exception while processing %d records; Skipping".formatted(deserializedRecords.size()), e);
// TODO: Consider supporting error handlers, e.g. to send records to DLT.
// Skip records to avoid poison-pill scenario.
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.dependencytrack.event.kafka.processor.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;

import java.util.List;

/**
* A processor of {@link ConsumerRecord} batches.
*
* @param <K> Type of the {@link ConsumerRecord} key
* @param <V> Type of the {@link ConsumerRecord} value
*/
public interface BatchProcessor<K, V> {

/**
* Process a batch of {@link ConsumerRecord}s.
* <p>
* This method may be called by multiple threads concurrently and thus MUST be thread safe!
*
* @param records Batch of {@link ConsumerRecord}s to process
* @throws ProcessingException When consuming the batch of {@link ConsumerRecord}s failed
*/
void process(final List<ConsumerRecord<K, V>> records) throws ProcessingException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.dependencytrack.event.kafka.processor.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;

interface ProcessingStrategy {

/**
* Process zero or more {@link ConsumerRecord}s.
*
* @param records The {@link ConsumerRecord}s to process
*/
void processRecords(final List<ConsumerRecord<byte[], byte[]>> records);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.dependencytrack.event.kafka.processor.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;

/**
* A processor of individual {@link ConsumerRecord}s.
*
* @param <K> Type of the {@link ConsumerRecord} key
* @param <V> Type of the {@link ConsumerRecord} value
*/
public interface Processor<K, V> {

/**
* Process a {@link ConsumerRecord}.
* <p>
* This method may be called by multiple threads concurrently and thus MUST be thread safe!
*
* @param record The {@link ConsumerRecord} to process
* @throws ProcessingException When processing the {@link ConsumerRecord} failed
*/
void process(final ConsumerRecord<K, V> record) throws ProcessingException;

}
Loading

0 comments on commit 30cdcaa

Please sign in to comment.