diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher index 044b5de5..aa561500 100755 --- a/bin/outboxer_publisher +++ b/bin/outboxer_publisher @@ -12,6 +12,7 @@ options = { concurrency: ENV.fetch('OUTBOXER_CONCURRENCY', 1).to_i, poll_interval: ENV.fetch('OUTBOXER_POLL_INTERVAL', 10).to_f, tick_interval: ENV.fetch('OUTBOXER_TICK_INTERVAL', 0.1).to_f, + heartbeat_interval: ENV.fetch('OUTBOXER_HEARTBEAT_INTERVAL', 5).to_f, log_level: ENV.fetch('OUTBOXER_LOG_LEVEL', 'info').downcase, sidekiq_redis_url: ENV.fetch('SIDEKIQ_REDIS_URL', 'redis://localhost:6379/0') } @@ -28,19 +29,22 @@ logger.level = Logger.const_get(options[:log_level].upcase) Outboxer::Database.connect(config: db_config, logger: logger) -Outboxer::Publisher.publish( - batch_size: options[:batch_size], - concurrency: options[:concurrency], - poll_interval: options[:poll_interval], - tick_interval: options[:tick_interval], - logger: logger -) do |message| - case message[:messageable_type] - when 'Event' - EventCreatedJob.perform_async({ 'id' => message[:messageable_id] }) +begin + Outboxer::Publisher.publish( + batch_size: options[:batch_size], + concurrency: options[:concurrency], + poll_interval: options[:poll_interval], + tick_interval: options[:tick_interval], + heartbeat_interval: options[:heartbeat_interval], + logger: logger + ) do |message| + case message[:messageable_type] + when 'Event' + EventCreatedJob.perform_async({ 'id' => message[:messageable_id] }) + end end +ensure + Outboxer::Database.disconnect(logger: logger) end -Outboxer::Database.disconnect(logger: logger) - # bin/outboxer_publisher diff --git a/config/brakeman.ignore b/config/brakeman.ignore new file mode 100644 index 00000000..68d345a4 --- /dev/null +++ b/config/brakeman.ignore @@ -0,0 +1,16 @@ +{ + "ignored_warnings": [ + { + "fingerprint": "1c5ef0e65c33f7623a6ee8ad25776d3ff04ad14854fd09191147088740c4afb1", + "warning_type": "Command Injection", + "file": "lib/outboxer/publisher.rb", + "line": 187 + }, + { + "fingerprint": "df8ee41d7c5028a1429c07753f227d3a6d50128f04364d957988e9a60863b375", + "warning_type": "Command Injection", + "file": "lib/outboxer/publisher.rb", + "line": 188 + } + ] +} diff --git a/db/migrate/create_outboxer_publishers.rb b/db/migrate/create_outboxer_publishers.rb index 14ceba1d..434ebe3a 100644 --- a/db/migrate/create_outboxer_publishers.rb +++ b/db/migrate/create_outboxer_publishers.rb @@ -2,8 +2,8 @@ class CreateOutboxerPublishers < ActiveRecord::Migration[6.1] def up ActiveRecord::Base.transaction do create_table :outboxer_publishers do |t| - t.string :key, limit: 279, null: false - # 255 (hostname) + 1 (colon) + 10 (PID) + 1 (colon) + 12 (unique ID) = 279 characters + t.string :name, limit: 263, null: false + # 255 (hostname) + 1 (colon) + 7 (pid) t.string :status, limit: 255, null: false @@ -11,8 +11,6 @@ def up t.timestamps end - - add_index :outboxer_publishers, :key, unique: true end end diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index b151ce25..263f6141 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -107,7 +107,7 @@ def published(id:, current_utc_time: Time.now.utc, end def failed(id:, exception:, current_utc_time: Time.now.utc, - hostname: Socket.gethostname, process_id: Process.pid) + hostname: Socket.gethostname, process_id: Process.pid) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do message = Models::Message.order(created_at: :asc).lock.find_by!(id: id) diff --git a/lib/outboxer/models/publisher.rb b/lib/outboxer/models/publisher.rb index 7cab8110..395b7e02 100644 --- a/lib/outboxer/models/publisher.rb +++ b/lib/outboxer/models/publisher.rb @@ -3,8 +3,8 @@ module Models class Publisher < ::ActiveRecord::Base self.table_name = :outboxer_publishers - validates :key, presence: true, length: { maximum: 279 } - # 255 (hostname) + 1 (colon) + 10 (PID) + 1 (colon) + 12 (unique ID) = 279 characters + validates :name, presence: true, length: { maximum: 263 } + # 255 (hostname) + 1 (colon) + 7 (pid) module Status PUBLISHING = 'publishing' diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 0f72039f..fbea3b93 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -2,18 +2,32 @@ module Outboxer module Publisher extend self - def create(key:, current_time: Time.now) + class Error < StandardError; end + + class NotFound < Error + def initialize(id:) + super("Couldn't find Outboxer::Models::Publisher with 'id'=#{id}") + end + end + + def create(name:, current_time: Time.now) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do publisher = Models::Publisher.create!( - key: key, status: Status::PUBLISHING, info: {}, + name: name, status: Status::PUBLISHING, info: { + 'throughput' => 0, + 'latency' => 0, + 'cpu' => 0, + 'rss ' => 0, + 'rtt' => 0 + }, created_at: current_time, updated_at: current_time) @status = Status::PUBLISHING { id: publisher.id, - key: publisher.key, + name: publisher.name, status: publisher.status, created_at: publisher.created_at, updated_at: publisher.updated_at @@ -22,21 +36,30 @@ def create(key:, current_time: Time.now) end end - def delete(key:, current_time: Time.now) + def delete(id:, current_time: Time.now) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do - publisher = Models::Publisher.lock.find_by!(key: key) - publisher.destroy! + begin + publisher = Models::Publisher.lock.find_by!(id: id) + publisher.destroy! + rescue ActiveRecord::RecordNotFound + # no op + end end end end Status = Models::Publisher::Status - def stop(key:, current_time: Time.now) + def stop(id:, current_time: Time.now) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do - publisher = Models::Publisher.lock.find_by!(key: key) + begin + publisher = Models::Publisher.lock.find(id) + rescue ActiveRecord::RecordNotFound => error + raise NotFound.new(id: id), cause: error + end + publisher.update!(status: Status::STOPPED, updated_at: current_time) @status = Status::STOPPED @@ -44,10 +67,15 @@ def stop(key:, current_time: Time.now) end end - def continue(key:, current_time: Time.now) + def continue(id:, current_time: Time.now) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do - publisher = Models::Publisher.lock.find_by!(key: key) + begin + publisher = Models::Publisher.lock.find(id) + rescue ActiveRecord::RecordNotFound => error + raise NotFound.new(id: id), cause: error + end + publisher.update!(status: Status::PUBLISHING, updated_at: current_time) @status = Status::PUBLISHING @@ -55,13 +83,17 @@ def continue(key:, current_time: Time.now) end end - def terminate(key:, current_time: Time.now) + def terminate(id:, current_time: Time.now) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do - publisher = Models::Publisher.lock.find_by!(key: key) - publisher.update!(status: Status::TERMINATING, updated_at: current_time) + begin + publisher = Models::Publisher.lock.find(id) + publisher.update!(status: Status::TERMINATING, updated_at: current_time) - @status = Status::TERMINATING + @status = Status::TERMINATING + rescue ActiveRecord::RecordNotFound + @status = Status::TERMINATING + end end end end @@ -80,9 +112,7 @@ def sleep(duration, start_time:, tick_interval:, signal_read:, process:, kernel: def trap_signals(id:) signal_read, signal_write = IO.pipe - signal_names = %w[TTIN TSTP CONT INT TERM] - - signal_names.each do |signal_name| + %w[TTIN TSTP CONT INT TERM].each do |signal_name| old_handler = Signal.trap(signal_name) do if old_handler.respond_to?(:call) old_handler.call @@ -95,14 +125,14 @@ def trap_signals(id:) [signal_read, signal_write] end - def create_publisher_threads(key:, queue:, concurrency:, logger:, time:, kernel:, &block) + def create_publisher_threads(id:, queue:, concurrency:, logger:, time:, kernel:, &block) concurrency.times.map do Thread.new do while (message = queue.pop) break if message.nil? publish_message( - key: key, + id: id, dequeued_message: message, logger: logger, time: time, kernel: kernel, &block) end @@ -110,7 +140,7 @@ def create_publisher_threads(key:, queue:, concurrency:, logger:, time:, kernel: end end - def dequeue_messages(key:, queue:, batch_size:, + def dequeue_messages(id:, queue:, batch_size:, poll_interval:, tick_interval:, signal_read:, logger:, process:, kernel:) dequeue_limit = batch_size - queue.size @@ -143,12 +173,86 @@ def dequeue_messages(key:, queue:, batch_size:, logger.fatal "#{exception.class}: #{exception.message}" exception.backtrace.each { |frame| logger.fatal frame } - terminate(key: key) + terminate(id: id) + end + + def create_heartbeat_thread(id:, name:, + heartbeat_interval:, tick_interval:, signal_read:, + logger:, time:, socket:, process:, kernel:) + Thread.new do + while @status != Status::TERMINATING + begin + current_time = time.now + + cpu = `ps -p #{process.pid} -o %cpu`.split("\n").last.to_f + rss = `ps -p #{process.pid} -o rss`.split("\n").last.to_i + + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + start_rtt = process.clock_gettime(process::CLOCK_MONOTONIC) + + begin + publisher = Models::Publisher.lock.find(id) + rescue ActiveRecord::RecordNotFound => error + raise NotFound.new(id: id), cause: error + end + + end_rtt = process.clock_gettime(process::CLOCK_MONOTONIC) + rtt = end_rtt - start_rtt + + throughput = messages = Models::Message + .where(updated_by: name) + .where('updated_at >= ?', 1.second.ago) + .count + + last_updated_message = Models::Message + .where(updated_by: name) + .order(updated_at: :desc) + .first + + publisher.update!( + updated_at: current_time, + info: { + throughput: throughput, + latency: last_updated_message.nil? ? 0 : (time.now - last_updated_message.updated_at).to_i, + cpu: cpu, + rss: rss, + rtt: rtt } ) + end + end + + Publisher.sleep( + heartbeat_interval, + signal_read: signal_read, + start_time: process.clock_gettime(process::CLOCK_MONOTONIC), + tick_interval: tick_interval, + process: process, kernel: kernel) + + rescue StandardError => e + logger.error(e.message) + logger.error(e.backtrace.join("\n")) + + Publisher.sleep( + heartbeat_interval, + signal_read: signal_read, + start_time: process.clock_gettime(process::CLOCK_MONOTONIC), + tick_interval: tick_interval, + process: process, kernel: kernel) + + rescue NotFound, Exception => e + logger.fatal(e.message) + logger.fatal(e.backtrace.join("\n")) + + terminate(id: id) + end + end + end end def publish( + name: "#{::Socket.gethostname}:#{::Process.pid}", batch_size: 100, concurrency: 1, - poll_interval: 5, tick_interval: 0.1, + poll_interval: 5, tick_interval: 0.1, heartbeat_interval: 5, logger: Logger.new($stdout, level: Logger::INFO), time: ::Time, socket: ::Socket, process: ::Process, kernel: ::Kernel, &block @@ -158,26 +262,33 @@ def publish( queue = Queue.new - key = "#{::Socket.gethostname}:#{::Process.pid}:#{::SecureRandom.hex(6)}" - - publisher = create(key: key) + publisher = create(name: name) + id = publisher[:id] logger.info "Outboxer config {"\ " batch_size: #{batch_size}, concurrency: #{concurrency},"\ " poll_interval: #{poll_interval}, tick_interval: #{tick_interval} }"\ publisher_threads = create_publisher_threads( - key: key, queue: queue, concurrency: concurrency, + id: id, queue: queue, concurrency: concurrency, logger: logger, time: time, kernel: kernel, &block) signal_read, _signal_write = trap_signals(id: publisher[:id]) + heartbeat_thread = create_heartbeat_thread( + id: id, name: name, + heartbeat_interval: heartbeat_interval, + tick_interval: tick_interval, + signal_read: signal_read, + logger: logger, + time: time, socket: socket, process: process, kernel: kernel) + loop do case @status when Status::PUBLISHING dequeue_messages( - key: key, + id: id, queue: queue, batch_size: batch_size, poll_interval: poll_interval, tick_interval:, signal_read: signal_read, logger: logger, process: process, kernel: kernel) @@ -200,11 +311,25 @@ def publish( logger.info thread.backtrace.join("\n") if thread.backtrace end when 'TSTP' - stop(key: key) + begin + stop(id: id) + rescue NotFound => e + logger.fatal(e.message) + logger.fatal(e.backtrace.join("\n")) + + terminate(id: id) + end when 'CONT' - continue(key: key) + begin + continue(id: id) + rescue NotFound => e + logger.fatal(e.message) + logger.fatal(e.backtrace.join("\n")) + + terminate(id: id) + end when 'INT', 'TERM' - terminate(key: key) + terminate(id: id) end end end @@ -213,13 +338,14 @@ def publish( concurrency.times { queue.push(nil) } publisher_threads.each(&:join) + heartbeat_thread.join - delete(key: key) + delete(id: id) logger.info "Outboxer terminated" end - def publish_message(key:, dequeued_message:, logger:, time:, kernel:, &block) + def publish_message(id:, dequeued_message:, logger:, time:, kernel:, &block) dequeued_at = dequeued_message[:updated_at] message = Message.publishing(id: dequeued_message[:id]) @@ -251,7 +377,7 @@ def publish_message(key:, dequeued_message:, logger:, time:, kernel:, &block) "in #{(time.now.utc - dequeued_at).round(3)}s" exception.backtrace.each { |frame| logger.fatal frame } - terminate(key: key) + terminate(id: id) end end end diff --git a/lib/outboxer/web.rb b/lib/outboxer/web.rb index b25a02e4..c26427af 100755 --- a/lib/outboxer/web.rb +++ b/lib/outboxer/web.rb @@ -6,7 +6,7 @@ require 'uri' require 'rack/flash' -require 'pry-byebug' +require 'action_view' environment = ENV['APP_ENV'] || 'development' config = Outboxer::Database.config(environment: environment, pool: 5) @@ -20,6 +20,8 @@ class Web < Sinatra::Base set :show_exceptions, false helpers do + include ActionView::Helpers::DateHelper + def outboxer_path(path) "#{request.script_name}#{path}" end @@ -36,42 +38,6 @@ def human_readable_size(kilobytes) "#{size.round(2)} #{unit}" end - - def time_in_words(from_time, to_time = Time.now) - seconds_diff = (to_time - from_time).to_i - - case seconds_diff - when 0..59 - "#{seconds_diff} #{'second'.pluralize(seconds_diff)}" - when 60..3599 - minutes = seconds_diff / 60 - "#{minutes} #{'minute'.pluralize(minutes)}" - when 3600..86399 - hours = seconds_diff / 3600 - "#{hours} #{'hour'.pluralize(hours)}" - else - days = seconds_diff / 86400 - "#{days} #{'day'.pluralize(days)} ago" - end - end - - def time_ago_in_words(from_time, to_time = Time.now) - seconds_diff = (to_time - from_time).to_i - - case seconds_diff - when 0..59 - "#{seconds_diff} #{'second'.pluralize(seconds_diff)} ago" - when 60..3599 - minutes = seconds_diff / 60 - "#{minutes} #{'minute'.pluralize(minutes)} ago" - when 3600..86399 - hours = seconds_diff / 3600 - "#{hours} #{'hour'.pluralize(hours)} ago" - else - days = seconds_diff / 86400 - "#{days} #{'day'.pluralize(days)} ago" - end - end end error StandardError do @@ -584,5 +550,29 @@ def normalise_query_string(status: Messages::LIST_STATUS_DEFAULT, redirect to("/messages#{normalised_query_string}") end + + post '/publisher/:id/delete' do + denormalised_query_params = denormalise_query_params( + status: params[:status], + sort: params[:sort], + order: params[:order], + page: params[:page], + per_page: params[:per_page], + time_zone: params[:time_zone]) + + normalised_query_string = normalise_query_string( + status: denormalised_query_params[:status], + sort: denormalised_query_params[:sort], + order: denormalised_query_params[:order], + page: denormalised_query_params[:page], + per_page: denormalised_query_params[:per_page], + time_zone: denormalised_query_params[:time_zone]) + + Publisher.delete(id: params[:id]) + + flash[:primary] = "Publisher #{params[:id]} was deleted" + + redirect to("#{normalised_query_string}") + end end end diff --git a/lib/outboxer/web/views/home.erb b/lib/outboxer/web/views/home.erb index 1365d4f6..2715c68d 100644 --- a/lib/outboxer/web/views/home.erb +++ b/lib/outboxer/web/views/home.erb @@ -13,31 +13,43 @@ <% else %>