Skip to content

Commit

Permalink
[DEX-2268] feat: support karafka 2.4+
Browse files Browse the repository at this point in the history
  • Loading branch information
ysatarov committed Sep 5, 2024
1 parent 7e6b685 commit 6f8fb74
Show file tree
Hide file tree
Showing 19 changed files with 170 additions and 130 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby: [ '2.7', '3.0', '3.1', '3.2', '3.3' ]
ruby: [ '3.0', '3.1', '3.2', '3.3' ]
env:
RUBY_VERSION: ${{ matrix.ruby }}
name: Ruby ${{ matrix.ruby }}
Expand Down
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ tests:
image: ${BUILD_CONF_HARBOR_REGISTRY}/dhub/library/ruby:$RUBY_VERSION
parallel:
matrix:
- RUBY_VERSION: ['2.7', '3.0', '3.1', '3.2', '3.3']
- RUBY_VERSION: ['3.0', '3.1', '3.2', '3.3']
services:
- name: ${BUILD_CONF_HARBOR_REGISTRY}/dhub/library/postgres:13
alias: postgres
Expand Down
4 changes: 2 additions & 2 deletions Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
# See compatibility table at https://www.fastruby.io/blog/ruby/rails/versions/compatibility-table.html

versions_map = {
"6.0" => %w[2.7],
"6.1" => %w[2.7 3.0],
"7.0" => %w[3.1],
"7.1" => %w[3.2, 3.3]
"7.1" => %w[3.2, 3.3],
"7.2" => %w[3.3]
}

current_ruby_version = RUBY_VERSION.split(".").first(2).join(".")
Expand Down
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [3.0.0] - 2024-09-04

## BREAKING

- Drop support for Ruby 2.7
- Drop support for Rails 6.0
- Add support for Karafka 2.4

### Fixed

- Support consumer group mappers to support backward compatibility of consumer group naming

## [2.7.1] - 2024-08-01

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2024 SberMarket Tech
Copyright (c) 2024 Kuper Tech

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[![Gem Version](https://badge.fury.io/rb/sbmt-kafka_consumer.svg)](https://badge.fury.io/rb/sbmt-kafka_consumer)
[![Build Status](https://github.com/SberMarket-Tech/sbmt-kafka_consumer/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/SberMarket-Tech/sbmt-kafka_consumer/actions?query=branch%3Amaster)
[![Build Status](https://github.com/Kuper-Tech/sbmt-kafka_consumer/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/Kuper-Tech/sbmt-kafka_consumer/actions?query=branch%3Amaster)

# Sbmt-KafkaConsumer

This gem is used to consume Kafka messages. It is a wrapper over the [Karafka](https://github.com/karafka/karafka) gem, and is recommended for use as a transport with the [sbmt-outbox](https://github.com/SberMarket-Tech/sbmt-outbox) gem.
This gem is used to consume Kafka messages. It is a wrapper over the [Karafka](https://github.com/karafka/karafka) gem, and is recommended for use as a transport with the [sbmt-outbox](https://github.com/Kuper-Tech/sbmt-outbox) gem.

## Installation

Expand All @@ -21,7 +21,7 @@ bundle install

## Demo

Learn how to use this gem and how it works with Ruby on Rails at here https://github.com/SberMarket-Tech/outbox-example-apps
Learn how to use this gem and how it works with Ruby on Rails at here https://github.com/Kuper-Tech/outbox-example-apps

## Auto configuration

Expand All @@ -47,7 +47,7 @@ rails g kafka_consumer:consumer MaybeNamespaced::Name

### Inbox consumer

To generate an Inbox consumer for use with gem [sbmt-outbox](https://github.com/SberMarket-Tech/sbmt-outbox), run the following command:
To generate an Inbox consumer for use with gem [sbmt-outbox](https://github.com/Kuper-Tech/sbmt-outbox), run the following command:

```shell
rails g kafka_consumer:inbox_consumer MaybeNamespaced::Name some-consumer-group some-topic
Expand Down
6 changes: 3 additions & 3 deletions dip.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '7'

environment:
RUBY_VERSION: '3.2'
RUBY_VERSION: '3.3'

compose:
files:
Expand Down Expand Up @@ -35,14 +35,14 @@ interaction:
subcommands:
all:
command: bundle exec appraisal rspec
rails-6.0:
command: bundle exec appraisal rails-6.0 rspec
rails-6.1:
command: bundle exec appraisal rails-6.1 rspec
rails-7.0:
command: bundle exec appraisal rails-7.0 rspec
rails-7.1:
command: bundle exec appraisal rails-7.1 rspec
rails-7.2:
command: bundle exec appraisal rails-7.2 rspec

rubocop:
description: Run Ruby linter
Expand Down
9 changes: 5 additions & 4 deletions lib/sbmt/kafka_consumer/client_configurer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ def self.configure!(**opts)
Karafka::App.setup do |karafka_config|
karafka_config.monitor = config.monitor_class.classify.constantize.new
karafka_config.logger = Sbmt::KafkaConsumer.logger
karafka_config.deserializer = config.deserializer_class.classify.constantize.new

karafka_config.client_id = config.client_id
karafka_config.consumer_mapper = config.consumer_mapper_class.classify.constantize.new
karafka_config.kafka = config.to_kafka_options

karafka_config.pause_timeout = config.pause_timeout * 1_000 if config.pause_timeout.present?
Expand Down Expand Up @@ -43,13 +41,16 @@ def self.configure!(**opts)

raise "No configured consumer groups found, exiting" if target_consumer_groups.blank?

consumer_mapper = config.consumer_mapper_class.classify.constantize.new

# clear routes in case CLI runner tries to reconfigure them
# but railtie initializer had already executed and did the same
# otherwise we'll get duplicate routes error from sbmt-karafka internal config validation process
Karafka::App.routes.clear
Karafka::App.routes.draw do
target_consumer_groups.each do |cg|
consumer_group cg.name do
group_id = consumer_mapper.call(cg.name)
consumer_group group_id do
cg.topics.each do |t|
topic t.name do
active t.active
Expand All @@ -66,7 +67,7 @@ def self.configure!(**opts)

def self.routes
Karafka::App.routes.map do |cg|
topics = cg.topics.map { |t| {name: t.name, deserializer: t.deserializer} }
topics = cg.topics.map { |t| {name: t.name, deserializer: t.deserializers.payload} }
{group: cg.id, topics: topics}
end
end
Expand Down
17 changes: 17 additions & 0 deletions lib/sbmt/kafka_consumer/routing/consumer_mapper/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module Sbmt
module KafkaConsumer
module Routing
module ConsumerMapper
class Base
# @param raw_consumer_group_name [String, Symbol] string or symbolized consumer group name
# @return [String] remapped final consumer group name
def call(raw_consumer_group_name)
raise "Implement #call in a subclass"
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# frozen_string_literal: true

require_relative "consumer_mapper/base"

module Sbmt
module KafkaConsumer
module Routing
class KarafkaV1ConsumerMapper < Karafka::Routing::ConsumerMapper
class KarafkaV1ConsumerMapper < ConsumerMapper::Base
# karafka v1 consumer group name mapper
def call(raw_consumer_group_name)
client_id = ActiveSupport::Inflector.underscore(Karafka::App.config.client_id).tr("/", "_")
"#{client_id}_#{raw_consumer_group_name}"
Expand Down
13 changes: 10 additions & 3 deletions lib/sbmt/kafka_consumer/routing/karafka_v2_consumer_mapper.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
# frozen_string_literal: true

require_relative "consumer_mapper/base"

module Sbmt
module KafkaConsumer
module Routing
# uses default karafka v2 mapper
# exists just for naming consistency with KarafkaV1ConsumerMapper
class KarafkaV2ConsumerMapper < Karafka::Routing::ConsumerMapper; end
# karafka v2 (before 2.4) consumer group name mapper
class KarafkaV2ConsumerMapper < ConsumerMapper::Base
def call(raw_consumer_group_name)
"#{Karafka::App.config.client_id}_#{raw_consumer_group_name}"
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

RSpec.shared_context "with sbmt karafka consumer" do
subject(:consume_with_sbmt_karafka) do
coordinator.increment
coordinator.increment(:consume)
consumer.on_consume
end

Expand All @@ -28,27 +28,27 @@
}

def publish_to_sbmt_karafka(raw_payload, opts = {})
message = Karafka::Messages::Message.new(raw_payload, Karafka::Messages::Metadata.new(metadata_defaults.merge(opts)))
message = Karafka::Messages::Message.new(raw_payload, Karafka::Messages::Metadata.new(build_metadata_hash(opts)))
consumer.messages = consumer_messages([message])
end

def publish_to_sbmt_karafka_batch(raw_payloads, opts = {})
messages = raw_payloads.map do |p|
Karafka::Messages::Message.new(p, Karafka::Messages::Metadata.new(metadata_defaults.merge(opts)))
Karafka::Messages::Message.new(p, Karafka::Messages::Metadata.new(build_metadata_hash(opts)))
end
consumer.messages = consumer_messages(messages)
end

# @return [Hash] message default options
def metadata_defaults
def build_metadata_hash(opts)
{
deserializer: null_deserializer,
headers: {},
key: nil,
offset: 0,
partition: 0,
received_at: Time.current,
topic: test_topic.name
deserializers: test_topic.deserializers(payload: opts[:deserializer] || null_deserializer),
raw_headers: opts[:headers] || {},
raw_key: opts[:key],
offset: opts[:offset] || 0,
partition: opts[:partition] || 0,
received_at: opts[:received_at] || Time.current,
topic: opts[:topic] || test_topic.name
}
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaConsumer
VERSION = "2.7.1"
VERSION = "3.0.0"
end
end
10 changes: 5 additions & 5 deletions sbmt-kafka_consumer.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ Gem::Specification.new do |spec|
spec.name = "sbmt-kafka_consumer"
spec.license = "MIT"
spec.version = Sbmt::KafkaConsumer::VERSION
spec.authors = ["Sbermarket Ruby-Platform Team"]
spec.authors = ["Kuper Ruby-Platform Team"]

spec.summary = "Ruby gem for consuming Kafka messages"
spec.description = "This gem is used for consuming Kafka messages. It represents a wrapper over Karafka gem and is recommended for using as a transport with sbmt-outbox"
spec.homepage = "https://github.com/SberMarket-Tech/sbmt-kafka_consumer"
spec.required_ruby_version = ">= 2.7.0"
spec.homepage = "https://github.com/Kuper-Tech/sbmt-kafka_consumer"
spec.required_ruby_version = ">= 3.0.0"

spec.metadata["allowed_push_host"] = "https://rubygems.org"

Expand All @@ -31,9 +31,9 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) }
spec.require_paths = ["lib"]

spec.add_dependency "rails", ">= 6.0"
spec.add_dependency "rails", ">= 6.1"
spec.add_dependency "zeitwerk", "~> 2.3"
spec.add_dependency "karafka", "~> 2.2", "< 2.4" # [Breaking] Drop the concept of consumer group mapping.
spec.add_dependency "karafka", "~> 2.4"
spec.add_dependency "yabeda", ">= 0.11"
spec.add_dependency "anyway_config", ">= 2.4.0"
spec.add_dependency "thor"
Expand Down
6 changes: 5 additions & 1 deletion spec/factories/karafka/batch_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
sequence(:last_offset) { |nr| nr }
topic { "topic" }
partition { 0 }
deserializer { ->(message) { message.raw_payload } }
deserializers {
{
payload: ->(message) { message.raw_payload }
}
}
created_at { Time.now.utc }
scheduled_at { Time.now.utc }
processed_at { Time.now.utc }
Expand Down
6 changes: 5 additions & 1 deletion spec/factories/karafka/metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
topic { "topic" }
sequence(:offset) { |nr| nr }
partition { 0 }
deserializer { ->(message) { message.raw_payload } }
deserializers {
{
payload: ->(message) { message.raw_payload }
}
}
timestamp { Time.now.utc }
end
end
4 changes: 2 additions & 2 deletions spec/internal/config/initializers/protobuf.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# frozen_string_literal: true

%w[local vendor].each do |dir|
Dir[Rails.root.join("protobuf", dir, "compile/**/*.rb")].sort.each do |file|
Rails.root.glob("protobuf/#{dir}/compile/**/*.rb").sort.each do |file|
# Supress messages about downcased letters in constants
Kernel.silence_warnings do
require_relative(file)
end
end
end

Dir[Rails.root.join("pkg/**/*.rb")].sort.each do |file|
Rails.root.glob("pkg/**/*.rb").sort.each do |file|
# Supress messages about downcased letters in constants
Kernel.silence_warnings do
require_relative(file)
Expand Down
Loading

0 comments on commit 6f8fb74

Please sign in to comment.