Skip to content

Commit

Permalink
[DEX-2403] feat: When using strategy cooperative-sticky, method mark_…
Browse files Browse the repository at this point in the history
…as_consumed is used
  • Loading branch information
Arlantir committed Jul 30, 2024
1 parent d8090b4 commit 75c42a0
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 4 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [2.7.0] - 2024-07-30

### Changed

- When using strategy `cooperative-sticky`, method `mark_as_consumed` is used ([more details](https://github.com/karafka/karafka/wiki/FAQ#why-when-using-cooperative-sticky-rebalance-strategy-all-topics-get-revoked-on-rebalance))

## [2.6.1] - 2024-07-05

### Fixed
Expand Down
20 changes: 18 additions & 2 deletions lib/sbmt/kafka_consumer/base_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def consume
if process_batch?
with_batch_instrumentation(messages) do
process_batch(messages)
mark_as_consumed!(messages.last)
mark_message(messages.last)
end
else
messages.each do |message|
Expand Down Expand Up @@ -108,7 +108,7 @@ def do_consume(message)
end

with_common_instrumentation("mark_as_consumed", message) do
mark_as_consumed!(message)
mark_message(message)
end
end

Expand Down Expand Up @@ -169,6 +169,22 @@ def call_middlewares(message, middlewares)
def trace_id
@trace_id ||= SecureRandom.base58
end

def config
@config ||= Sbmt::KafkaConsumer::Config.new
end

def cooperative_sticky?
config.partition_assignment_strategy == "cooperative-sticky"
end

def mark_message(message)
if cooperative_sticky?
mark_as_consumed(message)
else
mark_as_consumed!(message)
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaConsumer
VERSION = "2.6.1"
VERSION = "2.7.0"
end
end
28 changes: 27 additions & 1 deletion spec/sbmt/kafka_consumer/base_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,32 @@ def process_message(_message)
end
end

context "when cooperative_sticky is true" do
before do
allow(consumer).to receive(:cooperative_sticky?).and_return(true)
end

it "calls mark_as_consumed" do
expect(consumer).to receive(:mark_as_consumed).once
expect(consumer).not_to receive(:mark_as_consumed!)

consume_with_sbmt_karafka
end
end

context "when cooperative_sticky is false" do
before do
allow(consumer).to receive(:cooperative_sticky?).and_return(false)
end

it "calls mark_as_consumed!" do
expect(consumer).to receive(:mark_as_consumed!).once.and_call_original
expect(consumer).not_to receive(:mark_as_consumed)

consume_with_sbmt_karafka
end
end

context "when used middlewares" do
let(:consumer_class) do
base_klass = described_class.consumer_klass(middlewares: middlewares)
Expand All @@ -136,7 +162,7 @@ def consumed?

it "calls middleware before processing message" do
expect(consumer).not_to receive(:process_message)
expect(consumer).to receive(:mark_as_consumed!).once.and_call_original
expect(consumer).to receive(:mark_message).once.and_call_original

consume_with_sbmt_karafka
expect(consumer).not_to be_consumed
Expand Down

0 comments on commit 75c42a0

Please sign in to comment.