From 64e43eaefc6c802cec11e4e364b20a34deecd335 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Sat, 6 Jan 2024 18:30:38 +0100 Subject: [PATCH 1/3] support for periodics --- CHANGELOG.md | 1 + .../web/tracking/consumers/contracts/job.rb | 2 +- .../consumers/listeners/processing.rb | 34 +++++++++++++++++-- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0a79cac..7f0c1814 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - **[Feature]** Introduce "Data transfers" chart with data received and data sent to the cluster. - **[Feature]** Introduce ability to download raw payloads. - **[Feature]** Introduce ability to download deserialized message payload as JSON. +- [Enhancement] Support Periodic Jobs reporting. - [Enhancement] Support multiplexed subscription groups. - [Enhancement] Split cluster info into two tabs, one for brokers and one for topics with partitions. - [Enhancement] Track pending jobs. Pending jobs are jobs that are not yet scheduled for execution by advanced schedulers. diff --git a/lib/karafka/web/tracking/consumers/contracts/job.rb b/lib/karafka/web/tracking/consumers/contracts/job.rb index 2df7ba28..aca62813 100644 --- a/lib/karafka/web/tracking/consumers/contracts/job.rb +++ b/lib/karafka/web/tracking/consumers/contracts/job.rb @@ -18,7 +18,7 @@ class Job < Web::Contracts::Base required(:last_offset) { |val| val.is_a?(Integer) && (val >= 0 || val == -1001) } required(:committed_offset) { |val| val.is_a?(Integer) } required(:messages) { |val| val.is_a?(Integer) && val >= 0 } - required(:type) { |val| %w[consume revoked shutdown].include?(val) } + required(:type) { |val| %w[consume revoked shutdown tick].include?(val) } required(:tags) { |val| val.is_a?(Karafka::Core::Taggable::Tags) } # -1 can be here for workless flows required(:consumption_lag) { |val| val.is_a?(Integer) && (val >= 0 || val == -1) } diff --git a/lib/karafka/web/tracking/consumers/listeners/processing.rb b/lib/karafka/web/tracking/consumers/listeners/processing.rb index 1af4978f..19219a32 100644 --- a/lib/karafka/web/tracking/consumers/listeners/processing.rb +++ b/lib/karafka/web/tracking/consumers/listeners/processing.rb @@ -21,6 +21,7 @@ def on_worker_processed(event) consume revoked shutdown + tick ].each do |action| # Tracks the job that is going to be scheduled so we can also display pending jobs class_eval <<~RUBY, __FILE__, __LINE__ + 1 @@ -46,7 +47,6 @@ def on_consumer_consume(event) messages_count = consumer.messages.size jid = job_id(consumer, 'consume') job_details = job_details(consumer, 'consume') - job_details[:status] = 'running' track do |sampler| # We count batches and messages prior to the execution, so they are tracked even @@ -69,6 +69,8 @@ def on_error_occurred(event) 'revoked' when 'consumer.shutdown.error' 'shutdown' + when 'consumer.tick.error' + 'tick' # This is not a user facing execution flow, but internal system one # that is why it will not be reported as a separate job for the UI when 'consumer.idle.error' @@ -151,6 +153,29 @@ def on_consumer_shutdown(event) end end + # @param event [Karafka::Core::Monitoring::Event] + def on_consumer_tick(event) + consumer = event.payload[:caller] + jid = job_id(consumer, 'tick') + job_details = job_details(consumer, 'tick') + + track do |sampler| + sampler.jobs[jid] = job_details + end + end + + # Removes the job from running jobs + # + # @param event [Karafka::Core::Monitoring::Event] + def on_consumer_ticked(event) + consumer = event.payload[:caller] + jid = job_id(consumer, 'tick') + + track do |sampler| + sampler.jobs.delete(jid) + end + end + private # Generates a job id that we can use to track jobs in an unique way @@ -177,7 +202,9 @@ def job_details(consumer, type) last_offset: consumer.messages.metadata.last_offset, processing_lag: consumer.messages.metadata.processing_lag, consumption_lag: consumer.messages.metadata.consumption_lag, - committed_offset: consumer.coordinator.seek_offset - 1, + # Committed offset may be -1 when there is no committed offset. This can happen in + # case of ticking that started before any consumption job happened + committed_offset: consumer.coordinator.seek_offset.to_i - 1, # In theory this is redundant because we have first and last offset, but it is # needed because VPs do not have linear count. For VPs first and last offset # will be further away than the total messages count for a particular VP @@ -185,7 +212,8 @@ def job_details(consumer, type) consumer: consumer.class.to_s, consumer_group: consumer.topic.consumer_group.id, type: type, - tags: consumer.tags + tags: consumer.tags, + status: 'running' } end end From c178a2ce0d4cb6893392fdbeedc4dda9a641575f Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Sat, 6 Jan 2024 18:55:10 +0100 Subject: [PATCH 2/3] used checks --- .../consumers/listeners/processing.rb | 124 ++++++------------ .../web/ui/pro/controllers/jobs_spec.rb | 4 + 2 files changed, 47 insertions(+), 81 deletions(-) diff --git a/lib/karafka/web/tracking/consumers/listeners/processing.rb b/lib/karafka/web/tracking/consumers/listeners/processing.rb index 19219a32..5de6b3de 100644 --- a/lib/karafka/web/tracking/consumers/listeners/processing.rb +++ b/lib/karafka/web/tracking/consumers/listeners/processing.rb @@ -57,6 +57,19 @@ def on_consumer_consume(event) end end + # Collect info about consumption event that occurred and its metrics + # Removes the job from running jobs + # + # @param event [Karafka::Core::Monitoring::Event] + def on_consumer_consumed(event) + consumer = event.payload[:caller] + jid = job_id(consumer, 'consume') + + track do |sampler| + sampler.jobs.delete(jid) + end + end + # Removes failed job from active jobs # # @param event [Karafka::Core::Monitoring::Event] @@ -90,90 +103,39 @@ def on_error_occurred(event) end end - # Collect info about consumption event that occurred and its metrics - # Removes the job from running jobs - # - # @param event [Karafka::Core::Monitoring::Event] - def on_consumer_consumed(event) - consumer = event.payload[:caller] - jid = job_id(consumer, 'consume') - - track do |sampler| - sampler.jobs.delete(jid) - end - end - - # Stores this job details - # - # @param event [Karafka::Core::Monitoring::Event] - def on_consumer_revoke(event) - consumer = event.payload[:caller] - jid = job_id(consumer, 'revoked') - job_details = job_details(consumer, 'revoked') - - track do |sampler| - sampler.jobs[jid] = job_details - end - end - - # Removes the job from running jobs - # - # @param event [Karafka::Core::Monitoring::Event] - def on_consumer_revoked(event) - consumer = event.payload[:caller] - jid = job_id(consumer, 'revoked') - - track do |sampler| - sampler.jobs.delete(jid) - end - end - - # Stores this job details - # - # @param event [Karafka::Core::Monitoring::Event] - def on_consumer_shutting_down(event) - consumer = event.payload[:caller] - jid = job_id(consumer, 'shutdown') - job_details = job_details(consumer, 'shutdown') - - track do |sampler| - sampler.jobs[jid] = job_details - end - end - - # Removes the job from running jobs - # - # @param event [Karafka::Core::Monitoring::Event] - def on_consumer_shutdown(event) - consumer = event.payload[:caller] - jid = job_id(consumer, 'shutdown') - - track do |sampler| - sampler.jobs.delete(jid) - end - end - - # @param event [Karafka::Core::Monitoring::Event] - def on_consumer_tick(event) - consumer = event.payload[:caller] - jid = job_id(consumer, 'tick') - job_details = job_details(consumer, 'tick') + # Consume has a bit different reporting flow than other jobs because it bumps certain + # counters that other jobs do not. This is why it is defined above separately + [ + [:revoke, :revoked, 'revoked'], + [:shutting_down, :shutdown, 'shutdown'], + [:tick, :ticked, 'tick'] + ].each do |pre, post, action| + class_eval <<~METHOD, __FILE__, __LINE__ + 1 + # Stores this job details + # + # @param event [Karafka::Core::Monitoring::Event] + def on_consumer_#{pre}(event) + consumer = event.payload[:caller] + jid = job_id(consumer, '#{action}') + job_details = job_details(consumer, '#{action}') - track do |sampler| - sampler.jobs[jid] = job_details - end - end + track do |sampler| + sampler.jobs[jid] = job_details + end + end - # Removes the job from running jobs - # - # @param event [Karafka::Core::Monitoring::Event] - def on_consumer_ticked(event) - consumer = event.payload[:caller] - jid = job_id(consumer, 'tick') + # Removes the job from running jobs + # + # @param event [Karafka::Core::Monitoring::Event] + def on_consumer_#{post}(event) + consumer = event.payload[:caller] + jid = job_id(consumer, '#{action}') - track do |sampler| - sampler.jobs.delete(jid) - end + track do |sampler| + sampler.jobs.delete(jid) + end + end + METHOD end private diff --git a/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb b/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb index d8531c92..ccf1f1a0 100644 --- a/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb +++ b/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb @@ -174,6 +174,10 @@ end end end + + context 'when we visit shutdown, revoked and tick jobs' do + pending + end end describe '#pending' do From 13da1e505d0bb09bdceea2c1563459f19d977389 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Sat, 6 Jan 2024 18:56:18 +0100 Subject: [PATCH 3/3] remove pending --- spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb b/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb index ccf1f1a0..d8531c92 100644 --- a/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb +++ b/spec/lib/karafka/web/ui/pro/controllers/jobs_spec.rb @@ -174,10 +174,6 @@ end end end - - context 'when we visit shutdown, revoked and tick jobs' do - pending - end end describe '#pending' do