Skip to content

Commit

Permalink
Merge branch 'feat/DEX-2307/add-instrumentations' into 'master'
Browse files Browse the repository at this point in the history
[DEX-2307] feat: add instrumentations

Closes DEX-2307

See merge request nstmrt/rubygems/sbmt-kafka_consumer!57
  • Loading branch information
Arlantir committed Jul 1, 2024
2 parents 2076cbd + 15b6965 commit 94fb4b6
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 25 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [2.6.0] - 2024-07-01

### Added

- Added instrumentation for methods `process_message` and `mark_as_consumed!`

### Fixed

- From `do_consume(message)` to `yield`

## [2.5.0] - 2024-06-24

### Added
Expand Down
27 changes: 24 additions & 3 deletions lib/sbmt/kafka_consumer/base_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def with_instrumentation(message)
"consumer.consumed_one",
caller: self, message: message, trace_id: trace_id
) do
do_consume(message)
yield
rescue SkipUndeserializableMessage => ex
instrument_error(ex, message)
logger.warn("skipping undeserializable message: #{ex.message}")
Expand Down Expand Up @@ -87,16 +87,37 @@ def with_batch_instrumentation(messages)
end
end

def with_common_instrumentation(name, message)
@trace_id = SecureRandom.base58

logger.tagged(
trace_id: trace_id
) do
::Sbmt::KafkaConsumer.monitor.instrument(
"consumer.#{name}",
caller: self,
message: message,
trace_id: trace_id
) do
yield
end
end
end

def do_consume(message)
log_message(message) if log_payload?

# deserialization process is lazy (and cached)
# so we trigger it explicitly to catch undeserializable message early
message.payload

call_middlewares(message, middlewares) { process_message(message) }
with_common_instrumentation("process_message", message) do
call_middlewares(message, middlewares) { process_message(message) }
end

mark_as_consumed!(message)
with_common_instrumentation("mark_as_consumed", message) do
mark_as_consumed!(message)
end
end

def skip_on_error
Expand Down
2 changes: 2 additions & 0 deletions lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class BaseMonitor < Karafka::Instrumentation::Monitor
consumer.consumed_one
consumer.inbox.consumed_one
consumer.consumed_batch
consumer.process_message
consumer.mark_as_consumed
].freeze

def initialize
Expand Down
8 changes: 8 additions & 0 deletions lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ def on_consumer_consumed_one(event)
logger.info("Successfully consumed message in #{event.payload[:time]} ms")
end

def on_consumer_mark_as_consumed(event)
logger.info("Processing message in #{event.payload[:time]} ms")
end

def on_consumer_process_message(event)
logger.info("Commit offset in #{event.payload[:time]} ms")
end

# InboxConsumer events
def on_consumer_inbox_consumed_one(event)
logger.tagged(status: event[:status]) do
Expand Down
16 changes: 16 additions & 0 deletions lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ module Sbmt
module KafkaConsumer
module Instrumentation
class OpenTelemetryTracer < ::Sbmt::KafkaConsumer::Instrumentation::Tracer
CONSUMED_EVENTS = %w[
consumer.process_message
consumer.mark_as_consumed
].freeze

class << self
def enabled?
!!@enabled
Expand All @@ -22,6 +27,7 @@ def trace(&block)
return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
return handle_inbox_consumed_one(&block) if @event_id == "consumer.inbox.consumed_one"
return handle_common_event(&block) if CONSUMED_EVENTS.include?(@event_id)
return handle_error(&block) if @event_id == "error.occurred"

yield
Expand Down Expand Up @@ -79,6 +85,16 @@ def handle_inbox_consumed_one
end
end

def handle_common_event(&block)
return yield unless enabled?

if @payload[:inbox_name].present?
handle_inbox_consumed_one(&block)
else
handle_consumed_one(&block)
end
end

def handle_error
return yield unless enabled?

Expand Down
8 changes: 7 additions & 1 deletion lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ class SentryTracer < ::Sbmt::KafkaConsumer::Instrumentation::Tracer
consumer.inbox.consume_one
].freeze

EVENTS = %w[
consumer.consumed_one
consumer.process_message
consumer.mark_as_consumed
].freeze

def trace(&block)
return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
return handle_consumed_one(&block) if EVENTS.include?(@event_id)
return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
return handle_error(&block) if @event_id == "error.occurred"

Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaConsumer
VERSION = "2.5.0"
VERSION = "2.6.0"
end
end
2 changes: 2 additions & 0 deletions spec/sbmt/kafka_consumer/base_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def consumed?

it "logs message" do
expect(Rails.logger).to receive(:info).with(/Successfully consumed message/)
expect(Rails.logger).to receive(:info).with(/Processing message/)
expect(Rails.logger).to receive(:info).with(/Commit offset/)
expect(Rails.logger).to receive(:info).with(/#{payload}/)

consume_with_sbmt_karafka
Expand Down
6 changes: 6 additions & 0 deletions spec/sbmt/kafka_consumer/inbox_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
it "creates inbox item" do
expect(kafka_client).to receive(:mark_as_consumed!)
expect(Rails.logger).to receive(:info).with(/Successfully consumed/).twice
expect(Rails.logger).to receive(:info).with(/Processing message/)
expect(Rails.logger).to receive(:info).with(/Commit offset/)
expect { consume_with_sbmt_karafka }
.to change(TestInboxItem, :count).by(1)
.and increment_yabeda_counter(Yabeda.kafka_consumer.inbox_consumes)
Expand Down Expand Up @@ -160,6 +162,8 @@
context "with poisoned message" do
before do
allow(Rails.logger).to receive(:info).with(/Successfully consumed/)
allow(Rails.logger).to receive(:info).with(/Processing message/)
allow(Rails.logger).to receive(:info).with(/Commit offset/)
allow(Rails.logger).to receive(:error)
end

Expand Down Expand Up @@ -249,6 +253,8 @@ def extra_message_attrs(_message)
it "merges with default inbox-item attributes" do
expect(kafka_client).to receive(:mark_as_consumed!)
expect(Rails.logger).to receive(:info).with(/Successfully consumed/).twice
expect(Rails.logger).to receive(:info).with(/Processing message/)
expect(Rails.logger).to receive(:info).with(/Commit offset/)
expect { consume_with_sbmt_karafka }.to change(TestInboxItem, :count).by(1)
expect(TestInboxItem.last.event_name).to eq("custom-value")
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,34 @@
let(:consumer_group) { OpenStruct.new(id: consumer_group_name) }
let(:consumer_topic) { OpenStruct.new(consumer_group: consumer_group) }
let(:consumer) { OpenStruct.new(topic: consumer_topic, inbox_name: "inbox/name", event_name: nil) }
let(:event_payload) { OpenStruct.new(caller: consumer, message: message, inbox_name: "inbox/name", event_name: nil, status: "failure") }
let(:event_payload) { OpenStruct.new(caller: consumer, message: message, event_name: nil, status: "failure") }
let(:event_inbox_payload) { OpenStruct.new(caller: consumer, message: message, inbox_name: "inbox/name", event_name: nil, status: "failure") }
let(:event_payload_with_batch) { OpenStruct.new(caller: consumer, messages: batch_messages, inbox_name: "inbox/name", event_name: nil, status: "failure") }

shared_examples "traces message" do |event_name, span_name|
it "traces #{event_name} message" do
expect(tracer).to receive(:in_span).with(span_name, links: nil, kind: :consumer, attributes: {
"messaging.destination" => topic_name,
"messaging.destination_kind" => "topic",
"messaging.kafka.consumer_group" => consumer_group_name,
"messaging.kafka.offset" => 0,
"messaging.kafka.partition" => 1,
"messaging.system" => "kafka"
})
described_class.new(event_name, event_payload).trace {}
end
end

shared_examples "traces message with inbox" do |event_name, span_name|
it "traces #{event_name} message" do
expect(tracer).to receive(:in_span).with(span_name, kind: :consumer, attributes: {
"inbox.inbox_name" => "inbox/name",
"inbox.status" => "failure"
})
described_class.new(event_name, event_inbox_payload).trace {}
end
end

describe "when disabled" do
before { described_class.enabled = false }

Expand All @@ -39,17 +64,13 @@
allow(instrumentation_instance).to receive(:tracer).and_return(tracer)
end

it "traces message" do
expect(tracer).to receive(:in_span).with("consume topic", links: nil, kind: :consumer, attributes: {
"messaging.destination" => topic_name,
"messaging.destination_kind" => "topic",
"messaging.kafka.consumer_group" => consumer_group_name,
"messaging.kafka.offset" => 0,
"messaging.kafka.partition" => 1,
"messaging.system" => "kafka"
})
described_class.new("consumer.consumed_one", event_payload).trace {}
end
it_behaves_like "traces message", "consumer.consumed_one", "consume topic"
it_behaves_like "traces message", "consumer.process_message", "consume topic"
it_behaves_like "traces message", "consumer.mark_as_consumed", "consume topic"

it_behaves_like "traces message with inbox", "consumer.inbox.consumed_one", "inbox inbox/name process"
it_behaves_like "traces message with inbox", "consumer.process_message", "inbox inbox/name process"
it_behaves_like "traces message with inbox", "consumer.mark_as_consumed", "inbox inbox/name process"

it "traces messages" do
expect(tracer).to receive(:in_span).with("consume batch", links: [], kind: :consumer, attributes: {
Expand All @@ -63,13 +84,5 @@
})
described_class.new("consumer.consumed_batch", event_payload_with_batch).trace {}
end

it "traces inbox message" do
expect(tracer).to receive(:in_span).with("inbox inbox/name process", kind: :consumer, attributes: {
"inbox.inbox_name" => "inbox/name",
"inbox.status" => "failure"
})
described_class.new("consumer.inbox.consumed_one", event_payload).trace {}
end
end
end

0 comments on commit 94fb4b6

Please sign in to comment.