diff --git a/CHANGELOG.md b/CHANGELOG.md index 37e98ad..dc6fa9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [3.1.0] - 2024-09-09 + +### Fixed + +- Refactor consumer class initialization + ## [3.0.0] - 2024-09-04 ## BREAKING @@ -25,6 +31,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Support consumer group mappers to support backward compatibility of consumer group naming +## [2.8.0] - 2024-09-09 + +### Fixed + +- Refactor consumer class initialization + ## [2.7.1] - 2024-08-01 ### Fixed diff --git a/lib/sbmt/kafka_consumer/base_consumer.rb b/lib/sbmt/kafka_consumer/base_consumer.rb index f1f4e68..b24596c 100644 --- a/lib/sbmt/kafka_consumer/base_consumer.rb +++ b/lib/sbmt/kafka_consumer/base_consumer.rb @@ -3,15 +3,20 @@ module Sbmt module KafkaConsumer class BaseConsumer < Karafka::BaseConsumer - def self.consumer_klass(skip_on_error: false, middlewares: []) - Class.new(self) do - const_set(:SKIP_ON_ERROR, skip_on_error) - const_set(:MIDDLEWARES, middlewares.map(&:constantize)) + class_attribute :skip_on_error, instance_writer: false, default: false + class_attribute :middlewares, instance_writer: false, default: [] + def self.consumer_klass(skip_on_error: nil, middlewares: nil) + klass = Class.new(self) do def self.name superclass.name end end + + # defaults are set in class_attribute definition + klass.skip_on_error = skip_on_error if skip_on_error + klass.middlewares = middlewares.map(&:constantize) if middlewares + klass end def consume @@ -112,14 +117,6 @@ def do_consume(message) end end - def skip_on_error - self.class::SKIP_ON_ERROR - end - - def middlewares - self.class::MIDDLEWARES - end - # can be overridden in consumer to enable message logging def log_payload? false diff --git a/lib/sbmt/kafka_consumer/inbox_consumer.rb b/lib/sbmt/kafka_consumer/inbox_consumer.rb index 7cfc6ec..ec08635 100644 --- a/lib/sbmt/kafka_consumer/inbox_consumer.rb +++ b/lib/sbmt/kafka_consumer/inbox_consumer.rb @@ -6,17 +6,20 @@ class InboxConsumer < BaseConsumer IDEMPOTENCY_HEADER_NAME = "Idempotency-Key" DEFAULT_SOURCE = "KAFKA" - def self.consumer_klass(inbox_item:, event_name: nil, skip_on_error: false, name: nil, middlewares: []) - Class.new(self) do - const_set(:INBOX_ITEM_CLASS_NAME, inbox_item) - const_set(:EVENT_NAME, event_name) - const_set(:SKIP_ON_ERROR, skip_on_error) - const_set(:MIDDLEWARES, middlewares.map(&:constantize)) - - def self.name - superclass.name - end - end + class_attribute :inbox_item_class, instance_writer: false, default: nil + class_attribute :event_name, instance_writer: false, default: nil + + def self.consumer_klass(inbox_item:, event_name: nil, skip_on_error: nil, name: nil, middlewares: nil) + # defaults are set in class_attribute definition + klass = super(skip_on_error: skip_on_error, middlewares: middlewares) + klass.inbox_item_class = inbox_item.constantize + klass.event_name = event_name if event_name + klass + end + + def initialize + raise Sbmt::KafkaConsumer::Error, "inbox_item param is not set" if inbox_item_class.blank? + super end def extra_message_attrs(_message) @@ -101,14 +104,6 @@ def message_uuid(message) message.metadata.headers.fetch(IDEMPOTENCY_HEADER_NAME, nil).presence end - def inbox_item_class - @inbox_item_class ||= self.class::INBOX_ITEM_CLASS_NAME.constantize - end - - def event_name - @event_name ||= self.class::EVENT_NAME - end - def inbox_name inbox_item_class.box_name end diff --git a/lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb b/lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb index 69a52dd..cd9bd5a 100644 --- a/lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb +++ b/lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb @@ -16,9 +16,8 @@ let(:kafka_client) { instance_double(Karafka::Connection::Client) } let(:null_deserializer) { Sbmt::KafkaConsumer::Serialization::NullDeserializer.new } - let(:consumer) { - build_consumer(described_class.new) - } + let(:consumer_class) { described_class.consumer_klass } + let(:consumer) { build_consumer(consumer_class.new) } before { Sbmt::KafkaConsumer::ClientConfigurer.configure! diff --git a/lib/sbmt/kafka_consumer/version.rb b/lib/sbmt/kafka_consumer/version.rb index 50ad8b8..2a54330 100644 --- a/lib/sbmt/kafka_consumer/version.rb +++ b/lib/sbmt/kafka_consumer/version.rb @@ -2,6 +2,6 @@ module Sbmt module KafkaConsumer - VERSION = "3.0.0" + VERSION = "3.1.0" end end diff --git a/spec/sbmt/kafka_consumer/base_consumer_spec.rb b/spec/sbmt/kafka_consumer/base_consumer_spec.rb index 9fdeecc..a3861a9 100644 --- a/spec/sbmt/kafka_consumer/base_consumer_spec.rb +++ b/spec/sbmt/kafka_consumer/base_consumer_spec.rb @@ -27,6 +27,20 @@ def call(message) describe Sbmt::KafkaConsumer::BaseConsumer do include_context "with sbmt karafka consumer" + context "with class_attribute params" do + it "child class does not overwrite parent's attrs" do + parent_class = described_class.consumer_klass(skip_on_error: false) + expect(parent_class.skip_on_error).to be(false) + expect(parent_class.name).to eq(described_class.name) + + child_class = parent_class.consumer_klass(skip_on_error: true) + expect(child_class.skip_on_error).to be(true) + expect(child_class.name).to eq(described_class.name) + + expect(parent_class.skip_on_error).to be(false) + end + end + context "when the consumer processes one message at a time" do let(:consumer_class) do Class.new(described_class.consumer_klass) do @@ -57,8 +71,6 @@ def consumed? end end - let(:consumer) { build_consumer(consumer_class.new) } - let(:payload) { "test-payload" } let(:headers) { {"Test-Header" => "test-header-value"} } let(:key) { "test-key" } @@ -107,7 +119,6 @@ def process_message(_message) end end end - let(:consumer) { build_consumer(consumer_class.new) } it "skips message if skip_on_error is set" do expect(Rails.logger).to receive(:error).twice @@ -155,7 +166,6 @@ def consumed? end end end - let(:consumer) { build_consumer(consumer_class.new) } context "when middleware condition calls message processing" do let(:middlewares) { ["SkipMessageMiddleware"] } @@ -199,7 +209,6 @@ def consumed? end end end - let(:consumer) { build_consumer(consumer_class.new) } it "does not call any middleware" do consume_with_sbmt_karafka @@ -221,7 +230,6 @@ def consumed? end end end - let(:consumer) { build_consumer(consumer_class.new) } it "skips message if middleware raises exception and skip_on_error is set" do expect(Rails.logger).to receive(:warn).once.with(/skipping unprocessable message/) @@ -248,7 +256,6 @@ def consumed? end end - let(:consumer) { build_consumer(consumer_class.new) } let(:payload) { "test-payload" } before do diff --git a/spec/sbmt/kafka_consumer/inbox_consumer_spec.rb b/spec/sbmt/kafka_consumer/inbox_consumer_spec.rb index 4f845e5..635ed34 100644 --- a/spec/sbmt/kafka_consumer/inbox_consumer_spec.rb +++ b/spec/sbmt/kafka_consumer/inbox_consumer_spec.rb @@ -5,7 +5,7 @@ describe Sbmt::KafkaConsumer::InboxConsumer do include_context "with sbmt karafka consumer" - let(:klass) do + let(:consumer_class) do described_class.consumer_klass( name: "test_items", event_name: "test-event-name", @@ -15,7 +15,6 @@ end let(:skip_on_error) { false } - let(:consumer) { build_consumer(klass.new) } let(:create_item_result) { Dry::Monads::Result::Success } let(:logger) { double(ActiveSupport::TaggedLogging) } let(:uuid) { "test-uuid-1" } @@ -240,14 +239,13 @@ end context "when extra_message_attrs is used" do - let(:consumer) do - consumer_class = Class.new(klass) do + let(:consumer_class) do + klass = super() + Class.new(klass) do def extra_message_attrs(_message) {event_name: "custom-value"} end end - - build_consumer(consumer_class.new) end it "merges with default inbox-item attributes" do