Skip to content

Commit

Permalink
handle signals through UI (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
bedrock-adam authored Nov 9, 2024
1 parent 1cf4ffe commit 9732e97
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 69 deletions.
16 changes: 16 additions & 0 deletions db/migrate/create_outboxer_signals.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
class CreateOutboxerSignals < ActiveRecord::Migration[6.1]
def up
ActiveRecord::Base.transaction do
create_table :outboxer_signals do |t|
t.string :name, limit: 9, null: false
t.references :publisher, foreign_key: { to_table: :outboxer_publishers }, null: false

t.timestamps
end
end
end

def down
drop_table :outboxer_signals
end
end
4 changes: 1 addition & 3 deletions lib/outboxer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
require_relative "outboxer/models/message"

require_relative "outboxer/models/publisher"

require_relative "outboxer/socket"
require_relative "outboxer/process"
require_relative "outboxer/models/signal"

require_relative "outboxer/logger"
require_relative "outboxer/database"
Expand Down
3 changes: 3 additions & 0 deletions lib/outboxer/models/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ module Models
class Publisher < ::ActiveRecord::Base
self.table_name = :outboxer_publishers

has_many :signals, class_name: 'Outboxer::Models::Signal',
foreign_key: 'publisher_id', dependent: :destroy

validates :name, presence: true, length: { maximum: 263 }
# 255 (hostname) + 1 (colon) + 7 (pid)

Expand Down
12 changes: 12 additions & 0 deletions lib/outboxer/models/signal.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module Outboxer
module Models
class Signal < ::ActiveRecord::Base
self.table_name = :outboxer_signals

validates :name, presence: true, length: { maximum: 9 }

belongs_to :publisher, class_name: 'Outboxer::Models::Publisher', foreign_key: 'publisher_id'
validates :publisher_id, presence: true
end
end
end
11 changes: 0 additions & 11 deletions lib/outboxer/process.rb

This file was deleted.

109 changes: 78 additions & 31 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def delete(id:, current_time: Time.now)
ActiveRecord::Base.transaction do
begin
publisher = Models::Publisher.lock.find_by!(id: id)
publisher.signals.destroy_all
publisher.destroy!
rescue ActiveRecord::RecordNotFound
# no op
Expand Down Expand Up @@ -98,6 +99,22 @@ def terminate(id:, current_time: Time.now)
end
end

def signal(id:, name:, current_utc_time: Time.now.utc)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
begin
publisher = Models::Publisher.lock.find(id)
rescue ActiveRecord::RecordNotFound => error
raise NotFound.new(id: id), cause: error
end

publisher.signals.create!(name: name, created_at: current_utc_time)

nil
end
end
end

# :nocov:
def sleep(duration, start_time:, tick_interval:, signal_read:, process:, kernel:)
while (
Expand Down Expand Up @@ -128,8 +145,10 @@ def trap_signals(id:)
def create_publisher_threads(id:, name:,
queue:, concurrency:,
logger:, time:, kernel:, &block)
concurrency.times.map do
concurrency.times.each_with_index.map do |_, index|
Thread.new do
Thread.current.name = "outboxer.publisher.#{index + 1}"

while (message = queue.pop)
break if message.nil?

Expand Down Expand Up @@ -183,6 +202,8 @@ def create_heartbeat_thread(id:, name:,
heartbeat_interval:, tick_interval:, signal_read:,
logger:, time:, socket:, process:, kernel:)
Thread.new do
Thread.current.name = "outboxer.heatbeat"

while @status != Status::TERMINATING
begin
current_time = time.now
Expand All @@ -200,6 +221,13 @@ def create_heartbeat_thread(id:, name:,
raise NotFound.new(id: id), cause: error
end

signal = publisher.signals.order(created_at: :asc).first

if !signal.nil?
handle_signal(id: id, name: signal.name, logger: logger, process: process)
signal.destroy
end

end_rtt = process.clock_gettime(process::CLOCK_MONOTONIC)
rtt = end_rtt - start_rtt

Expand Down Expand Up @@ -231,8 +259,15 @@ def create_heartbeat_thread(id:, name:,
tick_interval: tick_interval,
process: process, kernel: kernel)

rescue NotFound => e
logger.fatal("Thread TID-#{(Thread.object_id ^ process.pid).to_s(36)} #{Thread.current.name}")
logger.fatal("#{e.class} #{e.message}")
logger.fatal(e.backtrace.join("\n"))

terminate(id: id)
rescue StandardError => e
logger.error(e.message)
logger.error("Thread TID-#{(Thread.object_id ^ process.pid).to_s(36)} #{Thread.current.name}")
logger.error("#{e.class} #{e.message}")
logger.error(e.backtrace.join("\n"))

Publisher.sleep(
Expand All @@ -241,9 +276,9 @@ def create_heartbeat_thread(id:, name:,
start_time: process.clock_gettime(process::CLOCK_MONOTONIC),
tick_interval: tick_interval,
process: process, kernel: kernel)

rescue NotFound, Exception => e
logger.fatal(e.message)
rescue Exception => e
logger.fatal("Thread TID-#{(Thread.object_id ^ process.pid).to_s(36)} #{Thread.current.name}")
logger.fatal("#{e.class} #{e.message}")
logger.fatal(e.backtrace.join("\n"))

terminate(id: id)
Expand All @@ -252,6 +287,41 @@ def create_heartbeat_thread(id:, name:,
end
end

def handle_signal(id:, name:, logger:, process:)
case name
when 'TTIN'
Thread.list.each_with_index do |thread, index|
logger.info "Thread TID-#{(thread.object_id ^ process.pid).to_s(36)} #{thread.name}"

if thread.backtrace
logger.info thread.backtrace.join("\n")
else
logger.info "<no backtrace available>"
end
end
when 'TSTP'
begin
stop(id: id)
rescue NotFound => e
logger.fatal(e.message)
logger.fatal(e.backtrace.join("\n"))

terminate(id: id)
end
when 'CONT'
begin
continue(id: id)
rescue NotFound => e
logger.fatal(e.message)
logger.fatal(e.backtrace.join("\n"))

terminate(id: id)
end
when 'INT', 'TERM'
terminate(id: id)
end
end

def publish(
name: "#{::Socket.gethostname}:#{::Process.pid}",
batch_size: 100, concurrency: 1,
Expand All @@ -260,6 +330,8 @@ def publish(
time: ::Time, socket: ::Socket, process: ::Process, kernel: ::Kernel,
&block
)
Thread.current.name = "outboxer.main"

logger.info "Outboxer v#{Outboxer::VERSION} publishing in ruby #{RUBY_VERSION} "\
"(#{RUBY_RELEASE_DATE} revision #{RUBY_REVISION[0, 10]}) [#{RUBY_PLATFORM}]"

Expand Down Expand Up @@ -308,32 +380,7 @@ def publish(
if IO.select([signal_read], nil, nil, 0)
signal_name = signal_read.gets.strip rescue nil

case signal_name
when 'TTIN'
Thread.list.each_with_index do |thread, index|
logger.info thread.backtrace.join("\n") if thread.backtrace
end
when 'TSTP'
begin
stop(id: id)
rescue NotFound => e
logger.fatal(e.message)
logger.fatal(e.backtrace.join("\n"))

terminate(id: id)
end
when 'CONT'
begin
continue(id: id)
rescue NotFound => e
logger.fatal(e.message)
logger.fatal(e.backtrace.join("\n"))

terminate(id: id)
end
when 'INT', 'TERM'
terminate(id: id)
end
handle_signal(id: id, name: signal_name, logger: logger, process: process)
end
end

Expand Down
13 changes: 0 additions & 13 deletions lib/outboxer/socket.rb

This file was deleted.

26 changes: 25 additions & 1 deletion lib/outboxer/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def human_readable_size(kilobytes)
per_page: denormalised_query_params[:per_page],
time_zone: denormalised_query_params[:time_zone])

redirect to("/messages#{normalised_query_string}")
redirect to(normalised_query_string)
end

get '/messages' do
Expand Down Expand Up @@ -574,5 +574,29 @@ def normalise_query_string(status: Messages::LIST_STATUS_DEFAULT,

redirect to("#{normalised_query_string}")
end

post '/publisher/:id/signals' do
denormalised_query_params = denormalise_query_params(
status: params[:status],
sort: params[:sort],
order: params[:order],
page: params[:page],
per_page: params[:per_page],
time_zone: params[:time_zone])

normalised_query_string = normalise_query_string(
status: denormalised_query_params[:status],
sort: denormalised_query_params[:sort],
order: denormalised_query_params[:order],
page: denormalised_query_params[:page],
per_page: denormalised_query_params[:per_page],
time_zone: denormalised_query_params[:time_zone])

Publisher.signal(id: params[:id], name: params[:name])

flash[:primary] = "Publisher #{params[:id]} signalled #{params[:name]}"

redirect to(normalised_query_string)
end
end
end
56 changes: 46 additions & 10 deletions lib/outboxer/web/views/home.erb
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,52 @@
<td><%= publisher.info['cpu'] %>%</td>
<td><%= human_readable_size(publisher.info['rss']) %></td>
<td><%= (publisher.info['rtt'] * 1000).round(2) %> ms</td>
<td> <!-- Action column -->
<form action="<%= outboxer_path("/publisher/#{publisher[:id]}/delete") %>" method="post">
<% normalised_query_params.each do |key, param| %>
<input type="hidden" name="<%= key %>" value="<%= param %>">
<% end %>
<button type="submit" class="btn btn-sm btn-outline-danger d-flex align-items-center justify-content-center">
<i class="bi bi-trash me-1"></i>
<span>Delete</span>
</button>
</form>
<td>
<div class="d-flex gap-1">
<!-- Pause Button -->
<form action="<%= outboxer_path("/publisher/#{publisher[:id]}/signals") %>" method="post" style="display: inline;">
<input type="hidden" name="name" value="TSTP">
<input type="hidden" name="time_zone" value="<%= denormalised_query_params[:time_zone] %>">
<button type="submit" class="btn btn-sm btn-outline-warning" title="Pause">
<i class="bi bi-pause-circle"></i>
</button>
</form>

<!-- Resume Button -->
<form action="<%= outboxer_path("/publisher/#{publisher[:id]}/signals") %>" method="post" style="display: inline;">
<input type="hidden" name="name" value="CONT">
<input type="hidden" name="time_zone" value="<%= denormalised_query_params[:time_zone] %>">
<button type="submit" class="btn btn-sm btn-outline-success" title="Resume">
<i class="bi bi-play-circle"></i>
</button>
</form>

<!-- Dump Threads Button -->
<form action="<%= outboxer_path("/publisher/#{publisher[:id]}/signals") %>" method="post" style="display: inline;">
<input type="hidden" name="name" value="TTIN">
<input type="hidden" name="time_zone" value="<%= denormalised_query_params[:time_zone] %>">
<button type="submit" class="btn btn-sm btn-outline-secondary" title="Dump Threads">
<i class="bi bi-bug"></i>
</button>
</form>

<!-- Kill Button -->
<form action="<%= outboxer_path("/publisher/#{publisher[:id]}/signals") %>" method="post" style="display: inline;">
<input type="hidden" name="name" value="TERM">
<input type="hidden" name="time_zone" value="<%= denormalised_query_params[:time_zone] %>">
<button type="submit" class="btn btn-sm btn-outline-danger" title="Kill">
<i class="bi bi-x-circle"></i>
</button>
</form>

<!-- Delete Button -->
<form action="<%= outboxer_path("/publisher/#{publisher[:id]}/delete") %>" method="post" style="display: inline;">
<input type="hidden" name="time_zone" value="<%= denormalised_query_params[:time_zone] %>">
<button type="submit" class="btn btn-sm btn-outline-danger" title="Delete">
<i class="bi bi-trash"></i>
</button>
</form>
</div>
</td>
</tr>
<% end %>
Expand Down
3 changes: 3 additions & 0 deletions tasks/database.rake
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ namespace :outboxer do
require_relative "../db/migrate/create_outboxer_publishers"
CreateOutboxerPublishers.new.up

require_relative "../db/migrate/create_outboxer_signals"
CreateOutboxerSignals.new.up

ActiveRecord::Base.connection.disconnect!
end

Expand Down

0 comments on commit 9732e97

Please sign in to comment.