Skip to content

Commit

Permalink
[DEX-2574] fix: refactor consumer init
Browse files Browse the repository at this point in the history
  • Loading branch information
ysatarov committed Sep 9, 2024
1 parent 49fd804 commit 0e8943f
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 48 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [3.1.0] - 2024-09-09

### Fixed

- Refactor consumer class initialization

## [3.0.0] - 2024-09-04

## BREAKING
Expand All @@ -25,6 +31,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

- Support consumer group mappers to support backward compatibility of consumer group naming

## [2.8.0] - 2024-09-09

### Fixed

- Refactor consumer class initialization

## [2.7.1] - 2024-08-01

### Fixed
Expand Down
21 changes: 9 additions & 12 deletions lib/sbmt/kafka_consumer/base_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@
module Sbmt
module KafkaConsumer
class BaseConsumer < Karafka::BaseConsumer
def self.consumer_klass(skip_on_error: false, middlewares: [])
Class.new(self) do
const_set(:SKIP_ON_ERROR, skip_on_error)
const_set(:MIDDLEWARES, middlewares.map(&:constantize))
class_attribute :skip_on_error, instance_writer: false, default: false
class_attribute :middlewares, instance_writer: false, default: []

def self.consumer_klass(skip_on_error: nil, middlewares: nil)
klass = Class.new(self) do
def self.name
superclass.name
end
end

# defaults are set in class_attribute definition
klass.skip_on_error = skip_on_error if skip_on_error
klass.middlewares = middlewares.map(&:constantize) if middlewares
klass
end

def consume
Expand Down Expand Up @@ -112,14 +117,6 @@ def do_consume(message)
end
end

def skip_on_error
self.class::SKIP_ON_ERROR
end

def middlewares
self.class::MIDDLEWARES
end

# can be overridden in consumer to enable message logging
def log_payload?
false
Expand Down
33 changes: 14 additions & 19 deletions lib/sbmt/kafka_consumer/inbox_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ class InboxConsumer < BaseConsumer
IDEMPOTENCY_HEADER_NAME = "Idempotency-Key"
DEFAULT_SOURCE = "KAFKA"

def self.consumer_klass(inbox_item:, event_name: nil, skip_on_error: false, name: nil, middlewares: [])
Class.new(self) do
const_set(:INBOX_ITEM_CLASS_NAME, inbox_item)
const_set(:EVENT_NAME, event_name)
const_set(:SKIP_ON_ERROR, skip_on_error)
const_set(:MIDDLEWARES, middlewares.map(&:constantize))

def self.name
superclass.name
end
end
class_attribute :inbox_item_class, instance_writer: false, default: nil
class_attribute :event_name, instance_writer: false, default: nil

def self.consumer_klass(inbox_item:, event_name: nil, skip_on_error: nil, name: nil, middlewares: nil)
# defaults are set in class_attribute definition
klass = super(skip_on_error: skip_on_error, middlewares: middlewares)
klass.inbox_item_class = inbox_item.constantize
klass.event_name = event_name if event_name
klass
end

def initialize
raise Sbmt::KafkaConsumer::Error, "inbox_item param is not set" if inbox_item_class.blank?
super
end

def extra_message_attrs(_message)
Expand Down Expand Up @@ -101,14 +104,6 @@ def message_uuid(message)
message.metadata.headers.fetch(IDEMPOTENCY_HEADER_NAME, nil).presence
end

def inbox_item_class
@inbox_item_class ||= self.class::INBOX_ITEM_CLASS_NAME.constantize
end

def event_name
@event_name ||= self.class::EVENT_NAME
end

def inbox_name
inbox_item_class.box_name
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
let(:kafka_client) { instance_double(Karafka::Connection::Client) }
let(:null_deserializer) { Sbmt::KafkaConsumer::Serialization::NullDeserializer.new }

let(:consumer) {
build_consumer(described_class.new)
}
let(:consumer_class) { described_class.consumer_klass }
let(:consumer) { build_consumer(consumer_class.new) }

before {
Sbmt::KafkaConsumer::ClientConfigurer.configure!
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaConsumer
VERSION = "3.0.0"
VERSION = "3.1.0"
end
end
21 changes: 14 additions & 7 deletions spec/sbmt/kafka_consumer/base_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ def call(message)
describe Sbmt::KafkaConsumer::BaseConsumer do
include_context "with sbmt karafka consumer"

context "with class_attribute params" do
it "child class does not overwrite parent's attrs" do
parent_class = described_class.consumer_klass(skip_on_error: false)
expect(parent_class.skip_on_error).to be(false)
expect(parent_class.name).to eq(described_class.name)

child_class = parent_class.consumer_klass(skip_on_error: true)
expect(child_class.skip_on_error).to be(true)
expect(child_class.name).to eq(described_class.name)

expect(parent_class.skip_on_error).to be(false)
end
end

context "when the consumer processes one message at a time" do
let(:consumer_class) do
Class.new(described_class.consumer_klass) do
Expand Down Expand Up @@ -57,8 +71,6 @@ def consumed?
end
end

let(:consumer) { build_consumer(consumer_class.new) }

let(:payload) { "test-payload" }
let(:headers) { {"Test-Header" => "test-header-value"} }
let(:key) { "test-key" }
Expand Down Expand Up @@ -107,7 +119,6 @@ def process_message(_message)
end
end
end
let(:consumer) { build_consumer(consumer_class.new) }

it "skips message if skip_on_error is set" do
expect(Rails.logger).to receive(:error).twice
Expand Down Expand Up @@ -155,7 +166,6 @@ def consumed?
end
end
end
let(:consumer) { build_consumer(consumer_class.new) }

context "when middleware condition calls message processing" do
let(:middlewares) { ["SkipMessageMiddleware"] }
Expand Down Expand Up @@ -199,7 +209,6 @@ def consumed?
end
end
end
let(:consumer) { build_consumer(consumer_class.new) }

it "does not call any middleware" do
consume_with_sbmt_karafka
Expand All @@ -221,7 +230,6 @@ def consumed?
end
end
end
let(:consumer) { build_consumer(consumer_class.new) }

it "skips message if middleware raises exception and skip_on_error is set" do
expect(Rails.logger).to receive(:warn).once.with(/skipping unprocessable message/)
Expand All @@ -248,7 +256,6 @@ def consumed?
end
end

let(:consumer) { build_consumer(consumer_class.new) }
let(:payload) { "test-payload" }

before do
Expand Down
10 changes: 4 additions & 6 deletions spec/sbmt/kafka_consumer/inbox_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
describe Sbmt::KafkaConsumer::InboxConsumer do
include_context "with sbmt karafka consumer"

let(:klass) do
let(:consumer_class) do
described_class.consumer_klass(
name: "test_items",
event_name: "test-event-name",
Expand All @@ -15,7 +15,6 @@
end

let(:skip_on_error) { false }
let(:consumer) { build_consumer(klass.new) }
let(:create_item_result) { Dry::Monads::Result::Success }
let(:logger) { double(ActiveSupport::TaggedLogging) }
let(:uuid) { "test-uuid-1" }
Expand Down Expand Up @@ -240,14 +239,13 @@
end

context "when extra_message_attrs is used" do
let(:consumer) do
consumer_class = Class.new(klass) do
let(:consumer_class) do
klass = super()
Class.new(klass) do
def extra_message_attrs(_message)
{event_name: "custom-value"}
end
end

build_consumer(consumer_class.new)
end

it "merges with default inbox-item attributes" do
Expand Down

0 comments on commit 0e8943f

Please sign in to comment.