diff --git a/CHANGELOG.md b/CHANGELOG.md index 0748e79..9186259 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [2.5.0] - 2024-06-24 + +### Added + +- Added option `max_error_count` for liveness probes: the probe fails once the number of `librdkafka.error` events reaches this value + ## [2.4.1] - 2024-06-15 ### Fixed diff --git a/README.md b/README.md index 9697f3c..207c3d2 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,7 @@ default: &default enabled: true path: "/liveness" timeout: 15 + max_error_count: 15 # default 10 metrics: # optional section port: 9090 path: "/metrics" diff --git a/lib/sbmt/kafka_consumer/config/probes/liveness_probe.rb b/lib/sbmt/kafka_consumer/config/probes/liveness_probe.rb index f8fc650..a811c4a 100644 --- a/lib/sbmt/kafka_consumer/config/probes/liveness_probe.rb +++ b/lib/sbmt/kafka_consumer/config/probes/liveness_probe.rb @@ -8,4 +8,5 @@ class Sbmt::KafkaConsumer::Config::Probes::LivenessProbe < Dry::Struct .optional .default("/liveness") attribute :timeout, Sbmt::KafkaConsumer::Types::Coercible::Integer.optional.default(10) + attribute :max_error_count, Sbmt::KafkaConsumer::Types::Coercible::Integer.optional.default(10) end diff --git a/lib/sbmt/kafka_consumer/instrumentation/liveness_listener.rb b/lib/sbmt/kafka_consumer/instrumentation/liveness_listener.rb index 1520292..9a831b9 100644 --- a/lib/sbmt/kafka_consumer/instrumentation/liveness_listener.rb +++ b/lib/sbmt/kafka_consumer/instrumentation/liveness_listener.rb @@ -7,9 +7,14 @@ class LivenessListener include ListenerHelper include KafkaConsumer::Probes::Probe - def initialize(timeout_sec: 10) + ERROR_TYPE = "Liveness probe error" + + def initialize(timeout_sec: 10, max_error_count: 10) @consumer_groups = Karafka::App.routes.map(&:name) @timeout_sec = timeout_sec + @max_error_count = max_error_count + @error_count = 0 + @error_backtrace = nil @polls = {} setup_subscription @@ -18,9 +23,14 @@ def 
initialize(timeout_sec: 10) def probe(_env) now = current_time timed_out_polls = select_timed_out_polls(now) - return probe_ok groups: meta_from_polls(polls, now) if timed_out_polls.empty? - probe_error failed_groups: meta_from_polls(timed_out_polls, now) + if timed_out_polls.empty? && @error_count < @max_error_count + probe_ok groups: meta_from_polls(polls, now) if timed_out_polls.empty? + elsif @error_count >= @max_error_count + probe_error error_type: ERROR_TYPE, failed_librdkafka: {error_count: @error_count, error_backtrace: @error_backtrace} + else + probe_error error_type: ERROR_TYPE, failed_groups: meta_from_polls(timed_out_polls, now) + end end def on_connection_listener_fetch_loop(event) @@ -28,6 +38,16 @@ def on_connection_listener_fetch_loop(event) polls[consumer_group.name] = current_time end + def on_error_occurred(event) + type = event[:type] + + return unless type == "librdkafka.error" + error = event[:error] + + @error_backtrace ||= (error.backtrace || []).join("\n") + @error_count += 1 + end + private attr_reader :polls, :timeout_sec, :consumer_groups diff --git a/lib/sbmt/kafka_consumer/probes/host.rb b/lib/sbmt/kafka_consumer/probes/host.rb index 85e5d82..e796098 100644 --- a/lib/sbmt/kafka_consumer/probes/host.rb +++ b/lib/sbmt/kafka_consumer/probes/host.rb @@ -23,7 +23,7 @@ def health_check_app(config) liveness = config[:liveness] if liveness[:enabled] c.probe liveness[:path], Sbmt::KafkaConsumer::Instrumentation::LivenessListener.new( - timeout_sec: liveness[:timeout] + timeout_sec: liveness[:timeout], max_error_count: liveness[:max_error_count] ) end diff --git a/lib/sbmt/kafka_consumer/probes/probe.rb b/lib/sbmt/kafka_consumer/probes/probe.rb index 7ac2e64..d007516 100644 --- a/lib/sbmt/kafka_consumer/probes/probe.rb +++ b/lib/sbmt/kafka_consumer/probes/probe.rb @@ -19,6 +19,7 @@ def probe_ok(extra_meta = {}) end def probe_error(extra_meta = {}) + KafkaConsumer.logger.error("probe error meta: #{meta.merge(extra_meta).inspect}") [500, 
HEADERS, [meta.merge(extra_meta).to_json]] end diff --git a/lib/sbmt/kafka_consumer/version.rb b/lib/sbmt/kafka_consumer/version.rb index 5a418e2..78b2219 100644 --- a/lib/sbmt/kafka_consumer/version.rb +++ b/lib/sbmt/kafka_consumer/version.rb @@ -2,6 +2,6 @@ module Sbmt module KafkaConsumer - VERSION = "2.4.1" + VERSION = "2.5.0" end end diff --git a/spec/sbmt/kafka_consumer/instrumentation/liveness_listener_spec.rb b/spec/sbmt/kafka_consumer/instrumentation/liveness_listener_spec.rb index 693a112..2317b63 100644 --- a/spec/sbmt/kafka_consumer/instrumentation/liveness_listener_spec.rb +++ b/spec/sbmt/kafka_consumer/instrumentation/liveness_listener_spec.rb @@ -19,7 +19,14 @@ expect(probe).to eq [ 500, {"Content-Type" => "application/json"}, - [{failed_groups: {"CONSUMER_GROUP" => {had_poll: false}}}.to_json] + [ + { + error_type: Sbmt::KafkaConsumer::Instrumentation::LivenessListener::ERROR_TYPE, + failed_groups: + {"CONSUMER_GROUP" => + {had_poll: false}} + }.to_json + ] ] end end @@ -63,6 +70,7 @@ {"Content-Type" => "application/json"}, [ { + error_type: Sbmt::KafkaConsumer::Instrumentation::LivenessListener::ERROR_TYPE, failed_groups: { "CONSUMER_GROUP" => { had_poll: true, @@ -76,4 +84,39 @@ end end end + + context "with librdkafka errors" do + let(:error_event) { {type: "librdkafka.error", error: StandardError.new("Test error")} } + + before do + allow(error_event[:error]).to receive(:backtrace).and_return(["line 1", "line 2"]) + end + + it "increments error count and stores backtrace" do + expect { service.on_error_occurred(error_event) }.to change { service.instance_variable_get(:@error_count) }.by(1) + expect(service.instance_variable_get(:@error_backtrace)).to eq("line 1\nline 2") + end + + context "when error count exceeds max_error_count" do + before do + 10.times { service.on_error_occurred(error_event) } + end + + it "returns error with error count and backtrace" do + expect(probe).to eq [ + 500, + {"Content-Type" => "application/json"}, + [ + { + 
error_type: Sbmt::KafkaConsumer::Instrumentation::LivenessListener::ERROR_TYPE, + failed_librdkafka: { + error_count: 10, + error_backtrace: "line 1\nline 2" + } + }.to_json + ] + ] + end + end + end end diff --git a/spec/sbmt/kafka_consumer/probes/probe_spec.rb b/spec/sbmt/kafka_consumer/probes/probe_spec.rb index 93592aa..989073b 100644 --- a/spec/sbmt/kafka_consumer/probes/probe_spec.rb +++ b/spec/sbmt/kafka_consumer/probes/probe_spec.rb @@ -13,6 +13,12 @@ def probe(_env); end let(:env) { double(:env) } let(:service) { subject_klass.new } + let(:logger) { instance_double(Logger) } + + before do + allow(Sbmt::KafkaConsumer).to receive(:logger).and_return(logger) + allow(logger).to receive(:error) + end describe ".call" do it "calls probe with env" do @@ -36,8 +42,11 @@ def probe(_env); end end describe ".probe_error" do - it "returns 500 with meta" do - expect(service.probe_error).to eq [500, {"Content-Type" => "application/json"}, ["{}"]] + it "logs the error message and returns 500 with meta" do + error_meta = {foo: "bar"} + expect(service.probe_error(error_meta)).to eq [500, {"Content-Type" => "application/json"}, [error_meta.to_json]] + + expect(logger).to have_received(:error).with("probe error meta: #{error_meta.inspect}") end end end