Skip to content

Commit

Permalink
[DEX-2291] feat: add option max_error_count
Browse files Browse the repository at this point in the history
  • Loading branch information
Arlantir committed Jun 25, 2024
1 parent f3d0c62 commit 82feac8
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 8 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [2.5.0] - 2024-06-24

### Added

- Added option `max_error_count` for liveness probes: the probe fails once the number of `librdkafka.error` events reaches this value

## [2.4.1] - 2024-06-15

### Fixed
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ default: &default
enabled: true
path: "/liveness"
timeout: 15
max_error_count: 15 # default 10
metrics: # optional section
port: 9090
path: "/metrics"
Expand Down
1 change: 1 addition & 0 deletions lib/sbmt/kafka_consumer/config/probes/liveness_probe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ class Sbmt::KafkaConsumer::Config::Probes::LivenessProbe < Dry::Struct
.optional
.default("/liveness")
attribute :timeout, Sbmt::KafkaConsumer::Types::Coercible::Integer.optional.default(10)
attribute :max_error_count, Sbmt::KafkaConsumer::Types::Coercible::Integer.optional.default(10)
end
26 changes: 23 additions & 3 deletions lib/sbmt/kafka_consumer/instrumentation/liveness_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ class LivenessListener
include ListenerHelper
include KafkaConsumer::Probes::Probe

def initialize(timeout_sec: 10)
ERROR_TYPE = "Liveness probe error"

def initialize(timeout_sec: 10, max_error_count: 10)
@consumer_groups = Karafka::App.routes.map(&:name)
@timeout_sec = timeout_sec
@max_error_count = max_error_count
@error_count = 0
@error_backtrace = nil
@polls = {}

setup_subscription
Expand All @@ -18,16 +23,31 @@ def initialize(timeout_sec: 10)
# Liveness probe entry point: answers with probe_ok or probe_error depending on
# the result of select_timed_out_polls and on @error_count vs. @max_error_count.
# NOTE(review): this listing looks like a rendered diff with removed and added
# lines interleaved and no +/- markers — presumably the early `return probe_ok`
# and the bare `probe_error failed_groups:` call right after it are the removed
# pre-change lines; confirm against the real file before relying on this flow.
def probe(_env)
now = current_time
timed_out_polls = select_timed_out_polls(now)
return probe_ok groups: meta_from_polls(polls, now) if timed_out_polls.empty?

probe_error failed_groups: meta_from_polls(timed_out_polls, now)
# OK only when nothing timed out and the error counter is below the limit.
# The trailing `if timed_out_polls.empty?` modifier is redundant: the branch
# condition already requires it.
if timed_out_polls.empty? && @error_count < @max_error_count
probe_ok groups: meta_from_polls(polls, now) if timed_out_polls.empty?
# Error limit reached: report the counter and the stored backtrace.
elsif @error_count >= @max_error_count
probe_error error_type: ERROR_TYPE, failed_librdkafka: {error_count: @error_count, error_backtrace: @error_backtrace}
# Otherwise some polls timed out while the error counter is still below the limit.
else
probe_error error_type: ERROR_TYPE, failed_groups: meta_from_polls(timed_out_polls, now)
end
end

# Records the current time as the latest poll for the consumer group that the
# event's subscription group belongs to (keyed by the group's name).
def on_connection_listener_fetch_loop(event)
  group_name = event.payload[:subscription_group].consumer_group.name
  polls[group_name] = current_time
end

# Counts error events whose type is "librdkafka.error"; every other type is ignored.
# The backtrace is stored only while @error_backtrace is still unset (`||=`),
# so later errors bump the counter but keep the earlier stored backtrace.
def on_error_occurred(event)
  return if event[:type] != "librdkafka.error"

  @error_backtrace ||= begin
    frames = event[:error].backtrace
    # An error without a backtrace is stored as an empty string.
    frames ? frames.join("\n") : ""
  end
  @error_count += 1
end

private

attr_reader :polls, :timeout_sec, :consumer_groups
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/probes/host.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def health_check_app(config)
liveness = config[:liveness]
if liveness[:enabled]
c.probe liveness[:path], Sbmt::KafkaConsumer::Instrumentation::LivenessListener.new(
timeout_sec: liveness[:timeout]
timeout_sec: liveness[:timeout], max_error_count: liveness[:max_error_count]
)
end

Expand Down
1 change: 1 addition & 0 deletions lib/sbmt/kafka_consumer/probes/probe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def probe_ok(extra_meta = {})
end

# Builds the failing probe response and logs its payload at error level.
#
# @param extra_meta [Hash] entries merged over `meta` (extra_meta wins on key clashes)
# @return [Array] three-element array `[500, HEADERS, [json_body]]`
def probe_error(extra_meta = {})
  # Merge once so the logged payload and the response body cannot diverge.
  payload = meta.merge(extra_meta)
  KafkaConsumer.logger.error("probe error meta: #{payload.inspect}")
  [500, HEADERS, [payload.to_json]]
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaConsumer
# NOTE(review): VERSION is assigned twice in this listing — presumably the
# "2.4.1" line is the pre-change value shown by the diff view and only "2.5.0"
# exists in the real file; confirm before editing.
VERSION = "2.4.1"
VERSION = "2.5.0"
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@
expect(probe).to eq [
500,
{"Content-Type" => "application/json"},
[{failed_groups: {"CONSUMER_GROUP" => {had_poll: false}}}.to_json]
[
{
error_type: Sbmt::KafkaConsumer::Instrumentation::LivenessListener::ERROR_TYPE,
failed_groups:
{"CONSUMER_GROUP" =>
{had_poll: false}}
}.to_json
]
]
end
end
Expand Down Expand Up @@ -63,6 +70,7 @@
{"Content-Type" => "application/json"},
[
{
error_type: Sbmt::KafkaConsumer::Instrumentation::LivenessListener::ERROR_TYPE,
failed_groups: {
"CONSUMER_GROUP" => {
had_poll: true,
Expand All @@ -76,4 +84,39 @@
end
end
end

# Exercises the listener's handling of "librdkafka.error" events.
context "with librdkafka errors" do
# A plain Hash stands in for the event: the listener only reads [:type] and [:error].
let(:error_event) { {type: "librdkafka.error", error: StandardError.new("Test error")} }

before do
# A StandardError that was never raised has no backtrace, so one is stubbed.
allow(error_event[:error]).to receive(:backtrace).and_return(["line 1", "line 2"])
end

it "increments error count and stores backtrace" do
expect { service.on_error_occurred(error_event) }.to change { service.instance_variable_get(:@error_count) }.by(1)
expect(service.instance_variable_get(:@error_backtrace)).to eq("line 1\nline 2")
end

# NOTE(review): 10 errors equal the listener's default max_error_count of 10,
# and the probe compares with `>=`, so the limit is reached rather than
# exceeded — assuming `service` is built with the default; confirm.
context "when error count exceeds max_error_count" do
before do
10.times { service.on_error_occurred(error_event) }
end

it "returns error with error count and backtrace" do
expect(probe).to eq [
500,
{"Content-Type" => "application/json"},
[
{
error_type: Sbmt::KafkaConsumer::Instrumentation::LivenessListener::ERROR_TYPE,
failed_librdkafka: {
error_count: 10,
error_backtrace: "line 1\nline 2"
}
}.to_json
]
]
end
end
end
end
13 changes: 11 additions & 2 deletions spec/sbmt/kafka_consumer/probes/probe_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ def probe(_env); end

let(:env) { double(:env) }
let(:service) { subject_klass.new }
let(:logger) { instance_double(Logger) }

before do
allow(Sbmt::KafkaConsumer).to receive(:logger).and_return(logger)
allow(logger).to receive(:error)
end

describe ".call" do
it "calls probe with env" do
Expand All @@ -36,8 +42,11 @@ def probe(_env); end
end

describe ".probe_error" do
it "returns 500 with meta" do
expect(service.probe_error).to eq [500, {"Content-Type" => "application/json"}, ["{}"]]
it "logs the error message and returns 500 with meta" do
error_meta = {foo: "bar"}
expect(service.probe_error(error_meta)).to eq [500, {"Content-Type" => "application/json"}, [error_meta.to_json]]

expect(logger).to have_received(:error).with("probe error meta: #{error_meta.inspect}")
end
end
end
Expand Down

0 comments on commit 82feac8

Please sign in to comment.