Kafka Workers is a client library that unifies consuming records from Kafka with processing them in user-defined WorkerTasks. It provides:
- a higher level of distribution through sub-partitioning defined by a WorkerPartitioner,
- tighter control of offset commits to Kafka through RecordStatusObserver,
- the ability to pause and resume processing by a WorkerTask for a given partition,
- at-least-once state and output semantics,
- backpressure,
- processing timeouts,
- handling failures.
The current version is 1.0.11.
You need Java 11 and at least Apache Kafka 2.0 to use this library.
Releases are distributed on Maven Central:
```xml
<dependency>
    <groupId>com.rtbhouse</groupId>
    <artifactId>kafka-workers</artifactId>
    <version>1.0.11</version>
</dependency>
```
To use Kafka Workers you should implement the following interfaces:
```java
public interface WorkerTask<K, V> {
    void init(WorkerSubpartition subpartition, WorkersConfig config);
    boolean accept(WorkerRecord<K, V> record);
    void process(WorkerRecord<K, V> record, RecordStatusObserver observer);
    void punctuate(long punctuateTime);
    void close();
}
```
A user-defined task associated with one of the WorkerSubpartitions. The most important methods are accept() and process(). accept() checks whether the WorkerRecord at the head of the WorkerSubpartition's internal queue can be polled and passed to process(). process() handles the WorkerRecord just polled from that queue. Processing can be done synchronously or asynchronously, but in both cases one of the RecordStatusObserver methods, onSuccess() or onFailure(), has to be called. If neither is called within a configurable amount of time (consumer.processing.timeout.ms), the record is considered failed. Additionally, punctuate() lets the task perform maintenance work at a configurable interval (punctuator.interval.ms), regardless of whether there are records to process. accept(), process() and punctuate() are all executed sequentially in a single thread, so no synchronization is necessary. Kafka Workers also synchronizes init() and close() with these calls internally, so no additional user synchronization is needed for them either.
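The sketch below illustrates the accept()/process() contract in a self-contained way. It uses simplified, hypothetical stand-ins (`Rec`, `Observer`, `SumTask`, `drain`) instead of the library's WorkerRecord, RecordStatusObserver and WorkerTask types, plus a toy loop that imitates how a single thread peeks at the head of one subpartition's queue, asks accept(), and only then polls the record and calls process(). It is not Kafka Workers' actual scheduling code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch only: Rec, Observer, SumTask and drain are hypothetical stand-ins, not library classes.
public class WorkerTaskSketch {

    // Stand-in for WorkerRecord: just an offset and a value.
    static final class Rec {
        private final long offset;
        private final String value;
        Rec(long offset, String value) { this.offset = offset; this.value = value; }
        long offset() { return offset; }
        String value() { return value; }
    }

    // Stand-in for RecordStatusObserver: one of these must be called for every processed record.
    interface Observer {
        void onSuccess();
        void onFailure(Exception e);
    }

    // Hypothetical task that parses each value as a number and sums the values.
    static class SumTask {
        boolean downstreamReady = true; // accept() gate: false keeps the head record queued
        long sum = 0;

        boolean accept(Rec record) {
            return downstreamReady;
        }

        // Synchronous processing: report the outcome instead of throwing.
        void process(Rec record, Observer observer) {
            try {
                sum += Long.parseLong(record.value());
                observer.onSuccess();
            } catch (NumberFormatException e) {
                observer.onFailure(e);
            }
        }
    }

    // Toy single-threaded loop: peek the head, ask accept(), only then poll and process.
    static void drain(Deque<Rec> queue, SumTask task, List<Long> ok, List<Long> failed) {
        while (!queue.isEmpty() && task.accept(queue.peekFirst())) {
            Rec rec = queue.pollFirst();
            task.process(rec, new Observer() {
                public void onSuccess() { ok.add(rec.offset()); }
                public void onFailure(Exception e) { failed.add(rec.offset()); }
            });
        }
    }

    public static void main(String[] args) {
        Deque<Rec> queue = new ArrayDeque<>(List.of(new Rec(0, "1"), new Rec(1, "x"), new Rec(2, "3")));
        SumTask task = new SumTask();
        List<Long> ok = new ArrayList<>();
        List<Long> failed = new ArrayList<>();
        drain(queue, task, ok, failed);
        System.out.println("sum=" + task.sum + " ok=" + ok + " failed=" + failed);
    }
}
```

Running `main` prints `sum=4 ok=[0, 2] failed=[1]`: the unparsable record is reported through onFailure() rather than thrown out of process(). Setting `downstreamReady` to `false` leaves the head record in the queue untouched, which appears to be the situation the worker.sleep.ms setting is meant for.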
```java
public interface WorkerPartitioner<K, V> {
    int subpartition(ConsumerRecord<K, V> consumerRecord);
    int count(TopicPartition topicPartition);
}
```
A user-defined partitioner used for additional sub-partitioning, which can give a better distribution of processing. As a consequence, records from one TopicPartition may be reordered during processing, but records within the same WorkerSubpartition remain ordered relative to each other. This also leads to a somewhat more complex offset commit policy, which Kafka Workers provides to ensure at-least-once delivery.
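A common way to keep per-key ordering while spreading load is to hash the record key. The sketch below assumes that strategy; to stay compilable without kafka-clients on the classpath, it takes the raw key bytes in place of a ConsumerRecord and uses a fixed count in place of count(TopicPartition). `KeyHashSubpartitioner` is a hypothetical class, not part of the library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical key-hash sub-partitioning sketch (stand-in for a WorkerPartitioner implementation).
public class KeyHashSubpartitioner {
    private final int subpartitionsPerPartition;

    public KeyHashSubpartitioner(int subpartitionsPerPartition) {
        if (subpartitionsPerPartition < 1) {
            throw new IllegalArgumentException("need at least one subpartition");
        }
        this.subpartitionsPerPartition = subpartitionsPerPartition;
    }

    // Plays the role of count(TopicPartition): every partition gets the same number of subpartitions.
    public int count() {
        return subpartitionsPerPartition;
    }

    // Plays the role of subpartition(ConsumerRecord): equal keys always yield the same index,
    // so records sharing a key keep their relative order.
    public int subpartition(byte[] key) {
        if (key == null) {
            return 0; // keyless records all go to subpartition 0 in this sketch
        }
        // floorMod keeps the result in [0, count) even for negative hash codes.
        return Math.floorMod(Arrays.hashCode(key), subpartitionsPerPartition);
    }

    public static void main(String[] args) {
        KeyHashSubpartitioner partitioner = new KeyHashSubpartitioner(4);
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            int index = partitioner.subpartition(key.getBytes(StandardCharsets.UTF_8));
            System.out.println(key + " -> subpartition " + index);
        }
    }
}
```

Both occurrences of `user-1` land in the same subpartition, so they are processed in order, while `user-2` may be handled concurrently by another WorkerTask.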
Usage example:
```java
Properties properties = new Properties();
properties.setProperty("consumer.topics", "my-topic");
properties.setProperty("consumer.kafka.bootstrap.servers", "localhost:9192");
properties.setProperty("consumer.kafka.group.id", "my-workers");
// String deserializers match the KafkaWorkers<String, String> type parameters below.
properties.setProperty("consumer.kafka.key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("consumer.kafka.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// MyWorkerTaskFactory, MyWorkerPartitioner and MyShutdownCallback are user-defined implementations.
KafkaWorkers<String, String> kafkaWorkers = new KafkaWorkers<>(
        new WorkersConfig(properties),
        new MyWorkerTaskFactory<>(),
        new MyWorkerPartitioner<>(),
        new MyShutdownCallback());

// Shut the workers down when the JVM exits.
Runtime.getRuntime().addShutdownHook(new Thread(kafkaWorkers::shutdown));
kafkaWorkers.start();
```
Name | Description | Type | Default |
---|---|---|---|
consumer.topics | A list of Kafka topics read by the ConsumerThread. | list | |
consumer.commit.interval.ms | The frequency in milliseconds at which processed offsets are committed to Kafka. | long | 10000 |
consumer.processing.timeout.ms | The timeout in milliseconds for a record to be successfully processed. | long | 300000 |
consumer.poll.timeout.ms | The time in milliseconds spent waiting in poll if data is not available in the buffer. If 0, poll returns immediately with any records currently available in the buffer, or empty if there are none. | long | 1000 |
consumer.commit.retries | The number of retries after a retriable commit failure. | int | 3 |
consumer.kafka | Prefix for the internal Kafka consumer configuration. Usage example: `consumer.kafka.bootstrap.servers`. | | |
worker.threads.num | The number of WorkerThreads per Kafka Workers instance. | int | 1 |
worker.sleep.ms | The time in milliseconds a WorkerThread waits when its tasks do not accept records. | long | 1000 |
worker.processing.guarantee | Specifies the worker processing guarantee. Possible values include `at_least_once`. | String | at_least_once |
worker.task | Prefix for the internal task configuration. | | |
punctuator.interval.ms | The frequency in milliseconds at which the punctuate() method is called. | long | 1000 |
queue.max.size.bytes | The maximum size in bytes of a single WorkerSubpartition's internal queue. | long | 268435456 |
queue.total.max.size.bytes | The maximum total size in bytes of all internal queues. | long | null |
queue.resume.ratio | The minimum ratio of used to total queue size for partition resuming. | double | 0.9 |
metric.reporters | A list of classes to use as metrics reporters. Implementing the `org.apache.kafka.common.metrics.MetricsReporter` interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. | list | "" |
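The two prefix entries in the table (consumer.kafka and worker.task) namespace settings that are passed on to another component. The sketch below shows what that convention means for consumer.kafka: the prefix is removed, so `consumer.kafka.group.id` becomes the plain Kafka consumer setting `group.id`. `PrefixedConfigSketch.withPrefixStripped` is a hypothetical helper that illustrates the naming convention; it is not Kafka Workers' internal code.

```java
import java.util.Properties;

public class PrefixedConfigSketch {

    // Hypothetical helper: copies every property that starts with `prefix` into a new
    // Properties object with the prefix removed; all other properties are ignored.
    static Properties withPrefixStripped(Properties source, String prefix) {
        Properties result = new Properties();
        for (String name : source.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.setProperty(name.substring(prefix.length()), source.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("consumer.topics", "my-topic"); // Kafka Workers setting, not forwarded
        properties.setProperty("consumer.kafka.bootstrap.servers", "localhost:9192");
        properties.setProperty("consumer.kafka.group.id", "my-workers");

        Properties kafkaConsumerProperties = withPrefixStripped(properties, "consumer.kafka.");
        System.out.println(kafkaConsumerProperties.getProperty("group.id")); // my-workers
    }
}
```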
Internally, one Kafka Workers instance launches one consumer thread, one punctuator thread and a configurable number of worker threads (worker.threads.num). Each worker thread can execute one or more WorkerTasks, and each WorkerTask processes WorkerRecords from the internal queue associated with its WorkerSubpartition. Through its offsets state, Kafka Workers ensures that only continuously processed offsets are committed.
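Because sub-partitioning lets records of one TopicPartition finish out of order, a later offset may be processed before an earlier one. The sketch below shows one way to track "continuously processed" offsets for a single partition: the committable offset only advances across an unbroken run of processed offsets. `ContiguousOffsets` is a hypothetical class, and it assumes every offset in the range is delivered; compacted topics or transaction markers create offset gaps that a real implementation must account for.

```java
import java.util.TreeSet;

// Hypothetical per-partition tracker; Kafka Workers' own offsets state may differ in detail.
public class ContiguousOffsets {
    private long nextToCommit; // first offset not yet known to be processed
    private final TreeSet<Long> processedAhead = new TreeSet<>(); // processed offsets beyond a gap

    public ContiguousOffsets(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    // Records that an offset was processed successfully (possibly out of order).
    public void markProcessed(long offset) {
        if (offset < nextToCommit) {
            return; // already covered by the contiguous prefix
        }
        processedAhead.add(offset);
        // Advance while the next expected offset has been processed.
        while (!processedAhead.isEmpty() && processedAhead.first() == nextToCommit) {
            processedAhead.pollFirst();
            nextToCommit++;
        }
    }

    // Kafka's committed offset is the offset of the next record to consume.
    public long offsetToCommit() {
        return nextToCommit;
    }

    public static void main(String[] args) {
        ContiguousOffsets offsets = new ContiguousOffsets(100);
        offsets.markProcessed(101); // finished early, e.g. in another subpartition
        offsets.markProcessed(102);
        System.out.println(offsets.offsetToCommit()); // 100: offset 100 is still pending
        offsets.markProcessed(100);
        System.out.println(offsets.offsetToCommit()); // 103
    }
}
```

Committing 100 while offset 100 is still in flight means a crash can only cause re-processing of 100 to 102, never skipping 100, which is what at-least-once delivery requires.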