Propel is a wrapper over the Confluent Kafka Go library that lets you consume Kafka events in batches or run a throttled consumer that adapts to your processing speed without leaving the consumer group.
- Propel [v0.5.1]
- Features
- Throttled Consumer API
- Batch Consumer API
- Common API
- FAQs
  - What is the `ConsumerConfig.ConfigMapOverride`?
  - What is the difference between the `Run` and `RunFunc` methods?
  - What happens to the batch after the handler executes?
  - How are offsets committed?
  - How do I select an optimum batch size?
  - What is the `ShutdownTimeout` in the `BatchConsumer` struct?
  - How do I shut down the consumer from the handler?
  - Why is the stats info handled through a notifier function and not a proper stats collector lib?
  - What is the `stats.Notifier`?
- Examples
- Benchmarks
- Throttled consumer which adapts to your message processing speed, unbounded by `max.poll.interval.ms`
- Batch consumer which allows you to process a batch of messages, bounded by `max.poll.interval.ms`
- Small API footprint
- Single dependency (testify, used only for testing)
The Throttled Consumer API lets you consume a batch of messages in a single go, but it throttles the Kafka consumer to adapt to your processing speed, which means your consumer will not get kicked out of the group even if your processing exceeds the `max.poll.interval.ms`. The throttled consumer is a good fit if your message processing is blocking in nature and high-throughput processing is not your priority.
```go
throtCon := throttled.Consumer{
	PollTimeoutMS: 2000,
	BatchSize:     200,
	KafkaConsumerConf: &conflgo.ConsumerConfig{
		BoostrapServers: "localhost:9092",
		GroupID:         "test_part_cons",
	},
	Logger: logger,
	StatsNotifier: func(notif stats.Notification) {
		// publish to a metric agent
	},
}
_ = throtCon.RunFunc(context.Background(), func(ctx context.Context, records []record.Record) {
	// application logic
}, "test-topic")
```
The Batch Consumer API lets you consume a batch of messages in a single go; it runs each consumer in the group in its own goroutine. The batch consumer is a good fit if your message processing is fast and you have high-throughput requirements.
```go
batCon := batch.Consumer{
	StatsNotifier: func(notif stats.Notification) {
		// publish to statsd OR prometheus
	},
	BatchSize: 200,
	KafkaConsumerConf: &conflgo.ConsumerConfig{
		BoostrapServers: "localhost:9092",
		GroupID:         "foo.id",
	},
	Count:           3, // controls the concurrency; for best performance set it to the number of partitions
	ShutdownTimeout: 10000 * time.Millisecond,
	Logger:          slog.New(slog.NewTextHandler(os.Stdout, nil)),
}
_ = batCon.RunFunc(context.Background(), func(ctx context.Context, records []record.Record) {
	// application logic
}, "test-topic")
```
Note

`RunFunc` panics if no topics are provided, for both the batch consumer and the throttled consumer.
`propel.WorkerContext` is used to get the ID of the worker which invoked the handler; this is useful for debugging purposes only. The handler is fully decoupled from the worker.
The `ConsumerConfig` provides a way to specify the consumer config in a type-safe way; `ConfigMapOverride` lets you directly specify a `*kafka.ConfigMap` to set any extra configuration. For a full list of allowed configs, refer to the librdkafka configuration reference.
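As a minimal sketch (reusing the `conflgo.ConsumerConfig` fields from the examples above; `kafka` is the Confluent Go client package, and `fetch.min.bytes` is just an illustrative key):

```go
conf := conflgo.ConsumerConfig{
	BoostrapServers: "localhost:9092",
	GroupID:         "foo.id",
	// ConfigMapOverride passes extra settings straight through to librdkafka.
	ConfigMapOverride: &kafka.ConfigMap{
		"fetch.min.bytes": 1024, // illustrative override
	},
}
```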
The `Run` method takes an implementation of the `Handler` interface, whereas `RunFunc` takes a plain Go function as the handler.
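As a hedged sketch of the two styles (the `Handler` interface's exact method name is an assumption here, presumed to mirror the `RunFunc` callback signature; check the package docs):

```go
// myHandler is a hypothetical Handler implementation.
type myHandler struct{}

// Handle mirrors the RunFunc callback signature (assumed method name).
func (myHandler) Handle(ctx context.Context, records []record.Record) {
	// application logic
}

// Run takes the interface implementation...
_ = throtCon.Run(context.Background(), myHandler{}, "test-topic")

// ...while RunFunc takes a plain function.
_ = throtCon.RunFunc(context.Background(), func(ctx context.Context, records []record.Record) {
	// application logic
}, "test-topic")
```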
The batch is valid only for one handler invocation; it is cleared before the next invocation, and the messages get GC-ed as per the GC schedule.
We use a combination of auto-commits and manual offset store management: after every handler invocation, we store the next set of offsets, which are then committed in the next auto-commit cycle.
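For intuition, here is an illustrative sketch of that pattern written directly against the Confluent Go client (this is not Propel's internal code; `msg` stands for the last processed message, and the client import path may differ between versions):

```go
// enable.auto.commit plus a disabled auto offset store means librdkafka
// only auto-commits offsets we have explicitly stored after processing.
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers":        "localhost:9092",
	"group.id":                 "foo.id",
	"enable.auto.commit":       true,
	"enable.auto.offset.store": false,
})
if err != nil {
	log.Fatal(err)
}
// after the handler finishes processing msg:
_, _ = consumer.StoreOffsets([]kafka.TopicPartition{{
	Topic:     msg.TopicPartition.Topic,
	Partition: msg.TopicPartition.Partition,
	Offset:    msg.TopicPartition.Offset + 1, // next offset, picked up by the auto-commit cycle
}})
```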
Selecting a good batch size is about evaluating tradeoffs:

- A smaller batch size is good if you want to limit the number of uncommitted messages and the memory usage, as the batch is held in memory till processing completes.
- A larger batch size is good if you have high throughput and memory is not a concern.
- The batch itself can be processed concurrently for faster processing (see the sketch after this list).
- Prefer a smaller batch if your processing time is on the high end of the spectrum and could take longer than `max.poll.interval.ms` to complete.
- Prefer a larger batch size if your processing time is very fast and well within the `max.poll.interval.ms`.
- Last but not least: measure, measure, and measure again to fine-tune the batch size.
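A minimal sketch of processing one batch concurrently inside the handler (the chunk size of 50 is an arbitrary illustrative value):

```go
func handle(ctx context.Context, records []record.Record) {
	const chunkSize = 50
	var wg sync.WaitGroup
	for start := 0; start < len(records); start += chunkSize {
		end := min(start+chunkSize, len(records)) // built-in min, Go 1.21+
		wg.Add(1)
		go func(chunk []record.Record) {
			defer wg.Done()
			// process chunk
		}(records[start:end])
	}
	wg.Wait() // offsets are stored only after the handler returns
}
```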
- Sometimes processing can hang the consumer poll loop, preventing it from seeing the shutdown signals. This can cause your process to block indefinitely and not honor SIGTERM/SIGINT.
- The `ShutdownTimeout` should be higher than the `PollTimeoutMS`, because the poll loop blocks for `PollTimeoutMS` by default and cannot be cancelled; this is a known limitation of the Confluent Go Kafka client.
- Relying on the `ShutdownTimeout` should be avoided at all costs; instead, make good use of the `context.Context` package to propagate timeouts and cancellation signals to your handler logic.
- Use the cancel function returned by the `context.With*` functions: pass the cancel func to your handler or capture the reference using a closure. Remember, use contexts with proper timeouts in your handlers to enforce deadlines; if your handler blocks indefinitely, it is definitely a red flag in your application logic.
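A minimal sketch of shutting down from inside the handler by capturing the cancel func in a closure (`process` is a hypothetical helper):

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_ = throtCon.RunFunc(ctx, func(ctx context.Context, records []record.Record) {
	if err := process(ctx, records); err != nil { // hypothetical processing helper
		cancel() // stops the poll loop and triggers a clean shutdown
	}
}, "test-topic")
```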
There are a lot of stats collectors/agents, and we cannot decide for the user which agent is better; instead, we leave that responsibility to the stats libraries, which will do it better than us. We want to keep the lib's API footprint as small as possible.
- For code usage, refer to the `example` directory
- For full docs:

```
godoc -http :8080
open http://localhost:8080/pkg/github.com/shubhang93/propel/
```
The `stats.Notifier` receives a `stats.Notification` payload with the appropriate stats information.
| Metric Name | Type | Example Value | Label Values |
|---|---|---|---|
| poll_records_count | int64 | 1 | consumer_id, consumer_1 |
| confluent_go_kafka_stats (more info) | string | {"key":"value"} | N/A |
| worker_alive (throttled consumer only) | int64 | 0/1 | worker_id, worker#1 |
Note

Batch Consumers Only

Depending on the `Count` value, we spin up `Count` consumers, each running its own poll loop in its own goroutine and consuming messages from Kafka. The intermediate batching layer batches the messages into the batch size specified by the `BatchSize` parameter in the config; the default value is 500. The `BatchSize` value can be tweaked according to your requirements. Currently, we only support auto-commits; the offset management is handled internally by the library. You still have to process records within `max.poll.interval.ms` to avoid your consumer getting kicked out of the consumer group.
Note

Throttled Consumers Only

The throttled consumer works differently from the batch consumer: a single consumer is spun up, which consumes from Kafka in one poll loop. Messages are grouped by topic-partition and fed to the handler. The throttled consumer pauses partitions until all records for a topic-partition are fully processed; this ensures that poll is called as frequently as possible. You will not have to worry about your consumer getting kicked out due to slow consumption, but this comes at a cost: ingestion throughput suffers because of the pausing and resuming of partitions. This consumer is better suited for blocking handler operations and is not meant for fast consumption. All the concurrency is handled at the goroutine level; records for each partition are handled in their own goroutine without spinning up a physical consumer.
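For intuition, the pause/resume throttling looks roughly like the following raw Confluent Go client calls (Propel does this internally; you never write this yourself):

```go
parts := []kafka.TopicPartition{{Topic: &topic, Partition: 0}}
_ = consumer.Pause(parts)  // stop fetching this partition while its records are in flight
// ... the partition's goroutine drains the pending records ...
_ = consumer.Resume(parts) // resume fetching once the batch is fully processed
```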
Device: MacBook Air M1 2020, 8GB RAM, 256 GB SSD

Kafka Consumer Config:

- max.poll.interval.ms: 15s
- Poll Timeout: 100ms
- Produce Count: 10_000
- Message Key Max Size: 11 bytes
- Message Value Max Size: 5 bytes
Note

The memory usage will vary for different keys and values, and so will the allocations. Most of the allocations happen in the Confluent Go Kafka lib.

The benchmark below uses a constant 10ms as the time taken for processing a batch of records.
```
goos: darwin
goarch: arm64
pkg: github.com/shubhang93/propel/batch
BenchmarkConsumer_RunFunc
BenchmarkConsumer_RunFunc/batch_size_1
BenchmarkConsumer_RunFunc/batch_size_1-8        1    76022596041 ns/op    14988048 B/op    386652 allocs/op
BenchmarkConsumer_RunFunc/batch_size_100
BenchmarkConsumer_RunFunc/batch_size_100-8      1     4030724333 ns/op     9633216 B/op    186594 allocs/op
BenchmarkConsumer_RunFunc/batch_size_500
BenchmarkConsumer_RunFunc/batch_size_500-8      1     3522976500 ns/op    10023040 B/op    184559 allocs/op
BenchmarkConsumer_RunFunc/batch_size_1000
BenchmarkConsumer_RunFunc/batch_size_1000-8     1     3423151125 ns/op     9560160 B/op    184259 allocs/op
BenchmarkConsumer_RunFunc/batch_size_5000
BenchmarkConsumer_RunFunc/batch_size_5000-8     1     3448683125 ns/op     9764704 B/op    183978 allocs/op
PASS
```
If your processing logic does not support batch operations, you will most likely not gain a lot from batching; but if it does, your processing time decreases drastically. Comparing the first and second benchmarks, we see a 94% decrease in processing time with batching, although returns diminish at higher batch sizes. This benchmark can be used to select an optimum batch size. These numbers are for reference; whether your application sees the same decrease in processing time depends on your message key/value content and the complexity of your processing logic. Even so, these numbers serve as a good blueprint for optimising your workloads.
The benchmark below does not use any sleep in the handler.
```
goos: darwin
goarch: arm64
pkg: github.com/shubhang93/propel/throttled
BenchmarkConsumer_RunFunc
BenchmarkConsumer_RunFunc/batch_size_10
BenchmarkConsumer_RunFunc/batch_size_10-8       1    68003927042 ns/op    9500232 B/op    224834 allocs/op
BenchmarkConsumer_RunFunc/batch_size_100
BenchmarkConsumer_RunFunc/batch_size_100-8      1    13771014584 ns/op    8003480 B/op    186456 allocs/op
BenchmarkConsumer_RunFunc/batch_size_500
BenchmarkConsumer_RunFunc/batch_size_500-8      1     7242461500 ns/op    7920240 B/op    182793 allocs/op
BenchmarkConsumer_RunFunc/batch_size_1000
BenchmarkConsumer_RunFunc/batch_size_1000-8     1     6327211292 ns/op    7939696 B/op    182280 allocs/op
BenchmarkConsumer_RunFunc/batch_size_5000
BenchmarkConsumer_RunFunc/batch_size_5000-8     1     4245761291 ns/op    8258512 B/op    181528 allocs/op
PASS
```
We see that there is significant overhead from the pausing and resuming of partitions. At higher batch sizes the overhead is significantly reduced; hence, the throttled consumer isn't a good fit for high-throughput workloads. It is a better fit for handlers that block.