Skip to content

Commit 34a5bb6

Browse files
committed
Merge branch 'feat/DEX-2010/add_process_batch' into 'master'
[DEX-2010] feat: add process_batch Closes DEX-2010 See merge request nstmrt/rubygems/sbmt-kafka_consumer!49
2 parents 5a8f752 + ce351d5 commit 34a5bb6

File tree

11 files changed

+356
-83
lines changed

11 files changed

+356
-83
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1313

1414
### Fixed
1515

16+
## [2.1.0] - 2024-05-13
17+
18+
### Added
19+
20+
- Implemented the `export_batch` method for processing messages in batches
21+
1622
## [2.0.1] - 2024-05-08
1723

1824
### Fixed

README.md

+31
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,22 @@ require_relative "config/environment"
231231
some-extra-configuration
232232
```
233233

234+
### `Export batch`
235+
236+
To process messages in batches, implement the `export_batch` method in your consumer:
237+
238+
```ruby
239+
# app/consumers/some_consumer.rb
240+
class SomeConsumer < Sbmt::KafkaConsumer::BaseConsumer
241+
def export_batch(messages)
242+
# some code
243+
end
244+
end
245+
```
246+
__CAUTION__:
247+
- ⚠️ Inbox does not support batch insertion.
248+
- ⚠️ If you want to use this feature, you must process the whole batch atomically (e.g. insert it into ClickHouse in a single request).
249+
234250
## CLI
235251

236252
Run the following command to execute a server
@@ -259,6 +275,7 @@ Also pay attention to the number of processes of the server:
259275

260276
To test your consumer with Rspec, please use [this shared context](./lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb)
261277

278+
### For a single payload
262279
```ruby
263280
require "sbmt/kafka_consumer/testing"
264281
@@ -272,6 +289,20 @@ RSpec.describe OrderCreatedConsumer do
272289
end
273290
```
274291

292+
### For a batch of payloads
293+
```ruby
294+
require "sbmt/kafka_consumer/testing"
295+
296+
RSpec.describe OrderCreatedConsumer do
297+
include_context "with sbmt karafka consumer"
298+
299+
it "works" do
300+
publish_to_sbmt_karafka_batch(payloads, deserializer: deserializer)
301+
expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
302+
end
303+
end
304+
```
305+
275306
## Development
276307

277308
1. Prepare environment

lib/sbmt/kafka_consumer/base_consumer.rb

+35-2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,26 @@ def self.name
1717

1818
def consume
1919
::Rails.application.executor.wrap do
20-
messages.each do |message|
21-
with_instrumentation(message) { do_consume(message) }
20+
if export_batch?
21+
with_batch_instrumentation(messages) do
22+
export_batch(messages)
23+
mark_as_consumed!(messages.last)
24+
end
25+
else
26+
messages.each do |message|
27+
with_instrumentation(message) { do_consume(message) }
28+
end
2229
end
2330
end
2431
end
2532

33+
def export_batch?
34+
if @export_batch_memoized.nil?
35+
@export_batch_memoized = respond_to?(:export_batch)
36+
end
37+
@export_batch_memoized
38+
end
39+
2640
private
2741

2842
def with_instrumentation(message)
@@ -53,6 +67,25 @@ def with_instrumentation(message)
5367
end
5468
end
5569

70+
def with_batch_instrumentation(messages)
71+
@trace_id = SecureRandom.base58
72+
73+
logger.tagged(
74+
trace_id: trace_id,
75+
first_offset: messages.first.metadata.offset,
76+
last_offset: messages.last.metadata.offset
77+
) do
78+
::Sbmt::KafkaConsumer.monitor.instrument(
79+
"consumer.consumed_batch",
80+
caller: self,
81+
messages: messages,
82+
trace_id: trace_id
83+
) do
84+
yield
85+
end
86+
end
87+
end
88+
5689
def do_consume(message)
5790
log_message(message) if log_payload?
5891

lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class BaseMonitor < Karafka::Instrumentation::Monitor
99
SBMT_KAFKA_CONSUMER_EVENTS = %w[
1010
consumer.consumed_one
1111
consumer.inbox.consumed_one
12+
consumer.consumed_batch
1213
].freeze
1314

1415
def initialize

lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb

+31
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def enabled?
2020

2121
def trace(&block)
2222
return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
23+
return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
2324
return handle_inbox_consumed_one(&block) if @event_id == "consumer.inbox.consumed_one"
2425
return handle_error(&block) if @event_id == "error.occurred"
2526

@@ -43,6 +44,23 @@ def handle_consumed_one
4344
end
4445
end
4546

47+
def handle_consumed_batch
48+
return yield unless enabled?
49+
50+
consumer = @payload[:caller]
51+
messages = @payload[:messages]
52+
53+
links = messages.filter_map do |m|
54+
parent_context = ::OpenTelemetry.propagation.extract(m.headers, getter: ::OpenTelemetry::Context::Propagation.text_map_getter)
55+
span_context = ::OpenTelemetry::Trace.current_span(parent_context).context
56+
::OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
57+
end
58+
59+
tracer.in_span("consume batch", links: links, attributes: batch_attrs(consumer, messages), kind: :consumer) do
60+
yield
61+
end
62+
end
63+
4664
def handle_inbox_consumed_one
4765
return yield unless enabled?
4866

@@ -92,6 +110,19 @@ def consumer_attrs(consumer, message)
92110
attributes.compact
93111
end
94112

113+
def batch_attrs(consumer, messages)
114+
message = messages.first
115+
{
116+
"messaging.system" => "kafka",
117+
"messaging.destination" => message.topic,
118+
"messaging.destination_kind" => "topic",
119+
"messaging.kafka.consumer_group" => consumer.topic.consumer_group.id,
120+
"messaging.batch_size" => messages.count,
121+
"messaging.first_offset" => messages.first.offset,
122+
"messaging.last_offset" => messages.last.offset
123+
}.compact
124+
end
125+
95126
def extract_message_key(key)
96127
# skip encode if already valid utf8
97128
return key if key.nil? || (key.encoding == Encoding::UTF_8 && key.valid_encoding?)

lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb

+46-16
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,48 @@ module Instrumentation
99
class SentryTracer < ::Sbmt::KafkaConsumer::Instrumentation::Tracer
1010
CONSUMER_ERROR_TYPES = %w[
1111
consumer.base.consume_one
12+
consumer.base.consumed_batch
1213
consumer.inbox.consume_one
1314
].freeze
1415

1516
def trace(&block)
1617
return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
18+
return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
1719
return handle_error(&block) if @event_id == "error.occurred"
1820

1921
yield
2022
end
2123

2224
def handle_consumed_one
23-
return yield unless ::Sentry.initialized?
24-
25-
consumer = @payload[:caller]
26-
message = @payload[:message]
27-
trace_id = @payload[:trace_id]
28-
29-
scope, transaction = start_transaction(trace_id, consumer, message)
30-
31-
begin
25+
message = {
26+
trace_id: @payload[:trace_id],
27+
topic: @payload[:message].topic,
28+
offset: @payload[:message].offset
29+
}
30+
31+
with_sentry_transaction(
32+
@payload[:caller],
33+
message
34+
) do
3235
yield
33-
rescue
34-
finish_transaction(transaction, 500)
35-
raise
3636
end
37+
end
3738

38-
finish_transaction(transaction, 200)
39-
scope.clear
39+
def handle_consumed_batch
40+
message_first = @payload[:messages].first
41+
message = {
42+
trace_id: @payload[:trace_id],
43+
topic: message_first.topic,
44+
first_offset: message_first.offset,
45+
last_offset: @payload[:messages].last.offset
46+
}
47+
48+
with_sentry_transaction(
49+
@payload[:caller],
50+
message
51+
) do
52+
yield
53+
end
4054
end
4155

4256
def handle_error
@@ -64,9 +78,9 @@ def handle_error
6478

6579
private
6680

67-
def start_transaction(trace_id, consumer, message)
81+
def start_transaction(consumer, message)
6882
scope = ::Sentry.get_current_scope
69-
scope.set_tags(trace_id: trace_id, topic: message.topic, offset: message.offset)
83+
scope.set_tags(message)
7084
scope.set_transaction_name("Sbmt/KafkaConsumer/#{consumer.class.name}")
7185

7286
transaction = ::Sentry.start_transaction(name: scope.transaction_name, op: "kafka-consumer")
@@ -97,6 +111,22 @@ def message_payload(message)
97111
# so in that case we return raw_payload
98112
message.raw_payload
99113
end
114+
115+
def with_sentry_transaction(consumer, message)
116+
return yield unless ::Sentry.initialized?
117+
118+
scope, transaction = start_transaction(consumer, message)
119+
120+
begin
121+
yield
122+
rescue
123+
finish_transaction(transaction, 500)
124+
raise
125+
end
126+
127+
finish_transaction(transaction, 200)
128+
scope.clear
129+
end
100130
end
101131
end
102132
end

lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb

+22-9
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,14 @@
2828

2929
def publish_to_sbmt_karafka(raw_payload, opts = {})
3030
message = Karafka::Messages::Message.new(raw_payload, Karafka::Messages::Metadata.new(metadata_defaults.merge(opts)))
31-
consumer.messages = Karafka::Messages::Messages.new(
32-
[message],
33-
Karafka::Messages::BatchMetadata.new(
34-
topic: test_topic.name,
35-
partition: 0,
36-
processed_at: Time.zone.now,
37-
created_at: Time.zone.now
38-
)
39-
)
31+
consumer.messages = consumer_messages([message])
32+
end
33+
34+
def publish_to_sbmt_karafka_batch(raw_payloads, opts = {})
35+
messages = raw_payloads.map do |p|
36+
Karafka::Messages::Message.new(p, Karafka::Messages::Metadata.new(metadata_defaults.merge(opts)))
37+
end
38+
consumer.messages = consumer_messages(messages)
4039
end
4140

4241
# @return [Hash] message default options
@@ -58,4 +57,18 @@ def build_consumer(instance)
5857
instance.singleton_class.include Karafka::Processing::Strategies::Default
5958
instance
6059
end
60+
61+
private
62+
63+
def consumer_messages(messages)
64+
Karafka::Messages::Messages.new(
65+
messages,
66+
Karafka::Messages::BatchMetadata.new(
67+
topic: test_topic.name,
68+
partition: 0,
69+
processed_at: Time.zone.now,
70+
created_at: Time.zone.now
71+
)
72+
)
73+
end
6174
end

lib/sbmt/kafka_consumer/version.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
module Sbmt
44
module KafkaConsumer
5-
VERSION = "2.0.1"
5+
VERSION = "2.1.0"
66
end
77
end

0 commit comments

Comments
 (0)