@@ -7,9 +7,14 @@ class LivenessListener
7
7
include ListenerHelper
8
8
include KafkaConsumer ::Probes ::Probe
9
9
10
- def initialize ( timeout_sec : 10 )
10
+ ERROR_TYPE = "Liveness probe error"
11
+
12
+ def initialize ( timeout_sec : 10 , max_error_count : 10 )
11
13
@consumer_groups = Karafka ::App . routes . map ( &:name )
12
14
@timeout_sec = timeout_sec
15
+ @max_error_count = max_error_count
16
+ @error_count = 0
17
+ @error_backtrace = nil
13
18
@polls = { }
14
19
15
20
setup_subscription
@@ -18,16 +23,31 @@ def initialize(timeout_sec: 10)
18
23
def probe ( _env )
19
24
now = current_time
20
25
timed_out_polls = select_timed_out_polls ( now )
21
- return probe_ok groups : meta_from_polls ( polls , now ) if timed_out_polls . empty?
22
26
23
- probe_error failed_groups : meta_from_polls ( timed_out_polls , now )
27
+ if timed_out_polls . empty? && @error_count < @max_error_count
28
+ probe_ok groups : meta_from_polls ( polls , now ) if timed_out_polls . empty?
29
+ elsif @error_count >= @max_error_count
30
+ probe_error error_type : ERROR_TYPE , failed_librdkafka : { error_count : @error_count , error_backtrace : @error_backtrace }
31
+ else
32
+ probe_error error_type : ERROR_TYPE , failed_groups : meta_from_polls ( timed_out_polls , now )
33
+ end
24
34
end
25
35
26
36
def on_connection_listener_fetch_loop ( event )
27
37
consumer_group = event . payload [ :subscription_group ] . consumer_group
28
38
polls [ consumer_group . name ] = current_time
29
39
end
30
40
41
+ def on_error_occurred ( event )
42
+ type = event [ :type ]
43
+
44
+ return unless type == "librdkafka.error"
45
+ error = event [ :error ]
46
+
47
+ @error_backtrace ||= ( error . backtrace || [ ] ) . join ( "\n " )
48
+ @error_count += 1
49
+ end
50
+
31
51
private
32
52
33
53
attr_reader :polls , :timeout_sec , :consumer_groups
0 commit comments