Skip to content

Commit

Permalink
rename kwargs (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
bedrock-adam authored Nov 9, 2024
1 parent 9732e97 commit dd90fd8
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 56 deletions.
16 changes: 8 additions & 8 deletions bin/outboxer_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ require_relative '../app/jobs/event_created_job'

options = {
environment: ENV.fetch('OUTBOXER_ENV', 'development'),
batch_size: ENV.fetch('OUTBOXER_BATCH_SIZE', 100).to_i,
buffer: ENV.fetch('OUTBOXER_BUFFER', 100).to_i,
concurrency: ENV.fetch('OUTBOXER_CONCURRENCY', 1).to_i,
poll_interval: ENV.fetch('OUTBOXER_POLL_INTERVAL', 10).to_f,
tick_interval: ENV.fetch('OUTBOXER_TICK_INTERVAL', 0.1).to_f,
heartbeat_interval: ENV.fetch('OUTBOXER_HEARTBEAT_INTERVAL', 5).to_f,
poll: ENV.fetch('OUTBOXER_POLL', 10).to_f,
tick: ENV.fetch('OUTBOXER_TICK', 0.1).to_f,
heartbeat: ENV.fetch('OUTBOXER_HEARTBEAT', 5).to_f,
log_level: ENV.fetch('OUTBOXER_LOG_LEVEL', 'info').downcase,
sidekiq_redis_url: ENV.fetch('SIDEKIQ_REDIS_URL', 'redis://localhost:6379/0')
}
Expand All @@ -31,11 +31,11 @@ Outboxer::Database.connect(config: db_config, logger: logger)

begin
Outboxer::Publisher.publish(
batch_size: options[:batch_size],
buffer: options[:buffer],
concurrency: options[:concurrency],
poll_interval: options[:poll_interval],
tick_interval: options[:tick_interval],
heartbeat_interval: options[:heartbeat_interval],
poll: options[:poll],
tick: options[:tick],
heartbeat: options[:heartbeat],
logger: logger
) do |message|
case message[:messageable_type]
Expand Down
47 changes: 23 additions & 24 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ def signal(id:, name:, current_utc_time: Time.now.utc)
end

# :nocov:
def sleep(duration, start_time:, tick_interval:, signal_read:, process:, kernel:)
def sleep(duration, start_time:, tick:, signal_read:, process:, kernel:)
while (
(@status != Status::TERMINATING) &&
((process.clock_gettime(process::CLOCK_MONOTONIC) - start_time) < duration) &&
!IO.select([signal_read], nil, nil, 0))
kernel.sleep(tick_interval)
kernel.sleep(tick)
end
end
# :nocov:
Expand Down Expand Up @@ -161,10 +161,9 @@ def create_publisher_threads(id:, name:,
end

def dequeue_messages(id:, name:,
queue:, batch_size:,
poll_interval:, tick_interval:,
queue:, buffer:, poll:, tick:,
signal_read:, logger:, process:, kernel:)
dequeue_limit = batch_size - queue.size
dequeue_limit = buffer - queue.size

if dequeue_limit > 0
dequeued_messages = Messages.dequeue(
Expand All @@ -174,17 +173,17 @@ def dequeue_messages(id:, name:,
dequeued_messages.each { |message| queue.push(message) }
else
Publisher.sleep(
poll_interval,
poll,
start_time: process.clock_gettime(process::CLOCK_MONOTONIC),
tick_interval: tick_interval,
tick: tick,
signal_read: signal_read,
process: process, kernel: kernel)
end
else
Publisher.sleep(
tick_interval,
tick,
start_time: process.clock_gettime(process::CLOCK_MONOTONIC),
tick_interval: tick_interval,
tick: tick,
signal_read: signal_read,
process: process, kernel: kernel)
end
Expand All @@ -199,7 +198,7 @@ def dequeue_messages(id:, name:,
end

def create_heartbeat_thread(id:, name:,
heartbeat_interval:, tick_interval:, signal_read:,
heartbeat:, tick:, signal_read:,
logger:, time:, socket:, process:, kernel:)
Thread.new do
Thread.current.name = "outboxer.heatbeat"
Expand Down Expand Up @@ -253,10 +252,10 @@ def create_heartbeat_thread(id:, name:,
end

Publisher.sleep(
heartbeat_interval,
heartbeat,
signal_read: signal_read,
start_time: process.clock_gettime(process::CLOCK_MONOTONIC),
tick_interval: tick_interval,
tick: tick,
process: process, kernel: kernel)

rescue NotFound => e
Expand All @@ -271,10 +270,10 @@ def create_heartbeat_thread(id:, name:,
logger.error(e.backtrace.join("\n"))

Publisher.sleep(
heartbeat_interval,
heartbeat,
signal_read: signal_read,
start_time: process.clock_gettime(process::CLOCK_MONOTONIC),
tick_interval: tick_interval,
tick: tick,
process: process, kernel: kernel)
rescue Exception => e
logger.fatal("Thread TID-#{(Thread.object_id ^ process.pid).to_s(36)} #{Thread.current.name}")
Expand Down Expand Up @@ -324,8 +323,8 @@ def handle_signal(id:, name:, logger:, process:)

def publish(
name: "#{::Socket.gethostname}:#{::Process.pid}",
batch_size: 100, concurrency: 1,
poll_interval: 5, tick_interval: 0.1, heartbeat_interval: 5,
buffer: 100, concurrency: 1,
poll: 5, tick: 0.1, heartbeat: 5,
logger: Logger.new($stdout, level: Logger::INFO),
time: ::Time, socket: ::Socket, process: ::Process, kernel: ::Kernel,
&block
Expand All @@ -341,8 +340,8 @@ def publish(
id = publisher[:id]

logger.info "Outboxer config {"\
" batch_size: #{batch_size}, concurrency: #{concurrency},"\
" poll_interval: #{poll_interval}, tick_interval: #{tick_interval} }"\
" buffer: #{buffer}, concurrency: #{concurrency},"\
" poll: #{poll}, tick: #{tick} }"\

publisher_threads = create_publisher_threads(
id: id, name: name, queue: queue, concurrency: concurrency,
Expand All @@ -353,8 +352,8 @@ def publish(

heartbeat_thread = create_heartbeat_thread(
id: id, name: name,
heartbeat_interval: heartbeat_interval,
tick_interval: tick_interval,
heartbeat: heartbeat,
tick: tick,
signal_read: signal_read,
logger: logger,
time: time, socket: socket, process: process, kernel: kernel)
Expand All @@ -364,14 +363,14 @@ def publish(
when Status::PUBLISHING
dequeue_messages(
id: id, name: name,
queue: queue, batch_size: batch_size,
poll_interval: poll_interval, tick_interval:,
queue: queue, buffer: buffer,
poll: poll, tick:,
signal_read: signal_read, logger: logger, process: process, kernel: kernel)
when Status::STOPPED
Publisher.sleep(
tick_interval,
tick,
start_time: process.clock_gettime(process::CLOCK_MONOTONIC),
tick_interval: tick_interval,
tick: tick,
signal_read: signal_read, process: process, kernel: kernel)
when Status::TERMINATING
break
Expand Down
48 changes: 24 additions & 24 deletions spec/lib/outboxer/publisher/publish_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
module Outboxer
RSpec.describe Publisher do
describe '.publish' do
let(:batch_size) { 1 }
let(:poll_interval) { 1 }
let(:tick_interval) { 0.1 }
let(:buffer) { 1 }
let(:poll) { 1 }
let(:tick) { 0.1 }
let(:logger) { instance_double(Logger, debug: true, error: true, fatal: true, info: true) }
let(:kernel) { class_double(Kernel, sleep: nil) }

Expand All @@ -19,9 +19,9 @@ module Outboxer
it 'dumps stack trace' do
publish_thread = Thread.new do
Outboxer::Publisher.publish(
batch_size: batch_size,
poll_interval: poll_interval,
tick_interval: tick_interval,
buffer: buffer,
poll: poll,
tick: tick,
logger: logger, kernel: kernel
) do |_message|
::Process.kill('TTIN', ::Process.pid)
Expand All @@ -42,9 +42,9 @@ module Outboxer
it 'stops and resumes the publishing process correctly' do
publish_thread = Thread.new do
Outboxer::Publisher.publish(
batch_size: batch_size,
poll_interval: poll_interval,
tick_interval: tick_interval,
buffer: buffer,
poll: poll,
tick: tick,
logger: logger, kernel: kernel
) do |_message|
::Process.kill('TSTP', ::Process.pid)
Expand All @@ -64,9 +64,9 @@ module Outboxer
context 'when message published successfully' do
it 'sets the message to published' do
Publisher.publish(
batch_size: batch_size,
poll_interval: poll_interval,
tick_interval: tick_interval,
buffer: buffer,
poll: poll,
tick: tick,
logger: logger,
kernel: kernel
) do |message|
Expand All @@ -88,9 +88,9 @@ module Outboxer

before do
Publisher.publish(
batch_size: batch_size,
poll_interval: poll_interval,
tick_interval: tick_interval,
buffer: buffer,
poll: poll,
tick: tick,
logger: logger,
kernel: kernel
) do |message|
Expand Down Expand Up @@ -132,9 +132,9 @@ module Outboxer

before do
Publisher.publish(
batch_size: batch_size,
poll_interval: poll_interval,
tick_interval: tick_interval,
buffer: buffer,
poll: poll,
tick: tick,
logger: logger,
kernel: kernel
) do |dequeued_message|
Expand Down Expand Up @@ -188,9 +188,9 @@ module Outboxer
expect(logger).to receive(:error).with(include('StandardError: queue error')).once

Publisher.publish(
batch_size: batch_size,
poll_interval: poll_interval,
tick_interval: tick_interval,
buffer: buffer,
poll: poll,
tick: tick,
logger: logger,
kernel: kernel)
end
Expand All @@ -206,9 +206,9 @@ module Outboxer
.once

Publisher.publish(
batch_size: batch_size,
poll_interval: poll_interval,
tick_interval: tick_interval,
buffer: buffer,
poll: poll,
tick: tick,
logger: logger,
kernel: kernel)
end
Expand Down

0 comments on commit dd90fd8

Please sign in to comment.