Skip to content

Commit

Permalink
[DEX-2146] feat: added option midlewares
Browse files Browse the repository at this point in the history
  • Loading branch information
Arlantir authored and bibendi committed Jun 7, 2024
1 parent a1f6274 commit bb66c32
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 4 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [2.4.0] - 2024-06-06

### Added

- Added option `midlewares` to add middleware before message processing

## [2.3.1] - 2024-06-05

### Fixed
Expand Down
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,45 @@ consumer_groups:
#### `consumer.init_attrs` options for `BaseConsumer`

- `skip_on_error` - optional, default false, omit consumer errors in message processing and commit the offset to Kafka
- `middlewares` - optional, default [], type String, add middleware before message processing

```yaml
init_attrs:
middlewares: ['SomeMiddleware']
```

```ruby
class SomeMiddleware
def call(message)
yield if message.payload.id.to_i % 2 == 0
end
end
```
__CAUTION__:
- ⚠️ `yield` is mandatory for all middleware, as it returns control to the `process_message` method.

#### `consumer.init_attrs` options for `InboxConsumer`

- `inbox_item` - required, name of the inbox item class
- `event_name` - optional, default nil, used when the inbox item keep several event types
- `skip_on_error` - optional, default false, omit consumer errors in message processing and commit the offset to Kafka
- `middlewares` - optional, default [], type String, add middleware before message processing

```yaml
init_attrs:
middlewares: ['SomeMiddleware']
```

```ruby
class SomeMiddleware
def call(message)
yield if message.payload.id.to_i % 2 == 0
end
end
```
__CAUTION__:
- ⚠️ `yield` is mandatory for all middleware, as it returns control to the `process_message` method.
- ⚠️ Doesn't work with `process_batch`.

#### `deserializer.init_attrs` options

Expand Down
24 changes: 22 additions & 2 deletions lib/sbmt/kafka_consumer/base_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ module KafkaConsumer
class BaseConsumer < Karafka::BaseConsumer
attr_reader :trace_id

def self.consumer_klass(skip_on_error: false)
def self.consumer_klass(skip_on_error: false, middlewares: [])
Class.new(self) do
const_set(:SKIP_ON_ERROR, skip_on_error)
const_set(:MIDDLEWARES, middlewares.map(&:constantize))

def self.name
superclass.name
Expand Down Expand Up @@ -93,7 +94,7 @@ def do_consume(message)
# so we trigger it explicitly to catch undeserializable message early
message.payload

process_message(message)
call_middlewares(message, middlewares) { process_message(message) }

mark_as_consumed!(message)
end
Expand All @@ -102,6 +103,10 @@ def skip_on_error
self.class::SKIP_ON_ERROR
end

def middlewares
self.class::MIDDLEWARES
end

# can be overridden in consumer to enable message logging
def log_payload?
false
Expand Down Expand Up @@ -132,6 +137,21 @@ def instrument_error(error, message)
def message_payload(message)
message.payload || message.raw_payload
end

def call_middlewares(message, middlewares)
return yield if middlewares.empty?

chain = middlewares.map { |middleware_class| middleware_class.new }

traverse_chain = proc do
if chain.empty?
yield
else
chain.shift.call(message, &traverse_chain)
end
end
traverse_chain.call
end
end
end
end
3 changes: 2 additions & 1 deletion lib/sbmt/kafka_consumer/inbox_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ class InboxConsumer < BaseConsumer
IDEMPOTENCY_HEADER_NAME = "Idempotency-Key"
DEFAULT_SOURCE = "KAFKA"

def self.consumer_klass(inbox_item:, event_name: nil, skip_on_error: false, name: nil)
def self.consumer_klass(inbox_item:, event_name: nil, skip_on_error: false, name: nil, middlewares: [])
Class.new(self) do
const_set(:INBOX_ITEM_CLASS_NAME, inbox_item)
const_set(:EVENT_NAME, event_name)
const_set(:SKIP_ON_ERROR, skip_on_error)
const_set(:MIDDLEWARES, middlewares.map(&:constantize))

def self.name
superclass.name
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaConsumer
VERSION = "2.3.1"
VERSION = "2.4.0"
end
end
112 changes: 112 additions & 0 deletions spec/sbmt/kafka_consumer/base_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,28 @@

require "rails_helper"

class MyMiddleware
def call(message)
yield
end
end

class MyMiddleware1
def call(message)
yield
end
end

class SkipMessageMiddleware
def call(message); end
end

class MiddlewareError
def call(message)
raise exception_class, "Middleware error"
end
end

describe Sbmt::KafkaConsumer::BaseConsumer do
include_context "with sbmt karafka consumer"

Expand Down Expand Up @@ -91,6 +113,96 @@ def process_message(_message)
consume_with_sbmt_karafka
end
end

context "when used middlewares" do
let(:consumer_class) do
base_klass = described_class.consumer_klass(middlewares: middlewares)
Class.new(base_klass) do
def process_message(_message)
@consumed = true
end

def consumed?
!!@consumed
end
end
end
let(:consumer) { build_consumer(consumer_class.new) }

context "when middleware condition calls message processing" do
let(:middlewares) { ["SkipMessageMiddleware"] }

it "calls middleware before processing message" do
expect(consumer).not_to receive(:process_message)
expect(consumer).to receive(:mark_as_consumed!).once.and_call_original

consume_with_sbmt_karafka
expect(consumer).not_to be_consumed
end
end

context "when middlewares are present" do
let(:middlewares) { ["MyMiddleware"] }

it "calls middleware before processing message" do
consume_with_sbmt_karafka
expect(consumer).to be_consumed
end
end

context "when multiple middlewares are present" do
let(:middlewares) { %w[MyMiddleware MyMiddleware1] }

it "calls each middleware in order before processing message" do
consume_with_sbmt_karafka
expect(consumer).to be_consumed
end
end

context "when no middlewares are present" do
let(:consumer_class) do
Class.new(described_class.consumer_klass) do
def process_message(_message)
@consumed = true
end

def consumed?
!!@consumed
end
end
end
let(:consumer) { build_consumer(consumer_class.new) }

it "does not call any middleware" do
consume_with_sbmt_karafka
expect(consumer).to be_consumed
end
end

context "when middleware raises exception" do
let(:exception_class) { StandardError }
let(:middlewares) { ["MiddlewareError"] }
let(:consumer_class) do
Class.new(described_class.consumer_klass(skip_on_error: true, middlewares: middlewares)) do
def process_message(_message)
@consumed = true
end

def consumed?
!!@consumed
end
end
end
let(:consumer) { build_consumer(consumer_class.new) }

it "skips message if middleware raises exception and skip_on_error is set" do
expect(Rails.logger).to receive(:warn).once.with(/skipping unprocessable message/)

consume_with_sbmt_karafka
expect(consumer).not_to be_consumed
end
end
end
end

context "when the consumer export messages in batches" do
Expand Down

0 comments on commit bb66c32

Please sign in to comment.