diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher index aa561500..c9a476e7 100755 --- a/bin/outboxer_publisher +++ b/bin/outboxer_publisher @@ -8,11 +8,11 @@ require_relative '../app/jobs/event_created_job' options = { environment: ENV.fetch('OUTBOXER_ENV', 'development'), - batch_size: ENV.fetch('OUTBOXER_BATCH_SIZE', 100).to_i, + buffer: ENV.fetch('OUTBOXER_BUFFER', 100).to_i, concurrency: ENV.fetch('OUTBOXER_CONCURRENCY', 1).to_i, - poll_interval: ENV.fetch('OUTBOXER_POLL_INTERVAL', 10).to_f, - tick_interval: ENV.fetch('OUTBOXER_TICK_INTERVAL', 0.1).to_f, - heartbeat_interval: ENV.fetch('OUTBOXER_HEARTBEAT_INTERVAL', 5).to_f, + poll: ENV.fetch('OUTBOXER_POLL', 10).to_f, + tick: ENV.fetch('OUTBOXER_TICK', 0.1).to_f, + heartbeat: ENV.fetch('OUTBOXER_HEARTBEAT', 5).to_f, log_level: ENV.fetch('OUTBOXER_LOG_LEVEL', 'info').downcase, sidekiq_redis_url: ENV.fetch('SIDEKIQ_REDIS_URL', 'redis://localhost:6379/0') } @@ -31,11 +31,11 @@ Outboxer::Database.connect(config: db_config, logger: logger) begin Outboxer::Publisher.publish( - batch_size: options[:batch_size], + buffer: options[:buffer], concurrency: options[:concurrency], - poll_interval: options[:poll_interval], - tick_interval: options[:tick_interval], - heartbeat_interval: options[:heartbeat_interval], + poll: options[:poll], + tick: options[:tick], + heartbeat: options[:heartbeat], logger: logger ) do |message| case message[:messageable_type] diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 3fa26529..4417368f 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -116,12 +116,12 @@ def signal(id:, name:, current_utc_time: Time.now.utc) end # :nocov: - def sleep(duration, start_time:, tick_interval:, signal_read:, process:, kernel:) + def sleep(duration, start_time:, tick:, signal_read:, process:, kernel:) while ( (@status != Status::TERMINATING) && ((process.clock_gettime(process::CLOCK_MONOTONIC) - start_time) < duration) && !IO.select([signal_read], nil, nil, 0)) - kernel.sleep(tick_interval) + kernel.sleep(tick) end end # :nocov: @@ -161,10 +161,9 @@ def create_publisher_threads(id:, name:, end def dequeue_messages(id:, name:, - queue:, batch_size:, - poll_interval:, tick_interval:, + queue:, buffer:, poll:, tick:, signal_read:, logger:, process:, kernel:) - dequeue_limit = batch_size - queue.size + dequeue_limit = buffer - queue.size if dequeue_limit > 0 dequeued_messages = Messages.dequeue( @@ -174,17 +173,17 @@ def dequeue_messages(id:, name:, dequeued_messages.each { |message| queue.push(message) } else Publisher.sleep( - poll_interval, + poll, start_time: process.clock_gettime(process::CLOCK_MONOTONIC), - tick_interval: tick_interval, + tick: tick, signal_read: signal_read, process: process, kernel: kernel) end else Publisher.sleep( - tick_interval, + tick, start_time: process.clock_gettime(process::CLOCK_MONOTONIC), - tick_interval: tick_interval, + tick: tick, signal_read: signal_read, process: process, kernel: kernel) end @@ -199,7 +198,7 @@ def dequeue_messages(id:, name:, end def create_heartbeat_thread(id:, name:, - heartbeat_interval:, tick_interval:, signal_read:, + heartbeat:, tick:, signal_read:, logger:, time:, socket:, process:, kernel:) Thread.new do Thread.current.name = "outboxer.heatbeat" @@ -253,10 +252,10 @@ def create_heartbeat_thread(id:, name:, end Publisher.sleep( - heartbeat_interval, + heartbeat, signal_read: signal_read, start_time: process.clock_gettime(process::CLOCK_MONOTONIC), - tick_interval: tick_interval, + tick: tick, process: process, kernel: kernel) rescue NotFound => e @@ -271,10 +270,10 @@ def create_heartbeat_thread(id:, name:, logger.error(e.backtrace.join("\n")) Publisher.sleep( - heartbeat_interval, + heartbeat, signal_read: signal_read, start_time: process.clock_gettime(process::CLOCK_MONOTONIC), - tick_interval: tick_interval, + tick: tick, process: process, kernel: kernel) rescue Exception => e logger.fatal("Thread TID-#{(Thread.object_id ^ process.pid).to_s(36)} #{Thread.current.name}") @@ -324,8 +323,8 @@ def handle_signal(id:, name:, logger:, process:) def publish( name: "#{::Socket.gethostname}:#{::Process.pid}", - batch_size: 100, concurrency: 1, - poll_interval: 5, tick_interval: 0.1, heartbeat_interval: 5, + buffer: 100, concurrency: 1, + poll: 5, tick: 0.1, heartbeat: 5, logger: Logger.new($stdout, level: Logger::INFO), time: ::Time, socket: ::Socket, process: ::Process, kernel: ::Kernel, &block @@ -341,8 +340,8 @@ def publish( id = publisher[:id] logger.info "Outboxer config {"\ - " batch_size: #{batch_size}, concurrency: #{concurrency},"\ - " poll_interval: #{poll_interval}, tick_interval: #{tick_interval} }"\ + " buffer: #{buffer}, concurrency: #{concurrency},"\ + " poll: #{poll}, tick: #{tick} }"\ publisher_threads = create_publisher_threads( id: id, name: name, queue: queue, concurrency: concurrency, @@ -353,8 +352,8 @@ def publish( heartbeat_thread = create_heartbeat_thread( id: id, name: name, - heartbeat_interval: heartbeat_interval, - tick_interval: tick_interval, + heartbeat: heartbeat, + tick: tick, signal_read: signal_read, logger: logger, time: time, socket: socket, process: process, kernel: kernel) @@ -364,14 +363,14 @@ def publish( when Status::PUBLISHING dequeue_messages( id: id, name: name, - queue: queue, batch_size: batch_size, - poll_interval: poll_interval, tick_interval:, + queue: queue, buffer: buffer, + poll: poll, tick:, signal_read: signal_read, logger: logger, process: process, kernel: kernel) when Status::STOPPED Publisher.sleep( - tick_interval, + tick, start_time: process.clock_gettime(process::CLOCK_MONOTONIC), - tick_interval: tick_interval, + tick: tick, signal_read: signal_read, process: process, kernel: kernel) when Status::TERMINATING break diff --git a/spec/lib/outboxer/publisher/publish_spec.rb b/spec/lib/outboxer/publisher/publish_spec.rb index c8274148..7d249ae2 100644 --- a/spec/lib/outboxer/publisher/publish_spec.rb +++ b/spec/lib/outboxer/publisher/publish_spec.rb @@ -3,9 +3,9 @@ module Outboxer RSpec.describe Publisher do describe '.publish' do - let(:batch_size) { 1 } - let(:poll_interval) { 1 } - let(:tick_interval) { 0.1 } + let(:buffer) { 1 } + let(:poll) { 1 } + let(:tick) { 0.1 } let(:logger) { instance_double(Logger, debug: true, error: true, fatal: true, info: true) } let(:kernel) { class_double(Kernel, sleep: nil) } @@ -19,9 +19,9 @@ module Outboxer it 'dumps stack trace' do publish_thread = Thread.new do Outboxer::Publisher.publish( - batch_size: batch_size, - poll_interval: poll_interval, - tick_interval: tick_interval, + buffer: buffer, + poll: poll, + tick: tick, logger: logger, kernel: kernel ) do |_message| ::Process.kill('TTIN', ::Process.pid) @@ -42,9 +42,9 @@ module Outboxer it 'stops and resumes the publishing process correctly' do publish_thread = Thread.new do Outboxer::Publisher.publish( - batch_size: batch_size, - poll_interval: poll_interval, - tick_interval: tick_interval, + buffer: buffer, + poll: poll, + tick: tick, logger: logger, kernel: kernel ) do |_message| ::Process.kill('TSTP', ::Process.pid) @@ -64,9 +64,9 @@ module Outboxer context 'when message published successfully' do it 'sets the message to published' do Publisher.publish( - batch_size: batch_size, - poll_interval: poll_interval, - tick_interval: tick_interval, + buffer: buffer, + poll: poll, + tick: tick, logger: logger, kernel: kernel ) do |message| @@ -88,9 +88,9 @@ module Outboxer before do Publisher.publish( - batch_size: batch_size, - poll_interval: poll_interval, - tick_interval: tick_interval, + buffer: buffer, + poll: poll, + tick: tick, logger: logger, kernel: kernel ) do |message| @@ -132,9 +132,9 @@ module Outboxer before do Publisher.publish( - batch_size: batch_size, - poll_interval: poll_interval, - tick_interval: tick_interval, + buffer: buffer, + poll: poll, + tick: tick, logger: logger, kernel: kernel ) do |dequeued_message| @@ -188,9 +188,9 @@ module Outboxer expect(logger).to receive(:error).with(include('StandardError: queue error')).once Publisher.publish( - batch_size: batch_size, - poll_interval: poll_interval, - tick_interval: tick_interval, + buffer: buffer, + poll: poll, + tick: tick, logger: logger, kernel: kernel) end @@ -206,9 +206,9 @@ module Outboxer .once Publisher.publish( - batch_size: batch_size, - poll_interval: poll_interval, - tick_interval: tick_interval, + buffer: buffer, + poll: poll, + tick: tick, logger: logger, kernel: kernel) end