Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Feb 5, 2025
1 parent 2725c0c commit 8885bbb
Show file tree
Hide file tree
Showing 28 changed files with 586 additions and 248 deletions.
26 changes: 26 additions & 0 deletions lib/karafka/web/pro/commanding/commands/partitions/pause.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

# This code is part of Karafka Pro, a commercial component not licensed under LGPL.
# See LICENSE for details.

module Karafka
module Web
module Pro
module Commanding
module Commands
module Partitions
class Pause < Base
self.id = 'partitions.pause'

def call
Handlers::Partitions::Tracker.instance << params

Dispatcher.acceptance(params, process_id, id)
end
end
end
end
end
end
end
end
29 changes: 29 additions & 0 deletions lib/karafka/web/pro/commanding/commands/partitions/resume.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

# This code is part of Karafka Pro, a commercial component not licensed under LGPL.
# See LICENSE for details.

module Karafka
module Web
module Pro
module Commanding
module Commands
module Partitions
class Resume < Base
self.id = 'partitions.resume'

# Dispatches the seek request into the appropriate filter and indicates that the
# seeking is in an acceptance state
def call
Handlers::Partitions::Tracker.instance << params

# Publish back info on who did this with all the details for inspection
Dispatcher.acceptance(params, process_id, id)
end
end
end
end
end
end
end
end
90 changes: 62 additions & 28 deletions lib/karafka/web/pro/commanding/handlers/partitions/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,52 +20,86 @@ def on_connection_listener_fetch_loop(event)

tracker.each_for(listener.subscription_group.id) do |details|
topic = details.fetch(:topic)
partition_id = details.fetch(:partition_id).to_i
desired_offset = details.fetch(:offset)
prevent_overtaking = details.fetch(:prevent_overtaking)
partition_id = details.fetch(:partition_id)
coordinator = listener.coordinators.find_or_create(topic, partition_id)
force_unpause = details.fetch(:force_unpause)

if prevent_overtaking && coordinator.seek_offset
first_offset = coordinator.seek_offset
if details[:name] == 'partitions.resume'
coordinator.pause_tracker.expire
coordinator.pause_tracker.reset if details[:reset_attempts]

if first_offset >= desired_offset
dispatcher.result(
details.merge(status: 'applied'),
process_id,
'partitions.resume',
)
elsif details[:name] == 'partitions.pause'
if coordinator.pause_tracker.paused? && details[:prevent_override]
dispatcher.result(
details.merge(status: 'prevented'),
process_id,
'partitions.seek'
'partitions.resume',
)

return
end
end

if desired_offset >= 0
assigned = client.mark_as_consumed!(
Messages::Seek.new(topic, partition_id, desired_offset - 1)
duration = details[:duration] * 1_000
duration = 10 * 365 * 24 * 60 * 60 * 1000 if duration.zero?

coordinator.pause_tracker.pause(duration)
client.pause(topic, partition_id, nil, duration)

dispatcher.result(
details.merge(status: 'applied'),
process_id,
'partitions.pause',
)
else
desired_offset = details.fetch(:offset)
prevent_overtaking = details.fetch(:prevent_overtaking)
force_resume = details.fetch(:force_resume)

if prevent_overtaking && coordinator.seek_offset
first_offset = coordinator.seek_offset

if first_offset >= desired_offset
dispatcher.result(
details.merge(status: 'prevented'),
process_id,
'partitions.seek'
)

return
end
end

unless assigned
dispatcher.result(
details.merge(status: 'lost_partition'),
process_id,
'partitions.seek'
if desired_offset >= 0
assigned = client.mark_as_consumed!(
Messages::Seek.new(topic, partition_id, desired_offset - 1)
)

return
unless assigned
dispatcher.result(
details.merge(status: 'lost_partition'),
process_id,
'partitions.seek'
)

return
end
end
end

client.seek(Messages::Seek.new(topic, partition_id, desired_offset))
coordinator.seek_offset = desired_offset
coordinator.pause_tracker.reset
coordinator.pause_tracker.expire if force_unpause
client.seek(Messages::Seek.new(topic, partition_id, desired_offset))
coordinator.seek_offset = desired_offset
coordinator.pause_tracker.reset
coordinator.pause_tracker.expire if force_resume

dispatcher.result(
details.merge(status: 'applied'),
process_id,
'partitions.seek',
)
dispatcher.result(
details.merge(status: 'applied'),
process_id,
'partitions.seek',
)
end
end
end

Expand Down
4 changes: 4 additions & 0 deletions lib/karafka/web/pro/commanding/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ def control(params)
Commands::Consumers::Quiet
when Commands::Partitions::Seek.id
Commands::Partitions::Seek
when Commands::Partitions::Resume.id
Commands::Partitions::Resume
when Commands::Partitions::Pause.id
Commands::Partitions::Pause
else
# We raise it and will be rescued, reported and ignored. We raise it as
# this should not happen unless there are version conflicts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,24 @@ module Ui
module Controllers
module Consumers
module Partitions
# Base controller for all the partition related management stuff
class BaseController < ConsumersController
private

# Finds all the needed details and if not found raises a not found.
# This prevents most cases where something would change between visiting the edit
# page and dispatching no longer valid data.
# This prevents most cases where something would change between visiting the pages
# and dispatching no longer valid data.
#
#
# @param consumer_groups [Array<Karafka::Web::Ui::Models::ConsumerGroup>] current
# process consumer groups in use.
def bootstrap!(consumer_groups, process_id, subscription_group_id, topic, partition_id)
# @param process_id [String]
# @param subscription_group_id [String]
# @param topic [String]
# @param partition_id [Integer]
def bootstrap!(
process_id,
subscription_group_id,
topic,
partition_id
)
subscriptions(process_id)

@subscription_group_id = subscription_group_id
Expand All @@ -30,7 +37,11 @@ def bootstrap!(consumer_groups, process_id, subscription_group_id, topic, partit
@subscription_group = nil
@partition_stats = nil

consumer_groups.each do |consumer_group|
# Looks for the appropriate details aobut given partition and so on in the
# current process data. Since we operate in the context of the given process,
# it must have those details. If not it means that assignment most likely have
# changed and it is no longer valid anyhow.
@process.consumer_groups.each do |consumer_group|
consumer_group.subscription_groups.each do |subscription_group|
next unless subscription_group.id == @subscription_group_id

Expand All @@ -55,10 +66,11 @@ def bootstrap!(consumer_groups, process_id, subscription_group_id, topic, partit
topic.subscription_group.id == @subscription_group.id && topic.name == @topic
end

# May not be found when not all routing is available
@topic_lrj = @routing_topic && @routing_topic.long_running_job?
@subscription_group || raise(Karafka::Web::Errors::Ui::NotFoundError)
@partition_stats || raise(Karafka::Web::Errors::Ui::NotFoundError)
# May not be found when not all routing is available. In such cases we assume
# that topic is not LRJ and it's up to the end user to handle this correctly.
@topic_lrj = @routing_topic && @routing_topic.long_running_job?
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module Ui
module Controllers
module Consumers
module Partitions
# Partition offset management controller in the context of current consumer process
# assignments
class OffsetsController < BaseController
self.sortable_attributes = %w[
id
Expand All @@ -19,27 +21,30 @@ class OffsetsController < BaseController
poll_state
].freeze

# Displays the list of currently assigned partitions to this process with the
# processing details and edit/pause options (when applicable). It is the starting
# point for all the management.
#
# @param process_id [String] id of the process we're interested in
def index(process_id)
subscriptions(process_id)

render
end

# Displays the edit page
# Displays the offset edit page with the edit form or a warning when not applicable
#
# @param process_id [String] id of the process we're interested in
# @param subscription_group_id [String]
# @param topic [String]
# @param partition_id [Integer]
def edit(process_id, subscription_group_id, topic, partition_id)
subscriptions(process_id)
bootstrap!(@process.consumer_groups, process_id, subscription_group_id, topic, partition_id)
bootstrap!(process_id, subscription_group_id, topic, partition_id)

render
end

# Triggers the offset change in the running process via the commanding
# Triggers the offset change in the running process via the commanding API
#
# @param process_id [String] id of the process we're interested in
# @param subscription_group_id [String]
Expand All @@ -48,9 +53,9 @@ def edit(process_id, subscription_group_id, topic, partition_id)
def update(process_id, subscription_group_id, topic, partition_id)
edit(process_id, subscription_group_id, topic, partition_id)

offset = @params[:offset].to_i
prevent_overtaking = @params[:prevent_overtaking] == 'on'
force_unpause = @params[:force_unpause] == 'on'
offset = params.int(:offset)
prevent_overtaking = params.bool(:prevent_overtaking)
force_resume = params.bool(:force_resume)

Commanding::Dispatcher.command(
Commanding::Commands::Partitions::Seek.id,
Expand All @@ -62,7 +67,7 @@ def update(process_id, subscription_group_id, topic, partition_id)
partition_id: partition_id,
offset: offset,
prevent_overtaking: prevent_overtaking,
force_unpause: force_unpause
force_resume: force_resume
}
)

Expand Down
Loading

0 comments on commit 8885bbb

Please sign in to comment.