diff --git a/CHANGELOG.md b/CHANGELOG.md index 9186259..d494f22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [2.6.0] - 2024-07-01 + +### Added + +- Added instrumentation for methods `process_message` and `mark_as_consumed!` + +### Fixed + +- From `do_consume(message)` to `yield` + ## [2.5.0] - 2024-06-24 ### Added diff --git a/lib/sbmt/kafka_consumer/base_consumer.rb b/lib/sbmt/kafka_consumer/base_consumer.rb index f2fcdf3..95beef3 100644 --- a/lib/sbmt/kafka_consumer/base_consumer.rb +++ b/lib/sbmt/kafka_consumer/base_consumer.rb @@ -52,7 +52,7 @@ def with_instrumentation(message) "consumer.consumed_one", caller: self, message: message, trace_id: trace_id ) do - do_consume(message) + yield rescue SkipUndeserializableMessage => ex instrument_error(ex, message) logger.warn("skipping undeserializable message: #{ex.message}") @@ -87,6 +87,23 @@ def with_batch_instrumentation(messages) end end + def with_common_instrumentation(name, message) + @trace_id = SecureRandom.base58 + + logger.tagged( + trace_id: trace_id + ) do + ::Sbmt::KafkaConsumer.monitor.instrument( + "consumer.#{name}", + caller: self, + message: message, + trace_id: trace_id + ) do + yield + end + end + end + def do_consume(message) log_message(message) if log_payload? @@ -94,9 +111,13 @@ def do_consume(message) # so we trigger it explicitly to catch undeserializable message early message.payload - call_middlewares(message, middlewares) { process_message(message) } + with_common_instrumentation("process_message", message) do + call_middlewares(message, middlewares) { process_message(message) } + end - mark_as_consumed!(message) + with_common_instrumentation("mark_as_consumed", message) do + mark_as_consumed!(message) + end end def skip_on_error diff --git a/lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb b/lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb index 6249d7d..e367543 100644 --- a/lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb +++ b/lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb @@ -10,6 +10,8 @@ class BaseMonitor < Karafka::Instrumentation::Monitor consumer.consumed_one consumer.inbox.consumed_one consumer.consumed_batch + consumer.process_message + consumer.mark_as_consumed ].freeze def initialize diff --git a/lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb b/lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb index 3b31f12..9680172 100644 --- a/lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb +++ b/lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb @@ -32,6 +32,14 @@ def on_consumer_consumed_one(event) logger.info("Successfully consumed message in #{event.payload[:time]} ms") end + def on_consumer_mark_as_consumed(event) + logger.info("Processing message in #{event.payload[:time]} ms") + end + + def on_consumer_process_message(event) + logger.info("Commit offset in #{event.payload[:time]} ms") + end + # InboxConsumer events def on_consumer_inbox_consumed_one(event) logger.tagged(status: event[:status]) do diff --git a/lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb b/lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb index 5068cfa..f604777 100644 --- a/lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb +++ b/lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb @@ -6,6 +6,11 @@ module Sbmt module KafkaConsumer module Instrumentation class OpenTelemetryTracer < ::Sbmt::KafkaConsumer::Instrumentation::Tracer + CONSUMED_EVENTS = %w[ + consumer.process_message + consumer.mark_as_consumed + ].freeze + class << self def enabled? !!@enabled @@ -22,6 +27,7 @@ def trace(&block) return handle_consumed_one(&block) if @event_id == "consumer.consumed_one" return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch" return handle_inbox_consumed_one(&block) if @event_id == "consumer.inbox.consumed_one" + return handle_common_event(&block) if CONSUMED_EVENTS.include?(@event_id) return handle_error(&block) if @event_id == "error.occurred" yield @@ -79,6 +85,16 @@ def handle_inbox_consumed_one end end + def handle_common_event(&block) + return yield unless enabled? + + if @payload[:inbox_name].present? + handle_inbox_consumed_one(&block) + else + handle_consumed_one(&block) + end + end + def handle_error return yield unless enabled? diff --git a/lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb b/lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb index 4b83bdb..a66c9e0 100644 --- a/lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb +++ b/lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb @@ -13,8 +13,14 @@ class SentryTracer < ::Sbmt::KafkaConsumer::Instrumentation::Tracer consumer.inbox.consume_one ].freeze + EVENTS = %w[ + consumer.consumed_one + consumer.process_message + consumer.mark_as_consumed + ].freeze + def trace(&block) - return handle_consumed_one(&block) if @event_id == "consumer.consumed_one" + return handle_consumed_one(&block) if EVENTS.include?(@event_id) return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch" return handle_error(&block) if @event_id == "error.occurred" diff --git a/lib/sbmt/kafka_consumer/version.rb b/lib/sbmt/kafka_consumer/version.rb index 78b2219..bb86d65 100644 --- a/lib/sbmt/kafka_consumer/version.rb +++ b/lib/sbmt/kafka_consumer/version.rb @@ -2,6 +2,6 @@ module Sbmt module KafkaConsumer - VERSION = "2.5.0" + VERSION = "2.6.0" end end diff --git a/spec/sbmt/kafka_consumer/base_consumer_spec.rb b/spec/sbmt/kafka_consumer/base_consumer_spec.rb index 16f0e89..2e726e9 100644 --- a/spec/sbmt/kafka_consumer/base_consumer_spec.rb +++ b/spec/sbmt/kafka_consumer/base_consumer_spec.rb @@ -77,6 +77,8 @@ def consumed? it "logs message" do expect(Rails.logger).to receive(:info).with(/Successfully consumed message/) + expect(Rails.logger).to receive(:info).with(/Processing message/) + expect(Rails.logger).to receive(:info).with(/Commit offset/) expect(Rails.logger).to receive(:info).with(/#{payload}/) consume_with_sbmt_karafka diff --git a/spec/sbmt/kafka_consumer/inbox_consumer_spec.rb b/spec/sbmt/kafka_consumer/inbox_consumer_spec.rb index 03dcf42..b0026ad 100644 --- a/spec/sbmt/kafka_consumer/inbox_consumer_spec.rb +++ b/spec/sbmt/kafka_consumer/inbox_consumer_spec.rb @@ -43,6 +43,8 @@ it "creates inbox item" do expect(kafka_client).to receive(:mark_as_consumed!) expect(Rails.logger).to receive(:info).with(/Successfully consumed/).twice + expect(Rails.logger).to receive(:info).with(/Processing message/) + expect(Rails.logger).to receive(:info).with(/Commit offset/) expect { consume_with_sbmt_karafka } .to change(TestInboxItem, :count).by(1) .and increment_yabeda_counter(Yabeda.kafka_consumer.inbox_consumes) @@ -160,6 +162,8 @@ context "with poisoned message" do before do allow(Rails.logger).to receive(:info).with(/Successfully consumed/) + allow(Rails.logger).to receive(:info).with(/Processing message/) + allow(Rails.logger).to receive(:info).with(/Commit offset/) allow(Rails.logger).to receive(:error) end @@ -249,6 +253,8 @@ def extra_message_attrs(_message) it "merges with default inbox-item attributes" do expect(kafka_client).to receive(:mark_as_consumed!) expect(Rails.logger).to receive(:info).with(/Successfully consumed/).twice + expect(Rails.logger).to receive(:info).with(/Processing message/) + expect(Rails.logger).to receive(:info).with(/Commit offset/) expect { consume_with_sbmt_karafka }.to change(TestInboxItem, :count).by(1) expect(TestInboxItem.last.event_name).to eq("custom-value") end diff --git a/spec/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer_spec.rb b/spec/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer_spec.rb index a5240d6..30401a2 100644 --- a/spec/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer_spec.rb +++ b/spec/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer_spec.rb @@ -15,9 +15,34 @@ let(:consumer_group) { OpenStruct.new(id: consumer_group_name) } let(:consumer_topic) { OpenStruct.new(consumer_group: consumer_group) } let(:consumer) { OpenStruct.new(topic: consumer_topic, inbox_name: "inbox/name", event_name: nil) } - let(:event_payload) { OpenStruct.new(caller: consumer, message: message, inbox_name: "inbox/name", event_name: nil, status: "failure") } + let(:event_payload) { OpenStruct.new(caller: consumer, message: message, event_name: nil, status: "failure") } + let(:event_inbox_payload) { OpenStruct.new(caller: consumer, message: message, inbox_name: "inbox/name", event_name: nil, status: "failure") } let(:event_payload_with_batch) { OpenStruct.new(caller: consumer, messages: batch_messages, inbox_name: "inbox/name", event_name: nil, status: "failure") } + shared_examples "traces message" do |event_name, span_name| + it "traces #{event_name} message" do + expect(tracer).to receive(:in_span).with(span_name, links: nil, kind: :consumer, attributes: { + "messaging.destination" => topic_name, + "messaging.destination_kind" => "topic", + "messaging.kafka.consumer_group" => consumer_group_name, + "messaging.kafka.offset" => 0, + "messaging.kafka.partition" => 1, + "messaging.system" => "kafka" + }) + described_class.new(event_name, event_payload).trace {} + end + end + + shared_examples "traces message with inbox" do |event_name, span_name| + it "traces #{event_name} message" do + expect(tracer).to receive(:in_span).with(span_name, kind: :consumer, attributes: { + "inbox.inbox_name" => "inbox/name", + "inbox.status" => "failure" + }) + described_class.new(event_name, event_inbox_payload).trace {} + end + end + describe "when disabled" do before { described_class.enabled = false } @@ -39,17 +64,13 @@ allow(instrumentation_instance).to receive(:tracer).and_return(tracer) end - it "traces message" do - expect(tracer).to receive(:in_span).with("consume topic", links: nil, kind: :consumer, attributes: { - "messaging.destination" => topic_name, - "messaging.destination_kind" => "topic", - "messaging.kafka.consumer_group" => consumer_group_name, - "messaging.kafka.offset" => 0, - "messaging.kafka.partition" => 1, - "messaging.system" => "kafka" - }) - described_class.new("consumer.consumed_one", event_payload).trace {} - end + it_behaves_like "traces message", "consumer.consumed_one", "consume topic" + it_behaves_like "traces message", "consumer.process_message", "consume topic" + it_behaves_like "traces message", "consumer.mark_as_consumed", "consume topic" + + it_behaves_like "traces message with inbox", "consumer.inbox.consumed_one", "inbox inbox/name process" + it_behaves_like "traces message with inbox", "consumer.process_message", "inbox inbox/name process" + it_behaves_like "traces message with inbox", "consumer.mark_as_consumed", "inbox inbox/name process" it "traces messages" do expect(tracer).to receive(:in_span).with("consume batch", links: [], kind: :consumer, attributes: { @@ -63,13 +84,5 @@ }) described_class.new("consumer.consumed_batch", event_payload_with_batch).trace {} end - - it "traces inbox message" do - expect(tracer).to receive(:in_span).with("inbox inbox/name process", kind: :consumer, attributes: { - "inbox.inbox_name" => "inbox/name", - "inbox.status" => "failure" - }) - described_class.new("consumer.inbox.consumed_one", event_payload).trace {} - end end end