This gem is used to consume Kafka messages. It is a wrapper over the Karafka gem, and is recommended for use as a transport with the sbmt-outbox gem.
Add this line to your application's Gemfile:
gem "sbmt-kafka_consumer"
And then execute:
bundle install
To learn how to use this gem and how it works with Ruby on Rails, see the example apps: https://github.com/SberMarket-Tech/outbox-example-apps
We recommend going through the configuration and file creation process using the following Rails generators. Run any generator with the --help option to learn more about the available arguments.
If you are adding the gem to your application for the first time, you can generate the initial configuration:
rails g kafka_consumer:install
As a result, the config/kafka_consumer.yml file will be created.
A consumer class can be generated with the following command:
rails g kafka_consumer:consumer MaybeNamespaced::Name
To generate an Inbox consumer for use with the sbmt-outbox gem, run the following command:
rails g kafka_consumer:inbox_consumer MaybeNamespaced::Name some-consumer-group some-topic
The config/kafka_consumer.yml file is the main configuration file for the gem.
Example config with a full set of options:
default: &default
  client_id: "my-app-consumer"
  concurrency: 4 # max number of threads
  # optional Karafka options
  max_wait_time: 1
  shutdown_timeout: 60
  pause_timeout: 1
  pause_max_timeout: 30
  pause_with_exponential_backoff: true
  partition_assignment_strategy: cooperative-sticky
  auth:
    kind: plaintext
  kafka:
    servers: "kafka:9092"
    # optional Kafka options
    heartbeat_timeout: 5
    session_timeout: 30
    reconnect_timeout: 3
    connect_timeout: 5
    socket_timeout: 30
    kafka_options:
      allow.auto.create.topics: true
  probes: # optional section
    port: 9394
    endpoints:
      readiness:
        enabled: true
        path: "/readiness"
      liveness:
        enabled: true
        path: "/liveness"
        timeout: 15
        max_error_count: 15 # default 10
  metrics: # optional section
    port: 9090
    path: "/metrics"
  consumer_groups:
    group_ref_id_1:
      name: cg_with_single_topic
      topics:
        - name: topic_with_inbox_items
          consumer:
            klass: "Sbmt::KafkaConsumer::InboxConsumer"
            init_attrs:
              name: "test_items"
              inbox_item: "TestInboxItem"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::NullDeserializer"
          kafka_options:
            auto.offset.reset: latest
    group_ref_id_2:
      name: cg_with_multiple_topics
      topics:
        - name: topic_with_json_data
          consumer:
            klass: "SomeConsumer"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::JsonDeserializer"
        - name: topic_with_protobuf_data
          consumer:
            klass: "SomeConsumer"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::ProtobufDeserializer"
            init_attrs:
              message_decoder_klass: "SomeDecoder"
              skip_decoding_error: true
development:
  <<: *default
test:
  <<: *default
  deliver: false
production:
  <<: *default
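The `<<: *default` entries rely on YAML anchors and merge keys, so each environment inherits the `default` block and may override individual keys. The following is a minimal sketch of that behavior using only Ruby's standard `yaml` library. It does not use the gem's own config loader, the config is trimmed down, and the `concurrency: 8` override for production is a hypothetical value added for illustration.

```ruby
require "yaml"

# A trimmed-down config with the same anchor/merge-key layout as the example.
# The production override (concurrency: 8) is hypothetical.
CONFIG_YAML = <<~YAML
  default: &default
    client_id: "my-app-consumer"
    concurrency: 4
    kafka:
      servers: "kafka:9092"
  test:
    <<: *default
    deliver: false
  production:
    <<: *default
    concurrency: 8
YAML

# aliases: true is required because the environments reference the &default anchor.
config = YAML.safe_load(CONFIG_YAML, aliases: true)

puts config["test"]["client_id"]          # inherited from default
puts config["test"]["deliver"]            # defined only for the test environment
puts config["production"]["concurrency"]  # overrides the inherited value
```

Keys listed after the merge key take precedence over the inherited ones, which is why an environment can change a single setting without repeating the whole block.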
The gem supports two authentication variants: plaintext (default) and SASL-plaintext.
SASL-plaintext:
auth:
  kind: sasl_plaintext
  sasl_username: user
  sasl_password: pwd
  sasl_mechanism: SCRAM-SHA-512
The servers key is required and should be in rdkafka format: without the kafka:// prefix, for example: srv1:port1,srv2:port2,...
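If broker addresses come from an environment variable or service discovery with a `kafka://` scheme, they need to be converted before they go into the servers key. Below is a small sketch of such a conversion. `normalize_servers` is a hypothetical helper for illustration and is not part of the gem.

```ruby
# Hypothetical helper (not part of the gem): strips a "kafka://" scheme from
# each broker address and joins the result into the comma-separated rdkafka format.
def normalize_servers(servers)
  servers
    .split(",")
    .map { |server| server.strip.delete_prefix("kafka://") }
    .reject(&:empty?)
    .join(",")
end

puts normalize_servers("kafka://srv1:9092, kafka://srv2:9092")
```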
The kafka_options section may contain any rdkafka option. Also, kafka_options may be redefined for each topic.
Please note that the partition.assignment.strategy option within kafka_options is not supported for topics; instead, use the global option partition_assignment_strategy.
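To make the override rule concrete, here is a rough sketch of how root and topic-level options could be combined. It is an illustration only: `effective_kafka_options` is a hypothetical helper, the root value `auto.offset.reset: earliest` is made up for the example, and raising an error for `partition.assignment.strategy` is an assumed way to handle the unsupported option, not confirmed gem behavior.

```ruby
# Hypothetical helper, not the gem's code: topic-level options win over root ones.
def effective_kafka_options(root_options, topic_options)
  # Assumption: reject the option that is documented as unsupported per topic.
  if topic_options.key?("partition.assignment.strategy")
    raise ArgumentError, "use the global partition_assignment_strategy option instead"
  end

  root_options.merge(topic_options)
end

# Illustrative values; only allow.auto.create.topics appears in the example config.
root_options = { "allow.auto.create.topics" => true, "auto.offset.reset" => "earliest" }
topic_options = { "auto.offset.reset" => "latest" }

p effective_kafka_options(root_options, topic_options)
```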
consumer_groups:
  # group id can be used when starting a consumer process (see CLI section below)
  group_id:
    name: some_group_name # required
    topics:
      - name: some_topic_name # required
        active: true # optional, default true
        consumer:
          klass: SomeConsumerClass # required, a consumer class inherited from Sbmt::KafkaConsumer::BaseConsumer
          init_attrs: # optional, consumer class attributes (see below)
            key: value
        deserializer:
          klass: SomeDeserializerClass # optional, default NullDeserializer, a deserializer class inherited from Sbmt::KafkaConsumer::Serialization::NullDeserializer
          init_attrs: # optional, deserializer class attributes (see below)
            key: value
        kafka_options: # optional, this section lets you redefine the root rdkafka options for each topic
          auto.offset.reset: latest
- skip_on_error - optional, default false, omit consumer errors in message processing and commit the offset to Kafka
- middlewares - optional, default [], type String, add middleware before message processing
init_attrs:
  middlewares: ['SomeMiddleware']
class SomeMiddleware
  def call(message)
    yield if message.payload.id.to_i % 2 == 0
  end
end
CAUTION:
⚠️ yield is mandatory for all middleware, as it returns control to the process_message method.
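The role of `yield` is easier to see in a standalone sketch. The code below is not the gem's implementation: `with_middlewares`, `SketchMessage` and `SketchPayload` are hypothetical stand-ins, and the block passed to `with_middlewares` plays the part of `process_message`. A middleware that does not yield stops the chain, so the message is never processed.

```ruby
# Hypothetical stand-ins for a Kafka message and its deserialized payload.
SketchPayload = Struct.new(:id)
SketchMessage = Struct.new(:payload)

# Same shape as the middleware above: yields only for even ids.
class EvenIdsOnly
  def call(message)
    yield if message.payload.id.to_i.even?
  end
end

# Hypothetical runner: wraps the processing block in each middleware, outermost first.
def with_middlewares(message, middlewares, &process)
  chain = middlewares.reverse.reduce(process) do |next_step, middleware|
    -> { middleware.call(message) { next_step.call } }
  end
  chain.call
end

processed = []
[1, 2, 3, 4].each do |id|
  message = SketchMessage.new(SketchPayload.new(id))
  # The block stands in for process_message.
  with_middlewares(message, [EvenIdsOnly.new]) { processed << id }
end

p processed # only the ids for which the middleware yielded
```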
- inbox_item - required, name of the inbox item class
- event_name - optional, default nil, used when the inbox item keeps several event types
- skip_on_error - optional, default false, omit consumer errors in message processing and commit the offset to Kafka
- middlewares - optional, default [], type String, add middleware before message processing
init_attrs:
  middlewares: ['SomeMiddleware']
class SomeMiddleware
  def call(message)
    yield if message.payload.id.to_i % 2 == 0
  end
end
CAUTION:
⚠️ yield is mandatory for all middleware, as it returns control to the process_message method.
⚠️ Doesn't work with process_batch.
- skip_decoding_error - do not raise an exception when the message cannot be deserialized
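As an illustration of what such a flag changes, here is a minimal sketch of a JSON deserializer with a `skip_decoding_error`-style switch. `SketchJsonDeserializer` is a hypothetical class, not the gem's `JsonDeserializer`, and returning `nil` for an undecodable payload is an assumption made for the example.

```ruby
require "json"

# Hypothetical deserializer (not the gem's class) with a skip_decoding_error-style flag.
class SketchJsonDeserializer
  def initialize(skip_decoding_error: false)
    @skip_decoding_error = skip_decoding_error
  end

  def call(raw_payload)
    JSON.parse(raw_payload)
  rescue JSON::ParserError
    # Re-raise the parse error unless the flag asks to skip it.
    raise unless @skip_decoding_error

    nil # assumption: a skipped message yields a nil payload
  end
end

lenient = SketchJsonDeserializer.new(skip_decoding_error: true)
p lenient.call('{"id": 1}')
p lenient.call("not json")
```

With this approach the consumer has to be prepared for a missing payload, since a skipped message still reaches it.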
In Kubernetes, probes are mechanisms used to assess the health of your application running within a container.
probes:
  port: 9394 # optional, default 9394
  endpoints:
    liveness:
      enabled: true # optional, default true
      path: /liveness # optional, default "/liveness"
      timeout: 10 # optional, default 10, timeout in seconds after which the group is considered dead
    readiness:
      enabled: true # optional, default true
      path: /readiness/kafka_consumer # optional, default "/readiness/kafka_consumer"
We use Yabeda to collect all kinds of metrics.
metrics:
  port: 9090 # optional, default is probes.port
  path: /metrics # optional, default "/metrics"
You can create a Kafkafile in the root of your app to configure additional settings for your needs.
Example:
require_relative "config/environment"
some-extra-configuration
To process messages in batches, add the process_batch method to the consumer:
# app/consumers/some_consumer.rb
class SomeConsumer < Sbmt::KafkaConsumer::BaseConsumer
  def process_batch(messages)
    # some code
  end
end
CAUTION:
⚠️ Inbox does not support batch insertion.
⚠️ If you want to use this feature, you need to process the batch atomically (e.g. insert it into ClickHouse in one request).
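Here is a sketch of what processing a batch in one request can look like. It runs without Kafka or a database: `BatchMessage`, `InMemoryStore` and `BatchWriter` are hypothetical stand-ins, and `insert_all` represents a single bulk-insert request. All rows are built first and then written in one call, so a failure while building rows leaves nothing written.

```ruby
# Hypothetical stand-in for a consumed message with a deserialized payload.
BatchMessage = Struct.new(:payload)

# Hypothetical stand-in for a bulk-insert client; counts how many requests were made.
class InMemoryStore
  attr_reader :rows, :requests

  def initialize
    @rows = []
    @requests = 0
  end

  # Accepts the whole batch in a single call, like one bulk INSERT request.
  def insert_all(rows)
    @requests += 1
    @rows.concat(rows)
  end
end

class BatchWriter
  def initialize(store)
    @store = store
  end

  # Mirrors the process_batch signature: build every row first, then write once.
  def process_batch(messages)
    rows = messages.map { |message| { id: message.payload.fetch("id") } }
    @store.insert_all(rows)
  end
end

store = InMemoryStore.new
messages = [1, 2, 3].map { |id| BatchMessage.new({ "id" => id }) }
BatchWriter.new(store).process_batch(messages)
puts "rows=#{store.rows.size} requests=#{store.requests}"
```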
Run the following command to start a server:
kafka_consumer -g some_group_id_1 -g some_group_id_2 -c 5
Where:
-g - group, a consumer group id, if not specified, all groups from the config will be processed
-c - concurrency, a number of threads, default is 4
Concurrency and Multithreading.
Don't forget to properly calculate and set the size of the ActiveRecord connection pool:
- each thread will utilize one db connection from the pool
- an application can have monitoring threads which can use db connections from the pool
Also pay attention to the number of processes of the server:
- number_of_processes x concurrency for topics with high data intensity can be equal to the number of partitions of the consumed topic
- number_of_processes x concurrency for topics with low data intensity can be less than the number of partitions of the consumed topic
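The sizing rules above come down to simple arithmetic. The helpers below are a hypothetical sketch and not part of the gem: `required_pool_size` adds one connection per consumer thread to whatever extra threads the application runs, and `processes_for` gives the number of processes for which number_of_processes x concurrency covers every partition of a high-intensity topic.

```ruby
# Hypothetical helper: one DB connection per consumer thread, plus any
# additional threads (for example monitoring) that also use the pool.
def required_pool_size(concurrency:, extra_threads: 0)
  concurrency + extra_threads
end

# Hypothetical helper: the smallest number of processes for which
# processes x concurrency is at least the number of partitions.
def processes_for(partitions:, concurrency:)
  (partitions + concurrency - 1) / concurrency
end

puts required_pool_size(concurrency: 5, extra_threads: 2)
puts processes_for(partitions: 12, concurrency: 4)
```

For a low-intensity topic, fewer processes than `processes_for` returns may be enough, since one thread can then serve several partitions.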
To test your consumer with RSpec, please use this shared context:
require "sbmt/kafka_consumer/testing"
RSpec.describe OrderCreatedConsumer do
  include_context "with sbmt karafka consumer"
  it "works" do
    publish_to_sbmt_karafka(payload, deserializer: deserializer)
    expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
  end
end
require "sbmt/kafka_consumer/testing"
RSpec.describe OrderCreatedConsumer do
  include_context "with sbmt karafka consumer"
  it "works" do
    publish_to_sbmt_karafka_batch(payloads, deserializer: deserializer)
    expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
  end
end
- Prepare environment
dip provision
- Run tests
dip rspec
- Run linter
dip rubocop
- Run Kafka server
dip up
- Run consumer server
dip kafka-consumer