Merge pull request #325 from Robban1980/feature/add_kafka_event_queue

Add support for consuming events from Kafka

Showing 12 changed files with 1,688 additions and 3 deletions.
# Event Queue

## Published Artifacts

Group: `com.netflix.conductor`

| Published Artifact | Description |
| ----------- | ----------- |
| conductor-kafka-event-queue | Support for integrating with Kafka and consuming events from it. |

## Modules

### Kafka

https://kafka.apache.org/

## kafka-event-queue

Provides the ability to consume messages from Kafka.
## Usage

To use it in an event handler, prefix the event with `kafka:` followed by the topic name.

Example:

```json
{
  "name": "kafka_test_event_handler",
  "event": "kafka:conductor-event",
  "actions": [
    {
      "action": "start_workflow",
      "start_workflow": {
        "name": "workflow_triggered_by_kafka",
        "input": {
          "payload": "${payload}"
        }
      },
      "expandInlineJSON": true
    }
  ],
  "active": true
}
```

The data from the Kafka event has the following format:

```json
{
  "key": "key-1",
  "headers": {
    "header-1": "value1"
  },
  "payload": {
    "first": "Marcelo",
    "middle": "Billie",
    "last": "Mertz"
  }
}
```

* `key` is the key field of the Kafka message.
* `headers` contains the headers of the Kafka message.
* `payload` is the body of the Kafka message.

To access these in the event handler, use for example `"${payload}"` to access the payload property, which contains the Kafka message data.

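Individual fields of the event data can typically be referenced with dotted expressions. As a sketch (the nested `${payload.first}` syntax is an assumption based on Conductor's parameter expression style, not confirmed by this document), an action input might pull out a single field of the example message above:

```json
{
  "action": "start_workflow",
  "start_workflow": {
    "name": "workflow_triggered_by_kafka",
    "input": {
      "first_name": "${payload.first}"
    }
  }
}
```
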
## Configuration

To enable the queue, set the following property to `true`:

```properties
conductor.event-queues.kafka.enabled=true
```

There is also a set of shared properties:

```properties
# Use Kafka as the default event queue type, as you would with SQS or AMQP
conductor.default-event-queue.type=kafka

# The bootstrap servers to use.
conductor.event-queues.kafka.bootstrap-servers=kafka:29092

# The dead letter queue to use for events that had some error.
conductor.event-queues.kafka.dlq-topic=conductor-dlq

# Topic prefix, combined with conductor.default-event-queue.type
conductor.event-queues.kafka.listener-queue-prefix=conductor_

# The polling duration. Start at 500ms and reduce based on how your environment behaves.
conductor.event-queues.kafka.poll-time-duration=500ms
```

There are three clients that should be configured: the Consumer, responsible for consuming messages; the Producer, which publishes messages to Kafka; and the Admin, which handles administrative operations.

The supported properties for the three clients are the ones included in `org.apache.kafka:kafka-clients:3.5.1` for each client type.

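Since the per-client property names mirror the native Kafka client configuration keys, each client's configuration can be derived by stripping the Conductor prefix. The helper below is a hypothetical illustration of that mapping (the `KafkaPrefixMapper` class and `extractClientConfig` method are not part of Conductor), not the module's actual implementation:

```java
import java.util.Properties;

// Hypothetical helper illustrating how prefixed Conductor properties
// map onto native Kafka client configuration keys.
public class KafkaPrefixMapper {

    // Copies every property that starts with the given prefix into a new
    // Properties object, with the prefix removed from the key.
    public static Properties extractClientConfig(Properties source, String prefix) {
        Properties clientProps = new Properties();
        for (String key : source.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                clientProps.setProperty(
                        key.substring(prefix.length()), source.getProperty(key));
            }
        }
        return clientProps;
    }

    public static void main(String[] args) {
        Properties conductorProps = new Properties();
        conductorProps.setProperty(
                "conductor.event-queues.kafka.consumer.client.id", "consumer-client");
        conductorProps.setProperty(
                "conductor.event-queues.kafka.consumer.auto.offset.reset", "earliest");

        Properties consumerConfig =
                extractClientConfig(conductorProps, "conductor.event-queues.kafka.consumer.");
        // Keys now match the names expected by the Kafka consumer client,
        // e.g. "client.id" and "auto.offset.reset".
        System.out.println(consumerConfig.getProperty("client.id"));
    }
}
```
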
## Consumer properties

Example of consumer settings:

```properties
conductor.event-queues.kafka.consumer.client.id=consumer-client
conductor.event-queues.kafka.consumer.auto.offset.reset=earliest
conductor.event-queues.kafka.consumer.enable.auto.commit=false
conductor.event-queues.kafka.consumer.fetch.min.bytes=1
conductor.event-queues.kafka.consumer.max.poll.records=500
conductor.event-queues.kafka.consumer.group-id=conductor-group
```

## Producer properties

Example of producer settings:

```properties
conductor.event-queues.kafka.producer.client.id=producer-client
conductor.event-queues.kafka.producer.acks=all
conductor.event-queues.kafka.producer.retries=5
conductor.event-queues.kafka.producer.batch.size=16384
conductor.event-queues.kafka.producer.linger.ms=10
conductor.event-queues.kafka.producer.compression.type=gzip
```

## Admin properties

Example of admin settings:

```properties
conductor.event-queues.kafka.admin.client.id=admin-client
conductor.event-queues.kafka.admin.connections.max.idle.ms=10000
```
dependencies {
    // Core Conductor dependencies
    implementation project(':conductor-common')
    implementation project(':conductor-core')

    // Spring Boot support
    implementation 'org.springframework.boot:spring-boot-starter'

    // Apache Commons Lang for utility classes
    implementation 'org.apache.commons:commons-lang3'

    // Reactive programming support with RxJava
    implementation "io.reactivex:rxjava:${revRxJava}"

    // SBMTODO: Remove Guava dependency if possible
    // Guava should only be included if specifically needed
    implementation "com.google.guava:guava:${revGuava}"

    // Removed AWS SQS SDK as we are transitioning to Kafka
    // implementation "com.amazonaws:aws-java-sdk-sqs:${revAwsSdk}"

    // Test dependencies
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation project(':conductor-common').sourceSets.test.output

    // Add Kafka client dependency
    implementation 'org.apache.kafka:kafka-clients:3.5.1'

    // Add SLF4J API for logging
    implementation 'org.slf4j:slf4j-api:2.0.9'

    // Add SLF4J binding for logging with Logback
    runtimeOnly 'ch.qos.logback:logback-classic:1.4.11'
}

// test {
//     testLogging {
//         events "passed", "skipped", "failed"
//         showStandardStreams = true // Enable standard output
//     }
// }
102 changes: 102 additions & 0 deletions
...ueue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java
/*
 * Copyright 2024 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.kafkaeq.config;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.kafkaeq.eventqueue.KafkaObservableQueue.Builder;
import com.netflix.conductor.model.TaskModel.Status;

@Configuration
@EnableConfigurationProperties(KafkaEventQueueProperties.class)
@ConditionalOnProperty(name = "conductor.event-queues.kafka.enabled", havingValue = "true")
public class KafkaEventQueueConfiguration {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(KafkaEventQueueConfiguration.class);

    // Injected via the constructor; no need for an additional @Autowired field.
    private final KafkaEventQueueProperties kafkaProperties;

    public KafkaEventQueueConfiguration(KafkaEventQueueProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public EventQueueProvider kafkaEventQueueProvider() {
        return new KafkaEventQueueProvider(kafkaProperties);
    }

    @ConditionalOnProperty(
            name = "conductor.default-event-queue.type",
            havingValue = "kafka",
            matchIfMissing = false)
    @Bean
    public Map<Status, ObservableQueue> getQueues(
            ConductorProperties conductorProperties, KafkaEventQueueProperties properties) {
        try {
            LOGGER.debug(
                    "Starting to create KafkaObservableQueues with properties: {}", properties);

            String stack =
                    Optional.ofNullable(conductorProperties.getStack())
                            .filter(stackName -> stackName.length() > 0)
                            .map(stackName -> stackName + "_")
                            .orElse("");

            LOGGER.debug("Using stack: {}", stack);

            Status[] statuses = new Status[] {Status.COMPLETED, Status.FAILED};
            Map<Status, ObservableQueue> queues = new HashMap<>();

            for (Status status : statuses) {
                // Log the status being processed
                LOGGER.debug("Processing status: {}", status);

                String queuePrefix =
                        StringUtils.isBlank(properties.getListenerQueuePrefix())
                                ? conductorProperties.getAppId() + "_kafka_notify_" + stack
                                : properties.getListenerQueuePrefix();

                LOGGER.debug("queuePrefix: {}", queuePrefix);

                String topicName = queuePrefix + status.name();

                LOGGER.debug("topicName: {}", topicName);

                final ObservableQueue queue = new Builder(properties).build(topicName);
                queues.put(status, queue);
            }

            LOGGER.debug("Successfully created queues: {}", queues);
            return queues;
        } catch (Exception e) {
            LOGGER.error("Failed to create KafkaObservableQueues", e);
            throw new RuntimeException("Failed to getQueues on KafkaEventQueueConfiguration", e);
        }
    }
}