From 75c42a01e1e5bb59caa03edf6887ba011ce9620e Mon Sep 17 00:00:00 2001 From: "denis.tarasenko" Date: Tue, 30 Jul 2024 19:12:37 +0300 Subject: [PATCH] [DEX-2403] feat: When using strategy cooperative-sticky, method mark_as_consumed is used --- CHANGELOG.md | 6 ++++ lib/sbmt/kafka_consumer/base_consumer.rb | 20 +++++++++++-- lib/sbmt/kafka_consumer/version.rb | 2 +- .../sbmt/kafka_consumer/base_consumer_spec.rb | 28 ++++++++++++++++++- 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ab9561a..6ab3d58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [2.7.0] - 2024-07-30 + +### Changed + +- When using strategy `cooperative-sticky`, method `mark_as_consumed` is used ([more details](https://github.com/karafka/karafka/wiki/FAQ#why-when-using-cooperative-sticky-rebalance-strategy-all-topics-get-revoked-on-rebalance)) + ## [2.6.1] - 2024-07-05 ### Fixed diff --git a/lib/sbmt/kafka_consumer/base_consumer.rb b/lib/sbmt/kafka_consumer/base_consumer.rb index a87baf8..f1f4e68 100644 --- a/lib/sbmt/kafka_consumer/base_consumer.rb +++ b/lib/sbmt/kafka_consumer/base_consumer.rb @@ -19,7 +19,7 @@ def consume if process_batch? with_batch_instrumentation(messages) do process_batch(messages) - mark_as_consumed!(messages.last) + mark_message(messages.last) end else messages.each do |message| @@ -108,7 +108,7 @@ def do_consume(message) end with_common_instrumentation("mark_as_consumed", message) do - mark_as_consumed!(message) + mark_message(message) end end @@ -169,6 +169,22 @@ def call_middlewares(message, middlewares) def trace_id @trace_id ||= SecureRandom.base58 end + + def config + @config ||= Sbmt::KafkaConsumer::Config.new + end + + def cooperative_sticky? + config.partition_assignment_strategy == "cooperative-sticky" + end + + def mark_message(message) + if cooperative_sticky? + mark_as_consumed(message) + else + mark_as_consumed!(message) + end + end end end end diff --git a/lib/sbmt/kafka_consumer/version.rb b/lib/sbmt/kafka_consumer/version.rb index 8ce33b6..9929eb2 100644 --- a/lib/sbmt/kafka_consumer/version.rb +++ b/lib/sbmt/kafka_consumer/version.rb @@ -2,6 +2,6 @@ module Sbmt module KafkaConsumer - VERSION = "2.6.1" + VERSION = "2.7.0" end end diff --git a/spec/sbmt/kafka_consumer/base_consumer_spec.rb b/spec/sbmt/kafka_consumer/base_consumer_spec.rb index 2e726e9..dc6d8f6 100644 --- a/spec/sbmt/kafka_consumer/base_consumer_spec.rb +++ b/spec/sbmt/kafka_consumer/base_consumer_spec.rb @@ -116,6 +116,32 @@ def process_message(_message) end end + context "when cooperative_sticky is true" do + before do + allow(consumer).to receive(:cooperative_sticky?).and_return(true) + end + + it "calls mark_as_consumed" do + expect(consumer).to receive(:mark_as_consumed).once + expect(consumer).not_to receive(:mark_as_consumed!) + + consume_with_sbmt_karafka + end + end + + context "when cooperative_sticky is false" do + before do + allow(consumer).to receive(:cooperative_sticky?).and_return(false) + end + + it "calls mark_as_consumed!" do + expect(consumer).to receive(:mark_as_consumed!).once.and_call_original + expect(consumer).not_to receive(:mark_as_consumed) + + consume_with_sbmt_karafka + end + end + context "when used middlewares" do let(:consumer_class) do base_klass = described_class.consumer_klass(middlewares: middlewares) @@ -136,7 +162,7 @@ def consumed? it "calls middleware before processing message" do expect(consumer).not_to receive(:process_message) - expect(consumer).to receive(:mark_as_consumed!).once.and_call_original + expect(consumer).to receive(:mark_message).once.and_call_original consume_with_sbmt_karafka expect(consumer).not_to be_consumed