diff --git a/CHANGELOG.md b/CHANGELOG.md index 20b87cb..0a2f48f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [2.4.0] - 2024-06-06 + +### Added + +- Added option `midlewares` to add middleware before message processing + ## [2.3.1] - 2024-06-05 ### Fixed diff --git a/README.md b/README.md index 08d22d1..9697f3c 100644 --- a/README.md +++ b/README.md @@ -183,12 +183,45 @@ consumer_groups: #### `consumer.init_attrs` options for `BaseConsumer` - `skip_on_error` - optional, default false, omit consumer errors in message processing and commit the offset to Kafka +- `middlewares` - optional, default [], type String, add middleware before message processing + +```yaml +init_attrs: + middlewares: ['SomeMiddleware'] +``` + +```ruby +class SomeMiddleware + def call(message) + yield if message.payload.id.to_i % 2 == 0 + end +end +``` +__CAUTION__: +- ⚠️ `yield` is mandatory for all middleware, as it returns control to the `process_message` method. #### `consumer.init_attrs` options for `InboxConsumer` - `inbox_item` - required, name of the inbox item class - `event_name` - optional, default nil, used when the inbox item keep several event types - `skip_on_error` - optional, default false, omit consumer errors in message processing and commit the offset to Kafka +- `middlewares` - optional, default [], type String, add middleware before message processing + +```yaml +init_attrs: + middlewares: ['SomeMiddleware'] +``` + +```ruby +class SomeMiddleware + def call(message) + yield if message.payload.id.to_i % 2 == 0 + end +end +``` +__CAUTION__: +- ⚠️ `yield` is mandatory for all middleware, as it returns control to the `process_message` method. +- ⚠️ Doesn't work with `process_batch`. #### `deserializer.init_attrs` options diff --git a/lib/sbmt/kafka_consumer/base_consumer.rb b/lib/sbmt/kafka_consumer/base_consumer.rb index ab432c7..f2fcdf3 100644 --- a/lib/sbmt/kafka_consumer/base_consumer.rb +++ b/lib/sbmt/kafka_consumer/base_consumer.rb @@ -5,9 +5,10 @@ module KafkaConsumer class BaseConsumer < Karafka::BaseConsumer attr_reader :trace_id - def self.consumer_klass(skip_on_error: false) + def self.consumer_klass(skip_on_error: false, middlewares: []) Class.new(self) do const_set(:SKIP_ON_ERROR, skip_on_error) + const_set(:MIDDLEWARES, middlewares.map(&:constantize)) def self.name superclass.name @@ -93,7 +94,7 @@ def do_consume(message) # so we trigger it explicitly to catch undeserializable message early message.payload - process_message(message) + call_middlewares(message, middlewares) { process_message(message) } mark_as_consumed!(message) end @@ -102,6 +103,10 @@ def skip_on_error self.class::SKIP_ON_ERROR end + def middlewares + self.class::MIDDLEWARES + end + # can be overridden in consumer to enable message logging def log_payload? false @@ -132,6 +137,21 @@ def instrument_error(error, message) def message_payload(message) message.payload || message.raw_payload end + + def call_middlewares(message, middlewares) + return yield if middlewares.empty? + + chain = middlewares.map { |middleware_class| middleware_class.new } + + traverse_chain = proc do + if chain.empty? + yield + else + chain.shift.call(message, &traverse_chain) + end + end + traverse_chain.call + end end end end diff --git a/lib/sbmt/kafka_consumer/inbox_consumer.rb b/lib/sbmt/kafka_consumer/inbox_consumer.rb index 4c955de..7cfc6ec 100644 --- a/lib/sbmt/kafka_consumer/inbox_consumer.rb +++ b/lib/sbmt/kafka_consumer/inbox_consumer.rb @@ -6,11 +6,12 @@ class InboxConsumer < BaseConsumer IDEMPOTENCY_HEADER_NAME = "Idempotency-Key" DEFAULT_SOURCE = "KAFKA" - def self.consumer_klass(inbox_item:, event_name: nil, skip_on_error: false, name: nil) + def self.consumer_klass(inbox_item:, event_name: nil, skip_on_error: false, name: nil, middlewares: []) Class.new(self) do const_set(:INBOX_ITEM_CLASS_NAME, inbox_item) const_set(:EVENT_NAME, event_name) const_set(:SKIP_ON_ERROR, skip_on_error) + const_set(:MIDDLEWARES, middlewares.map(&:constantize)) def self.name superclass.name diff --git a/lib/sbmt/kafka_consumer/version.rb b/lib/sbmt/kafka_consumer/version.rb index d63c153..9132be5 100644 --- a/lib/sbmt/kafka_consumer/version.rb +++ b/lib/sbmt/kafka_consumer/version.rb @@ -2,6 +2,6 @@ module Sbmt module KafkaConsumer - VERSION = "2.3.1" + VERSION = "2.4.0" end end diff --git a/spec/sbmt/kafka_consumer/base_consumer_spec.rb b/spec/sbmt/kafka_consumer/base_consumer_spec.rb index 7df2f87..16f0e89 100644 --- a/spec/sbmt/kafka_consumer/base_consumer_spec.rb +++ b/spec/sbmt/kafka_consumer/base_consumer_spec.rb @@ -2,6 +2,28 @@ require "rails_helper" +class MyMiddleware + def call(message) + yield + end +end + +class MyMiddleware1 + def call(message) + yield + end +end + +class SkipMessageMiddleware + def call(message); end +end + +class MiddlewareError + def call(message) + raise exception_class, "Middleware error" + end +end + describe Sbmt::KafkaConsumer::BaseConsumer do include_context "with sbmt karafka consumer" @@ -91,6 +113,96 @@ def process_message(_message) consume_with_sbmt_karafka end end + + context "when used middlewares" do + let(:consumer_class) do + base_klass = described_class.consumer_klass(middlewares: middlewares) + Class.new(base_klass) do + def process_message(_message) + @consumed = true + end + + def consumed? + !!@consumed + end + end + end + let(:consumer) { build_consumer(consumer_class.new) } + + context "when middleware condition calls message processing" do + let(:middlewares) { ["SkipMessageMiddleware"] } + + it "calls middleware before processing message" do + expect(consumer).not_to receive(:process_message) + expect(consumer).to receive(:mark_as_consumed!).once.and_call_original + + consume_with_sbmt_karafka + expect(consumer).not_to be_consumed + end + end + + context "when middlewares are present" do + let(:middlewares) { ["MyMiddleware"] } + + it "calls middleware before processing message" do + consume_with_sbmt_karafka + expect(consumer).to be_consumed + end + end + + context "when multiple middlewares are present" do + let(:middlewares) { %w[MyMiddleware MyMiddleware1] } + + it "calls each middleware in order before processing message" do + consume_with_sbmt_karafka + expect(consumer).to be_consumed + end + end + + context "when no middlewares are present" do + let(:consumer_class) do + Class.new(described_class.consumer_klass) do + def process_message(_message) + @consumed = true + end + + def consumed? + !!@consumed + end + end + end + let(:consumer) { build_consumer(consumer_class.new) } + + it "does not call any middleware" do + consume_with_sbmt_karafka + expect(consumer).to be_consumed + end + end + + context "when middleware raises exception" do + let(:exception_class) { StandardError } + let(:middlewares) { ["MiddlewareError"] } + let(:consumer_class) do + Class.new(described_class.consumer_klass(skip_on_error: true, middlewares: middlewares)) do + def process_message(_message) + @consumed = true + end + + def consumed? + !!@consumed + end + end + end + let(:consumer) { build_consumer(consumer_class.new) } + + it "skips message if middleware raises exception and skip_on_error is set" do + expect(Rails.logger).to receive(:warn).once.with(/skipping unprocessable message/) + + consume_with_sbmt_karafka + expect(consumer).not_to be_consumed + end + end + end end context "when the consumer export messages in batches" do