Skip to content

Commit 49fd804

Browse files
committed
Merge branch 'feat/DEX-2268/karafka-2.4-support' into 'master'
[DEX-2268] feat: support karafka 2.4+ Closes DEX-2268 See merge request nstmrt/rubygems/sbmt-kafka_consumer!65
2 parents 7e6b685 + 6f8fb74 commit 49fd804

19 files changed

+170
-130
lines changed

.github/workflows/tests.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ jobs:
3131
strategy:
3232
fail-fast: false
3333
matrix:
34-
ruby: [ '2.7', '3.0', '3.1', '3.2', '3.3' ]
34+
ruby: [ '3.0', '3.1', '3.2', '3.3' ]
3535
env:
3636
RUBY_VERSION: ${{ matrix.ruby }}
3737
name: Ruby ${{ matrix.ruby }}

.gitlab-ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ tests:
1515
image: ${BUILD_CONF_HARBOR_REGISTRY}/dhub/library/ruby:$RUBY_VERSION
1616
parallel:
1717
matrix:
18-
- RUBY_VERSION: ['2.7', '3.0', '3.1', '3.2', '3.3']
18+
- RUBY_VERSION: ['3.0', '3.1', '3.2', '3.3']
1919
services:
2020
- name: ${BUILD_CONF_HARBOR_REGISTRY}/dhub/library/postgres:13
2121
alias: postgres

Appraisals

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
# See compatibility table at https://www.fastruby.io/blog/ruby/rails/versions/compatibility-table.html
44

55
versions_map = {
6-
"6.0" => %w[2.7],
76
"6.1" => %w[2.7 3.0],
87
"7.0" => %w[3.1],
9-
"7.1" => %w[3.2, 3.3]
8+
"7.1" => %w[3.2, 3.3],
9+
"7.2" => %w[3.3]
1010
}
1111

1212
current_ruby_version = RUBY_VERSION.split(".").first(2).join(".")

CHANGELOG.md

+12
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,18 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1313

1414
### Fixed
1515

16+
## [3.0.0] - 2024-09-04
17+
18+
## BREAKING
19+
20+
- Drop support for Ruby 2.7
21+
- Drop support for Rails 6.0
22+
- Add support for Karafka 2.4
23+
24+
### Fixed
25+
26+
- Support consumer group mappers to support backward compatibility of consumer group naming
27+
1628
## [2.7.1] - 2024-08-01
1729

1830
### Fixed

LICENSE

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
MIT License
22

3-
Copyright (c) 2024 SberMarket Tech
3+
Copyright (c) 2024 Kuper Tech
44

55
Permission is hereby granted, free of charge, to any person obtaining a copy
66
of this software and associated documentation files (the "Software"), to deal

README.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
[![Gem Version](https://badge.fury.io/rb/sbmt-kafka_consumer.svg)](https://badge.fury.io/rb/sbmt-kafka_consumer)
2-
[![Build Status](https://github.com/SberMarket-Tech/sbmt-kafka_consumer/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/SberMarket-Tech/sbmt-kafka_consumer/actions?query=branch%3Amaster)
2+
[![Build Status](https://github.com/Kuper-Tech/sbmt-kafka_consumer/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/Kuper-Tech/sbmt-kafka_consumer/actions?query=branch%3Amaster)
33

44
# Sbmt-KafkaConsumer
55

6-
This gem is used to consume Kafka messages. It is a wrapper over the [Karafka](https://github.com/karafka/karafka) gem, and is recommended for use as a transport with the [sbmt-outbox](https://github.com/SberMarket-Tech/sbmt-outbox) gem.
6+
This gem is used to consume Kafka messages. It is a wrapper over the [Karafka](https://github.com/karafka/karafka) gem, and is recommended for use as a transport with the [sbmt-outbox](https://github.com/Kuper-Tech/sbmt-outbox) gem.
77

88
## Installation
99

@@ -21,7 +21,7 @@ bundle install
2121

2222
## Demo
2323

24-
Learn how to use this gem and how it works with Ruby on Rails at here https://github.com/SberMarket-Tech/outbox-example-apps
24+
Learn how to use this gem and how it works with Ruby on Rails at here https://github.com/Kuper-Tech/outbox-example-apps
2525

2626
## Auto configuration
2727

@@ -47,7 +47,7 @@ rails g kafka_consumer:consumer MaybeNamespaced::Name
4747

4848
### Inbox consumer
4949

50-
To generate an Inbox consumer for use with gem [sbmt-outbox](https://github.com/SberMarket-Tech/sbmt-outbox), run the following command:
50+
To generate an Inbox consumer for use with gem [sbmt-outbox](https://github.com/Kuper-Tech/sbmt-outbox), run the following command:
5151

5252
```shell
5353
rails g kafka_consumer:inbox_consumer MaybeNamespaced::Name some-consumer-group some-topic

dip.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
version: '7'
22

33
environment:
4-
RUBY_VERSION: '3.2'
4+
RUBY_VERSION: '3.3'
55

66
compose:
77
files:
@@ -35,14 +35,14 @@ interaction:
3535
subcommands:
3636
all:
3737
command: bundle exec appraisal rspec
38-
rails-6.0:
39-
command: bundle exec appraisal rails-6.0 rspec
4038
rails-6.1:
4139
command: bundle exec appraisal rails-6.1 rspec
4240
rails-7.0:
4341
command: bundle exec appraisal rails-7.0 rspec
4442
rails-7.1:
4543
command: bundle exec appraisal rails-7.1 rspec
44+
rails-7.2:
45+
command: bundle exec appraisal rails-7.2 rspec
4646

4747
rubocop:
4848
description: Run Ruby linter

lib/sbmt/kafka_consumer/client_configurer.rb

+5-4
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@ def self.configure!(**opts)
66
Karafka::App.setup do |karafka_config|
77
karafka_config.monitor = config.monitor_class.classify.constantize.new
88
karafka_config.logger = Sbmt::KafkaConsumer.logger
9-
karafka_config.deserializer = config.deserializer_class.classify.constantize.new
109

1110
karafka_config.client_id = config.client_id
12-
karafka_config.consumer_mapper = config.consumer_mapper_class.classify.constantize.new
1311
karafka_config.kafka = config.to_kafka_options
1412

1513
karafka_config.pause_timeout = config.pause_timeout * 1_000 if config.pause_timeout.present?
@@ -43,13 +41,16 @@ def self.configure!(**opts)
4341

4442
raise "No configured consumer groups found, exiting" if target_consumer_groups.blank?
4543

44+
consumer_mapper = config.consumer_mapper_class.classify.constantize.new
45+
4646
# clear routes in case CLI runner tries to reconfigure them
4747
# but railtie initializer had already executed and did the same
4848
# otherwise we'll get duplicate routes error from sbmt-karafka internal config validation process
4949
Karafka::App.routes.clear
5050
Karafka::App.routes.draw do
5151
target_consumer_groups.each do |cg|
52-
consumer_group cg.name do
52+
group_id = consumer_mapper.call(cg.name)
53+
consumer_group group_id do
5354
cg.topics.each do |t|
5455
topic t.name do
5556
active t.active
@@ -66,7 +67,7 @@ def self.configure!(**opts)
6667

6768
def self.routes
6869
Karafka::App.routes.map do |cg|
69-
topics = cg.topics.map { |t| {name: t.name, deserializer: t.deserializer} }
70+
topics = cg.topics.map { |t| {name: t.name, deserializer: t.deserializers.payload} }
7071
{group: cg.id, topics: topics}
7172
end
7273
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# frozen_string_literal: true
2+
3+
module Sbmt
4+
module KafkaConsumer
5+
module Routing
6+
module ConsumerMapper
7+
class Base
8+
# @param raw_consumer_group_name [String, Symbol] string or symbolized consumer group name
9+
# @return [String] remapped final consumer group name
10+
def call(raw_consumer_group_name)
11+
raise "Implement #call in a subclass"
12+
end
13+
end
14+
end
15+
end
16+
end
17+
end

lib/sbmt/kafka_consumer/routing/karafka_v1_consumer_mapper.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
# frozen_string_literal: true
2+
3+
require_relative "consumer_mapper/base"
4+
15
module Sbmt
26
module KafkaConsumer
37
module Routing
4-
class KarafkaV1ConsumerMapper < Karafka::Routing::ConsumerMapper
8+
class KarafkaV1ConsumerMapper < ConsumerMapper::Base
9+
# karafka v1 consumer group name mapper
510
def call(raw_consumer_group_name)
611
client_id = ActiveSupport::Inflector.underscore(Karafka::App.config.client_id).tr("/", "_")
712
"#{client_id}_#{raw_consumer_group_name}"
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1+
# frozen_string_literal: true
2+
3+
require_relative "consumer_mapper/base"
4+
15
module Sbmt
26
module KafkaConsumer
37
module Routing
4-
# uses default karafka v2 mapper
5-
# exists just for naming consistency with KarafkaV1ConsumerMapper
6-
class KarafkaV2ConsumerMapper < Karafka::Routing::ConsumerMapper; end
8+
# karafka v2 (before 2.4) consumer group name mapper
9+
class KarafkaV2ConsumerMapper < ConsumerMapper::Base
10+
def call(raw_consumer_group_name)
11+
"#{Karafka::App.config.client_id}_#{raw_consumer_group_name}"
12+
end
13+
end
714
end
815
end
916
end

lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb

+11-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
RSpec.shared_context "with sbmt karafka consumer" do
44
subject(:consume_with_sbmt_karafka) do
5-
coordinator.increment
5+
coordinator.increment(:consume)
66
consumer.on_consume
77
end
88

@@ -28,27 +28,27 @@
2828
}
2929

3030
def publish_to_sbmt_karafka(raw_payload, opts = {})
31-
message = Karafka::Messages::Message.new(raw_payload, Karafka::Messages::Metadata.new(metadata_defaults.merge(opts)))
31+
message = Karafka::Messages::Message.new(raw_payload, Karafka::Messages::Metadata.new(build_metadata_hash(opts)))
3232
consumer.messages = consumer_messages([message])
3333
end
3434

3535
def publish_to_sbmt_karafka_batch(raw_payloads, opts = {})
3636
messages = raw_payloads.map do |p|
37-
Karafka::Messages::Message.new(p, Karafka::Messages::Metadata.new(metadata_defaults.merge(opts)))
37+
Karafka::Messages::Message.new(p, Karafka::Messages::Metadata.new(build_metadata_hash(opts)))
3838
end
3939
consumer.messages = consumer_messages(messages)
4040
end
4141

4242
# @return [Hash] message default options
43-
def metadata_defaults
43+
def build_metadata_hash(opts)
4444
{
45-
deserializer: null_deserializer,
46-
headers: {},
47-
key: nil,
48-
offset: 0,
49-
partition: 0,
50-
received_at: Time.current,
51-
topic: test_topic.name
45+
deserializers: test_topic.deserializers(payload: opts[:deserializer] || null_deserializer),
46+
raw_headers: opts[:headers] || {},
47+
raw_key: opts[:key],
48+
offset: opts[:offset] || 0,
49+
partition: opts[:partition] || 0,
50+
received_at: opts[:received_at] || Time.current,
51+
topic: opts[:topic] || test_topic.name
5252
}
5353
end
5454

lib/sbmt/kafka_consumer/version.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
module Sbmt
44
module KafkaConsumer
5-
VERSION = "2.7.1"
5+
VERSION = "3.0.0"
66
end
77
end

sbmt-kafka_consumer.gemspec

+5-5
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ Gem::Specification.new do |spec|
66
spec.name = "sbmt-kafka_consumer"
77
spec.license = "MIT"
88
spec.version = Sbmt::KafkaConsumer::VERSION
9-
spec.authors = ["Sbermarket Ruby-Platform Team"]
9+
spec.authors = ["Kuper Ruby-Platform Team"]
1010

1111
spec.summary = "Ruby gem for consuming Kafka messages"
1212
spec.description = "This gem is used for consuming Kafka messages. It represents a wrapper over Karafka gem and is recommended for using as a transport with sbmt-outbox"
13-
spec.homepage = "https://github.com/SberMarket-Tech/sbmt-kafka_consumer"
14-
spec.required_ruby_version = ">= 2.7.0"
13+
spec.homepage = "https://github.com/Kuper-Tech/sbmt-kafka_consumer"
14+
spec.required_ruby_version = ">= 3.0.0"
1515

1616
spec.metadata["allowed_push_host"] = "https://rubygems.org"
1717

@@ -31,9 +31,9 @@ Gem::Specification.new do |spec|
3131
spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) }
3232
spec.require_paths = ["lib"]
3333

34-
spec.add_dependency "rails", ">= 6.0"
34+
spec.add_dependency "rails", ">= 6.1"
3535
spec.add_dependency "zeitwerk", "~> 2.3"
36-
spec.add_dependency "karafka", "~> 2.2", "< 2.4" # [Breaking] Drop the concept of consumer group mapping.
36+
spec.add_dependency "karafka", "~> 2.4"
3737
spec.add_dependency "yabeda", ">= 0.11"
3838
spec.add_dependency "anyway_config", ">= 2.4.0"
3939
spec.add_dependency "thor"

spec/factories/karafka/batch_metadata.rb

+5-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99
sequence(:last_offset) { |nr| nr }
1010
topic { "topic" }
1111
partition { 0 }
12-
deserializer { ->(message) { message.raw_payload } }
12+
deserializers {
13+
{
14+
payload: ->(message) { message.raw_payload }
15+
}
16+
}
1317
created_at { Time.now.utc }
1418
scheduled_at { Time.now.utc }
1519
processed_at { Time.now.utc }

spec/factories/karafka/metadata.rb

+5-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77
topic { "topic" }
88
sequence(:offset) { |nr| nr }
99
partition { 0 }
10-
deserializer { ->(message) { message.raw_payload } }
10+
deserializers {
11+
{
12+
payload: ->(message) { message.raw_payload }
13+
}
14+
}
1115
timestamp { Time.now.utc }
1216
end
1317
end

spec/internal/config/initializers/protobuf.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
# frozen_string_literal: true
22

33
%w[local vendor].each do |dir|
4-
Dir[Rails.root.join("protobuf", dir, "compile/**/*.rb")].sort.each do |file|
4+
Rails.root.glob("protobuf/#{dir}/compile/**/*.rb").sort.each do |file|
55
# Supress messages about downcased letters in constants
66
Kernel.silence_warnings do
77
require_relative(file)
88
end
99
end
1010
end
1111

12-
Dir[Rails.root.join("pkg/**/*.rb")].sort.each do |file|
12+
Rails.root.glob("pkg/**/*.rb").sort.each do |file|
1313
# Supress messages about downcased letters in constants
1414
Kernel.silence_warnings do
1515
require_relative(file)

0 commit comments

Comments
 (0)