[STALE] Replace Kafka Streams with Confluent Parallel Consumer #509

Closed · wants to merge 26 commits
Commits
3997731
Move Kafka Streams classes to dedicated `streams` package
nscuro Dec 26, 2023
021d87b
WIP: Add mechanism to instantiate and manage parallel consumers
nscuro Dec 27, 2023
5bdf47e
WIP: Use properly typed interfaces instead of reflection for record p…
nscuro Dec 28, 2023
317b19f
WIP: Simplify processor APIs
nscuro Dec 28, 2023
275d832
WIP: Decouple processing strategy from parallel consumer specifics
nscuro Dec 28, 2023
ff0cef8
WIP: Handle retryable exceptions
nscuro Dec 28, 2023
34566ec
WIP: Cleanup
nscuro Dec 28, 2023
215a51c
WIP: Add processor application properties
nscuro Dec 28, 2023
5bd649d
WIP: Add more processor application properties
nscuro Dec 28, 2023
f2b205a
WIP: Add customizable consumer retry delays
nscuro Dec 28, 2023
8cc614f
WIP: Specify retry delays in milliseconds
nscuro Dec 28, 2023
c1e4889
WIP: Migrate `MirrorVulnerabilityProcessor`
nscuro Dec 28, 2023
2a5e698
WIP: Migrate `RepositoryMetaResultProcessor`
nscuro Dec 28, 2023
ed7f762
WIP: Handle more retryable exceptions
nscuro Dec 28, 2023
bf39346
WIP: Fix missing `TimeoutException` check
nscuro Dec 28, 2023
8dc71fd
Implement batch processor for processed vuln scan results
nscuro Dec 29, 2023
e125fd4
Increase `fetch.min.bytes` for more effective batching
nscuro Dec 29, 2023
40897cb
Migrate `DelayedBomProcessedNotificationProcessor`
nscuro Dec 30, 2023
befe8cb
Add health check for Kafka processors
nscuro Dec 30, 2023
7e2a123
Ensure processor properties can be provided via env vars
nscuro Dec 30, 2023
6578f95
Simplify processor API class names
nscuro Dec 30, 2023
e071090
Minor tweaks
nscuro Dec 30, 2023
ce20a62
Make processor implementations package-private
nscuro Dec 30, 2023
80b5742
Support dispatch of multiple events with arbitrary key-value types
nscuro Jan 1, 2024
8f8d76a
Add thread safety notice
nscuro Jan 1, 2024
530bccd
Migrate `VulnerabilityScanResultProcessor`
nscuro Jan 1, 2024
8 changes: 8 additions & 0 deletions pom.xml
@@ -106,6 +106,7 @@
<lib.logstash-logback-encoder.version>7.3</lib.logstash-logback-encoder.version>
<lib.micrometer-jvm-extras.version>0.2.2</lib.micrometer-jvm-extras.version>
<lib.packageurl.version>1.4.1</lib.packageurl.version>
<lib.parallel-consumer.version>0.5.2.7</lib.parallel-consumer.version>
<lib.pebble.version>3.2.0</lib.pebble.version>
<lib.protobuf-java.version>3.25.1</lib.protobuf-java.version>
<lib.testcontainers.version>1.18.3</lib.testcontainers.version>
@@ -300,6 +301,13 @@
<artifactId>packageurl-java</artifactId>
<version>${lib.packageurl.version}</version>
</dependency>

<dependency>
<groupId>io.confluent.parallelconsumer</groupId>
<artifactId>parallel-consumer-core</artifactId>
<version>${lib.parallel-consumer.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
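The new `parallel-consumer-core` dependency is what replaces the Kafka Streams topology in this PR. For orientation, here is a minimal sketch of how a Confluent Parallel Consumer is typically wired up — the consumer properties, topic name, concurrency, and `handle` method are illustrative placeholders, not code from this PR:

```java
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.List;
import java.util.Properties;

class ParallelConsumerSketch {

    public static void main(final String[] args) {
        // Plain Kafka consumer; bootstrap servers and group id are placeholders.
        final var consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "dtrack-apiserver");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final var consumer = new KafkaConsumer<String, String>(consumerProps);

        // KEY ordering keeps per-key order while records with different keys
        // are processed concurrently across the worker pool.
        final var options = ParallelConsumerOptions.<String, String>builder()
                .consumer(consumer)
                .ordering(ProcessingOrder.KEY)
                .maxConcurrency(16)
                .build();

        final ParallelStreamProcessor<String, String> processor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(List.of("dtrack.vuln-analysis.result.processed"));

        // User code is invoked per record; offsets are committed by the
        // parallel consumer once processing has succeeded.
        processor.poll(context -> handle(context.getSingleConsumerRecord()));
    }

    private static void handle(final ConsumerRecord<String, String> record) {
        System.out.println(record.key() + " -> " + record.value());
    }
}
```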
KafkaDefaultProducerCallback.java
@@ -15,6 +15,10 @@ class KafkaDefaultProducerCallback implements Callback {
private final String topic;
private final Object key;

KafkaDefaultProducerCallback(final Logger logger) {
this(logger, null, null);
}

KafkaDefaultProducerCallback(final Logger logger, final String topic, final Object key) {
this.logger = logger;
this.topic = topic;
@@ -27,7 +31,11 @@ class KafkaDefaultProducerCallback implements Callback {
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
logger.error("Failed to produce record with key %s to topic %s".formatted(key, topic), exception);
if (topic != null) {
logger.error("Failed to produce record with key %s to topic %s".formatted(key, topic), exception);
} else {
logger.error("Failed to produce record", exception);
}
}
}

KafkaEvent.java
@@ -4,5 +4,10 @@

import java.util.Map;

record KafkaEvent<K, V>(Topic<K, V> topic, K key, V value, Map<String, String> headers) {
public record KafkaEvent<K, V>(Topic<K, V> topic, K key, V value, Map<String, String> headers) {

public KafkaEvent(final Topic<K, V> topic, final K key, final V value) {
this(topic, key, value, null);
}

}
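A brief usage sketch of the now-public record and its new convenience constructor — `projectUuid` and `notification` are placeholders, and `KafkaTopics.NOTIFICATION_BOM` is the topic used by the processor further below:

```java
// Sketch: the three-argument constructor omits headers (they default to null).
final KafkaEvent<String, Notification> event =
        new KafkaEvent<>(KafkaTopics.NOTIFICATION_BOM, projectUuid.toString(), notification);

// Equivalent to the canonical constructor:
// new KafkaEvent<>(KafkaTopics.NOTIFICATION_BOM, projectUuid.toString(), notification, null);
```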
KafkaEventConverter.java
@@ -17,7 +17,7 @@
* Utility class to convert {@link alpine.event.framework.Event}s and {@link alpine.notification.Notification}s
* to {@link KafkaEvent}s.
*/
final class KafkaEventConverter {
public final class KafkaEventConverter {

private KafkaEventConverter() {
}
@@ -65,7 +65,7 @@ static KafkaEvent<String, AnalysisCommand> convert(final ComponentRepositoryMeta
return new KafkaEvent<>(KafkaTopics.REPO_META_ANALYSIS_COMMAND, event.purlCoordinates(), analysisCommand, null);
}

static KafkaEvent<String, Notification> convert(final String key, final Notification notification) {
public static KafkaEvent<String, Notification> convert(final String key, final Notification notification) {
final Topic<String, Notification> topic = switch (notification.getGroup()) {
case GROUP_CONFIGURATION -> KafkaTopics.NOTIFICATION_CONFIGURATION;
case GROUP_DATASOURCE_MIRRORING -> KafkaTopics.NOTIFICATION_DATASOURCE_MIRRORING;
KafkaEventDispatcher.java
@@ -18,9 +18,12 @@
import org.dependencytrack.model.Vulnerability;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@@ -144,6 +147,50 @@
return CompletableFuture.completedFuture(null);
}

return producer.send(toProducerRecord(event), requireNonNullElseGet(callback,
() -> new KafkaDefaultProducerCallback(LOGGER, event.topic().name(), event.key())));
}

public void dispatchAllBlocking(final List<KafkaEvent<?, ?>> events) {
dispatchAllBlocking(events, null);
}

public void dispatchAllBlocking(final List<KafkaEvent<?, ?>> events, Callback callback) {
final var countDownLatch = new CountDownLatch(events.size());

callback = requireNonNullElseGet(callback, () -> new KafkaDefaultProducerCallback(LOGGER));

[Codacy Static Code Analysis warning on line 161: Avoid reassigning parameters such as 'callback']
callback = decorateCallback(callback, ((metadata, exception) -> countDownLatch.countDown()));

dispatchAllAsync(events, callback);

try {
countDownLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new KafkaException("""
Thread was interrupted while waiting for all events to be acknowledged \
by the broker. The acknowledgement of %d/%d events can not be determined.\
""".formatted(countDownLatch.getCount(), events.size()), e);
}
}

public <K, V> List<Future<RecordMetadata>> dispatchAllAsync(final List<KafkaEvent<?, ?>> events, Callback callback) {
final var records = new ArrayList<ProducerRecord<byte[], byte[]>>(events.size());
for (final KafkaEvent<?, ?> event : events) {
records.add(toProducerRecord(event));
}

callback = requireNonNullElseGet(callback, () -> new KafkaDefaultProducerCallback(LOGGER));

final var futures = new ArrayList<Future<RecordMetadata>>(records.size());
for (final ProducerRecord<byte[], byte[]> record : records) {
futures.add(producer.send(record, callback));
}

return futures;
}

private static <K, V> ProducerRecord<byte[], byte[]> toProducerRecord(final KafkaEvent<K, V> event) {
final byte[] keyBytes;
try (final Serde<K> keySerde = event.topic().keySerde()) {
keyBytes = keySerde.serializer().serialize(event.topic().name(), event.key());
@@ -165,8 +212,14 @@
}
}

return producer.send(record, requireNonNullElseGet(callback,
() -> new KafkaDefaultProducerCallback(LOGGER, record.topic(), event.key())));
return record;
}

private static Callback decorateCallback(final Callback originalCallback, final Callback decoratorCallback) {
return (metadata, exception) -> {
decoratorCallback.onCompletion(metadata, exception);
originalCallback.onCompletion(metadata, exception);
};
}

}
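A sketch of how the new bulk-dispatch methods might be called; the event contents below are placeholders (the real call site added by this PR is `DelayedBomProcessedNotificationProcessor#dispatchNotifications`):

```java
final var dispatcher = new KafkaEventDispatcher();

// Placeholder events; key and value would normally come from a notification subject.
final var events = new ArrayList<KafkaEvent<?, ?>>();
events.add(new KafkaEvent<>(KafkaTopics.NOTIFICATION_BOM, "project-uuid", notification));

// Asynchronous variant: returns one Future per record; passing null falls back
// to the topic-less KafkaDefaultProducerCallback.
final List<Future<RecordMetadata>> futures = dispatcher.dispatchAllAsync(events, null);

// Blocking variant: waits on a CountDownLatch until every record has been
// acknowledged (or failed), then returns.
dispatcher.dispatchAllBlocking(events);
```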
KafkaTopics.java
@@ -34,6 +34,7 @@ public final class KafkaTopics {
public static final Topic<String, AnalysisResult> REPO_META_ANALYSIS_RESULT;
public static final Topic<ScanKey, ScanCommand> VULN_ANALYSIS_COMMAND;
public static final Topic<ScanKey, ScanResult> VULN_ANALYSIS_RESULT;
public static final Topic<String, ScanResult> VULN_ANALYSIS_RESULT_PROCESSED;

public static final Topic<String, Notification> NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE;
private static final Serde<Notification> NOTIFICATION_SERDE = new KafkaProtobufSerde<>(Notification.parser());
@@ -58,6 +59,7 @@ public final class KafkaTopics {
REPO_META_ANALYSIS_RESULT = new Topic<>("dtrack.repo-meta-analysis.result", Serdes.String(), new KafkaProtobufSerde<>(AnalysisResult.parser()));
VULN_ANALYSIS_COMMAND = new Topic<>("dtrack.vuln-analysis.component", new KafkaProtobufSerde<>(ScanKey.parser()), new KafkaProtobufSerde<>(ScanCommand.parser()));
VULN_ANALYSIS_RESULT = new Topic<>("dtrack.vuln-analysis.result", new KafkaProtobufSerde<>(ScanKey.parser()), new KafkaProtobufSerde<>(ScanResult.parser()));
VULN_ANALYSIS_RESULT_PROCESSED = new Topic<>("dtrack.vuln-analysis.result.processed", Serdes.String(), new KafkaProtobufSerde<>(ScanResult.parser()));
NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE = new Topic<>("dtrack.notification.project-vuln-analysis-complete", Serdes.String(), NOTIFICATION_SERDE);
}

DelayedBomProcessedNotificationProcessor.java
@@ -1,87 +1,128 @@
package org.dependencytrack.event.kafka.processor;

import alpine.Config;
import alpine.common.logging.Logger;
import alpine.notification.NotificationLevel;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.dependencytrack.model.Bom;
import org.dependencytrack.model.Project;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.common.ConfigKey;
import org.dependencytrack.event.kafka.KafkaEvent;
import org.dependencytrack.event.kafka.KafkaEventDispatcher;
import org.dependencytrack.event.kafka.KafkaTopics;
import org.dependencytrack.event.kafka.processor.api.BatchProcessor;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;
import org.dependencytrack.model.VulnerabilityScan;
import org.dependencytrack.model.WorkflowStatus;
import org.dependencytrack.model.WorkflowStep;
import org.dependencytrack.notification.NotificationConstants;
import org.dependencytrack.notification.NotificationGroup;
import org.dependencytrack.notification.NotificationScope;
import org.dependencytrack.notification.vo.BomConsumedOrProcessed;
import org.dependencytrack.persistence.QueryManager;
import org.dependencytrack.persistence.jdbi.NotificationSubjectDao;
import org.dependencytrack.proto.notification.v1.BomConsumedOrProcessedSubject;
import org.dependencytrack.proto.notification.v1.Notification;
import org.dependencytrack.proto.notification.v1.ProjectVulnAnalysisCompleteSubject;

import javax.jdo.Query;
import java.util.UUID;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.dependencytrack.parser.dependencytrack.NotificationModelConverter.convert;
import static org.dependencytrack.persistence.jdbi.JdbiFactory.jdbi;
import static org.dependencytrack.proto.notification.v1.Group.GROUP_BOM_PROCESSED;
import static org.dependencytrack.proto.notification.v1.Level.LEVEL_INFORMATIONAL;
import static org.dependencytrack.proto.notification.v1.Scope.SCOPE_PORTFOLIO;

/**
* A {@link Processor} responsible for dispatching {@link NotificationGroup#BOM_PROCESSED} notifications
* A {@link BatchProcessor} responsible for dispatching {@link NotificationGroup#BOM_PROCESSED} notifications
* upon detection of a completed {@link VulnerabilityScan}.
* <p>
* The completion detection is based on {@link NotificationGroup#PROJECT_VULN_ANALYSIS_COMPLETE} notifications.
* This processor does nothing unless {@link ConfigKey#TMP_DELAY_BOM_PROCESSED_NOTIFICATION} is enabled.
*/
public class DelayedBomProcessedNotificationProcessor extends ContextualProcessor<String, VulnerabilityScan, String, Notification> {
class DelayedBomProcessedNotificationProcessor implements BatchProcessor<String, Notification> {

static final String PROCESSOR_NAME = "delayed.bom.processed.notification";

private static final Logger LOGGER = Logger.getLogger(DelayedBomProcessedNotificationProcessor.class);

private final Config config;
private final KafkaEventDispatcher eventDispatcher;

DelayedBomProcessedNotificationProcessor() {
this(Config.getInstance(), new KafkaEventDispatcher());
}

DelayedBomProcessedNotificationProcessor(final Config config, final KafkaEventDispatcher eventDispatcher) {
this.config = config;
this.eventDispatcher = eventDispatcher;
}

@Override
public void process(final Record<String, VulnerabilityScan> record) {
final VulnerabilityScan vulnScan = record.value();
public void process(final List<ConsumerRecord<String, Notification>> records) throws ProcessingException {
if (!config.getPropertyAsBoolean(ConfigKey.TMP_DELAY_BOM_PROCESSED_NOTIFICATION)) {
return;
}

if (vulnScan.getStatus() != VulnerabilityScan.Status.COMPLETED
&& vulnScan.getStatus() != VulnerabilityScan.Status.FAILED) {
LOGGER.warn("Received vulnerability scan with non-terminal status %s; Dropping (token=%s, project=%s)"
.formatted(vulnScan.getStatus(), vulnScan.getToken(), vulnScan.getTargetIdentifier()));
final Set<String> tokens = extractTokens(records);
if (tokens.isEmpty()) {
LOGGER.warn("No token could be extracted from any of the %d records in this batch"
.formatted(records.size()));
return;
}

final Project project;
final List<BomConsumedOrProcessedSubject> subjects;
try (final var qm = new QueryManager()) {
if (!qm.hasWorkflowStepWithStatus(UUID.fromString(vulnScan.getToken()), WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED)) {
LOGGER.debug("Received completed vulnerability scan, but no %s step exists in this workflow; Dropping (token=%s, project=%s)"
.formatted(WorkflowStep.BOM_PROCESSING, vulnScan.getToken(), vulnScan.getTargetIdentifier()));
return;
subjects = jdbi(qm).withExtension(NotificationSubjectDao.class,
dao -> dao.getForDelayedBomProcessed(tokens));
}

dispatchNotifications(subjects);
}

private static Set<String> extractTokens(final List<ConsumerRecord<String, Notification>> records) {
final var tokens = new HashSet<String>();
for (final ConsumerRecord<String, Notification> record : records) {
final Notification notification = record.value();
if (!notification.hasSubject() || !notification.getSubject().is(ProjectVulnAnalysisCompleteSubject.class)) {
continue;
}

project = getProject(qm, vulnScan.getTargetIdentifier());
if (project == null) {
LOGGER.warn("Received completed vulnerability scan, but the target project does not exist; Dropping (token=%s, project=%s)"
.formatted(vulnScan.getToken(), vulnScan.getTargetIdentifier()));
return;
final ProjectVulnAnalysisCompleteSubject subject;
try {
subject = notification.getSubject().unpack(ProjectVulnAnalysisCompleteSubject.class);
} catch (InvalidProtocolBufferException e) {
LOGGER.warn("Failed to unpack notification subject from %s; Skipping".formatted(record), e);
continue;
}

tokens.add(subject.getToken());
}

final var alpineNotification = new alpine.notification.Notification()
.scope(NotificationScope.PORTFOLIO)
.group(NotificationGroup.BOM_PROCESSED)
.level(NotificationLevel.INFORMATIONAL)
.title(NotificationConstants.Title.BOM_PROCESSED)
// BOM format and spec version are hardcoded because we don't have this information at this point.
// DT currently only accepts CycloneDX anyway.
.content("A %s BOM was processed".formatted(Bom.Format.CYCLONEDX.getFormatShortName()))
.subject(new BomConsumedOrProcessed(UUID.fromString(vulnScan.getToken()), project, /* bom */ "(Omitted)", Bom.Format.CYCLONEDX, "Unknown"));

context().forward(record.withKey(project.getUuid().toString()).withValue(convert(alpineNotification)));
LOGGER.info("Dispatched delayed %s notification (token=%s, project=%s)"
.formatted(NotificationGroup.BOM_PROCESSED, vulnScan.getToken(), vulnScan.getTargetIdentifier()));
return tokens;
}

private static Project getProject(final QueryManager qm, final UUID uuid) {
final Query<Project> projectQuery = qm.getPersistenceManager().newQuery(Project.class);
projectQuery.setFilter("uuid == :uuid");
projectQuery.setParameters(uuid);
projectQuery.getFetchPlan().clearGroups(); // Ensure we're not loading too much bloat.
projectQuery.getFetchPlan().setGroup(Project.FetchGroup.NOTIFICATION.name());
try {
return qm.getPersistenceManager().detachCopy(projectQuery.executeResultUnique(Project.class));
} finally {
projectQuery.closeAll();
private void dispatchNotifications(final List<BomConsumedOrProcessedSubject> subjects) {
final Timestamp timestamp = Timestamps.now();
final var events = new ArrayList<KafkaEvent<?, ?>>(subjects.size());
for (final BomConsumedOrProcessedSubject subject : subjects) {
final var event = new KafkaEvent<>(KafkaTopics.NOTIFICATION_BOM,
subject.getProject().getUuid(), Notification.newBuilder()
.setScope(SCOPE_PORTFOLIO)
.setGroup(GROUP_BOM_PROCESSED)
.setLevel(LEVEL_INFORMATIONAL)
.setTimestamp(timestamp)
.setTitle(NotificationConstants.Title.BOM_PROCESSED)
.setContent("A %s BOM was processed".formatted(subject.getBom().getFormat()))
.setSubject(Any.pack(subject))
.build());
events.add(event);
}

eventDispatcher.dispatchAllBlocking(events);

for (final BomConsumedOrProcessedSubject subject : subjects) {
LOGGER.info("Dispatched delayed %s notification (token=%s, project=%s)"
.formatted(GROUP_BOM_PROCESSED, subject.getToken(), subject.getProject().getUuid()));
}
}

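The `ProcessorManager` that drives `BatchProcessor` implementations like the one above is not part of this diff. Purely as an illustration — assuming it maps batch processors onto the parallel consumer's batch mode — the wiring could look roughly like this; the concurrency, batch size, consumer setup, and error handling are all assumptions:

```java
// Assumed wiring of a BatchProcessor onto the Confluent Parallel Consumer's
// batch mode; none of these values are taken from this PR.
final var options = ParallelConsumerOptions.<String, Notification>builder()
        .consumer(consumer) // consumer with a protobuf Notification deserializer (assumed)
        .ordering(ProcessingOrder.KEY)
        .maxConcurrency(3)
        .batchSize(100) // hand records to the processor in batches
        .build();

final var parallelConsumer = ParallelStreamProcessor.createEosStreamProcessor(options);
parallelConsumer.subscribe(List.of(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()));

parallelConsumer.poll(context -> {
    try {
        // getConsumerRecordsFlattened() yields the whole batch as a flat list.
        batchProcessor.process(context.getConsumerRecordsFlattened());
    } catch (ProcessingException e) {
        // Rethrow so the parallel consumer retries the batch.
        throw new RuntimeException(e);
    }
});
```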
KafkaProcessorsHealthCheck.java
@@ -0,0 +1,17 @@
package org.dependencytrack.event.kafka.processor;

import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.Liveness;

import static org.dependencytrack.event.kafka.processor.KafkaProcessorsInitializer.PROCESSOR_MANAGER;

@Liveness
public class KafkaProcessorsHealthCheck implements HealthCheck {

@Override
public HealthCheckResponse call() {
return PROCESSOR_MANAGER.probeHealth();
}

}
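`KafkaProcessorsInitializer` and `ProcessorManager.probeHealth()` are not shown in this diff. As an illustration only, a MicroProfile Health response for such a liveness check could be assembled like this — the check name, data keys, and aggregation logic are assumptions:

```java
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.HealthCheckResponseBuilder;

import java.util.Map;

class ProcessorHealthSketch {

    // Sketch of what a probeHealth() implementation might return, given a map of
    // processor names to their "alive" state; everything here is illustrative.
    static HealthCheckResponse probeHealth(final Map<String, Boolean> processorStates) {
        final HealthCheckResponseBuilder builder = HealthCheckResponse.named("kafka-processors")
                // Overall status is UP only if every managed processor is alive.
                .status(processorStates.values().stream().allMatch(Boolean::booleanValue));

        // Expose each processor's state as response data (visible under /health/live).
        processorStates.forEach((name, alive) -> builder.withData(name, alive ? "UP" : "DOWN"));

        return builder.build();
    }
}
```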