Skip to content

Kuper-Tech/sbmt-kafka_consumer

Repository files navigation

Gem Version Build Status

Sbmt-KafkaConsumer

This gem is used to consume Kafka messages. It is a wrapper over the Karafka gem, and is recommended for use as a transport with the sbmt-outbox gem.

Installation

Add this line to your application's Gemfile:

gem "sbmt-kafka_consumer"

And then execute:

bundle install

Demo

Learn how to use this gem and how it works with Ruby on Rails at here https://github.com/SberMarket-Tech/outbox-example-apps

Auto configuration

We recommend going through the configuration and file creation process using the following Rails generators. Each generator can be run by using the --help option to learn more about the available arguments.

Initial configuration

If you plug the gem into your application for the first time, you can generate the initial configuration:

rails g kafka_consumer:install

As the result, the config/kafka_consumer.yml file will be created.

Consumer class

A consumer class can be generated with the following command:

rails g kafka_consumer:consumer MaybeNamespaced::Name

Inbox consumer

To generate an Inbox consumer for use with gem sbmt-outbox, run the following command:

rails g kafka_consumer:inbox_consumer MaybeNamespaced::Name some-consumer-group some-topic

Manual configuration

The config/kafka_consumer.yml file is a main configuration for the gem.

Example config with a full set of options:

default: &default
  client_id: "my-app-consumer"
  concurrency: 4 # max number of threads
  # optional Karafka options
  max_wait_time: 1
  shutdown_timeout: 60
  pause_timeout: 1
  pause_max_timeout: 30
  pause_with_exponential_backoff: true
  partition_assignment_strategy: cooperative-sticky
  auth:
    kind: plaintext
  kafka:
    servers: "kafka:9092"
    # optional Kafka options
    heartbeat_timeout: 5
    session_timeout: 30
    reconnect_timeout: 3
    connect_timeout: 5
    socket_timeout: 30
    kafka_options:
      allow.auto.create.topics: true
  probes: # optional section
    port: 9394
    endpoints:
      readiness:
        enabled: true
        path: "/readiness"
      liveness:
        enabled: true
        path: "/liveness"
        timeout: 15
        max_error_count: 15 # default 10
  metrics: # optional section
    port: 9090
    path: "/metrics"
  consumer_groups:
    group_ref_id_1:
      name: cg_with_single_topic
      topics:
        - name: topic_with_inbox_items
          consumer:
            klass: "Sbmt::KafkaConsumer::InboxConsumer"
            init_attrs:
              name: "test_items"
              inbox_item: "TestInboxItem"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::NullDeserializer"
          kafka_options:
            auto.offset.reset: latest
    group_ref_id_2:
      name: cg_with_multiple_topics
      topics:
        - name: topic_with_json_data
          consumer:
            klass: "SomeConsumer"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::JsonDeserializer"
        - name: topic_with_protobuf_data
          consumer:
            klass: "SomeConsumer"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::ProtobufDeserializer"
            init_attrs:
              message_decoder_klass: "SomeDecoder"
              skip_decoding_error: true

development:
  <<: *default

test:
  <<: *default
  deliver: false

production:
  <<: *default

auth config section

The gem supports 2 variants: plaintext (default) and SASL-plaintext

SASL-plaintext:

auth:
  kind: sasl_plaintext
  sasl_username: user
  sasl_password: pwd
  sasl_mechanism: SCRAM-SHA-512

kafka config section

The servers key is required and should be in rdkafka format: without kafka:// prefix, for example: srv1:port1,srv2:port2,....

The kafka_config section may contain any rdkafka option. Also, kafka_options may be redefined for each topic. Please note that the partition.assignment.strategy option within kafka_options is not supported for topics; instead, use the global option partition_assignment_strategy.

consumer_groups config section

consumer_groups:
  # group id can be used when starting a consumer process (see CLI section below)
  group_id:
    name: some_group_name # required
    topics:
    - name: some_topic_name # required
      active: true # optional, default true
      consumer:
        klass: SomeConsumerClass # required, a consumer class inherited from Sbmt::KafkaConsumer::BaseConsumer
        init_attrs: # optional, consumer class attributes (see below)
          key: value
      deserializer:
        klass: SomeDeserializerClass # optional, default NullDeserializer, a deserializer class inherited from Sbmt::KafkaConsumer::Serialization::NullDeserializer
        init_attrs: # optional, deserializer class attributes (see below)
          key: value
      kafka_options: # optional, this section allows to redefine the root rdkafka options for each topic
        auto.offset.reset: latest

consumer.init_attrs options for BaseConsumer

  • skip_on_error - optional, default false, omit consumer errors in message processing and commit the offset to Kafka
  • middlewares - optional, default [], type String, add middleware before message processing
init_attrs:
  middlewares: ['SomeMiddleware']
class SomeMiddleware
  def call(message)
    yield if message.payload.id.to_i % 2 == 0
  end
end

CAUTION:

  • ⚠️ yield is mandatory for all middleware, as it returns control to the process_message method.

consumer.init_attrs options for InboxConsumer

  • inbox_item - required, name of the inbox item class
  • event_name - optional, default nil, used when the inbox item keep several event types
  • skip_on_error - optional, default false, omit consumer errors in message processing and commit the offset to Kafka
  • middlewares - optional, default [], type String, add middleware before message processing
init_attrs:
  middlewares: ['SomeMiddleware']
class SomeMiddleware
  def call(message)
    yield if message.payload.id.to_i % 2 == 0
  end
end

CAUTION:

  • ⚠️ yield is mandatory for all middleware, as it returns control to the process_message method.
  • ⚠️ Doesn't work with process_batch.

deserializer.init_attrs options

  • skip_decoding_error — don't raise an exception when cannot deserialize the message

probes config section

In Kubernetes, probes are mechanisms used to assess the health of your application running within a container.

probes:
  port: 9394 # optional, default 9394
  endpoints:
    liveness:
      enabled: true # optional, default true
      path: /liveness # optional, default "/liveness"
      timeout: 10 # optional, default 10, timeout in seconds after which the group is considered dead
    readiness:
      enabled: true # optional, default true
      path: /readiness/kafka_consumer # optional, default "/readiness/kafka_consumer"

metrics config section

We use Yabeda to collect all kind of metrics.

metrics:
  port: 9090 # optional, default is probes.port
  path: /metrics # optional, default "/metrics"

Kafkafile

You can create a Kafkafile in the root of your app to configure additional settings for your needs.

Example:

require_relative "config/environment"

some-extra-configuration

Process batch

To process messages in batches, you need to add the process_batch method in the consumer

# app/consumers/some_consumer.rb
class SomeConsumer < Sbmt::KafkaConsumer::BaseConsumer
  def process_batch(messages)
    # some code 
  end
end

CAUTION:

  • ⚠️ Inbox does not support batch insertion.
  • ⚠️ If you want to use this feature, you need to process the stack atomically (eg: insert it into clickhouse in one request).

CLI

Run the following command to execute a server

kafka_consumer -g some_group_id_1 -g some_group_id_2 -c 5

Where:

  • -g - group, a consumer group id, if not specified, all groups from the config will be processed
  • -c - concurrency, a number of threads, default is 4

concurrency argument

Concurrency and Multithreading.

Don't forget to properly calculate and set the size of the ActiveRecord connection pool:

  • each thread will utilize one db connection from the pool
  • an application can have monitoring threads which can use db connections from the pool

Also pay attention to the number of processes of the server:

  • number_of_processes x concurrency for topics with high data intensity can be equal to the number of partitions of the consumed topic
  • number_sof_processes x concurrency for topics with low data intensity can be less than the number of partitions of the consumed topic

Testing

To test your consumer with Rspec, please use this shared context

for payload

require "sbmt/kafka_consumer/testing"

RSpec.describe OrderCreatedConsumer do
  include_context "with sbmt karafka consumer"

  it "works" do
    publish_to_sbmt_karafka(payload, deserializer: deserializer)
    expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
  end
end

for payloads

require "sbmt/kafka_consumer/testing"

RSpec.describe OrderCreatedConsumer do
  include_context "with sbmt karafka consumer"

  it "works" do
    publish_to_sbmt_karafka_batch(payloads, deserializer: deserializer)
    expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
  end
end

Development

  1. Prepare environment
dip provision
  1. Run tests
dip rspec
  1. Run linter
dip rubocop
  1. Run Kafka server
dip up
  1. Run consumer server
dip kafka-consumer