Skip to content

Commit

Permalink
rename dequeued status to buffered (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
bedrock-adam authored Nov 11, 2024
1 parent 1f7481e commit 0355276
Show file tree
Hide file tree
Showing 20 changed files with 76 additions and 76 deletions.
4 changes: 2 additions & 2 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def publishing(id:, publisher_id: nil, publisher_name: nil,
ActiveRecord::Base.transaction do
message = Models::Message.lock.find_by!(id: id)

if message.status != Models::Message::Status::DEQUEUED
if message.status != Models::Message::Status::BUFFERED
raise ArgumentError,
"cannot transition outboxer message #{message.id} " \
"from #{message.status} to #{Models::Message::Status::PUBLISHING}"
Expand Down Expand Up @@ -159,7 +159,7 @@ def delete(id:)
end
end

REQUEUE_STATUSES = [:dequeued, :publishing, :failed]
REQUEUE_STATUSES = [:buffered, :publishing, :failed]

def can_requeue?(status:)
REQUEUE_STATUSES.include?(status&.to_sym)
Expand Down
10 changes: 5 additions & 5 deletions lib/outboxer/messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ module Outboxer
module Messages
extend self

def dequeue(limit: 1, publisher_id: nil, publisher_name: nil,
current_utc_time: Time.now.utc)
def buffer(limit: 1, publisher_id: nil, publisher_name: nil,
current_utc_time: Time.now.utc)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
messages = Models::Message
Expand All @@ -17,7 +17,7 @@ def dequeue(limit: 1, publisher_id: nil, publisher_name: nil,
Models::Message
.where(id: messages.map { |message| message[:id] })
.update_all(
status: Models::Message::Status::DEQUEUED,
status: Models::Message::Status::BUFFERED,
updated_at: current_utc_time,
updated_by_publisher_id: publisher_id,
updated_by_publisher_name: publisher_name)
Expand All @@ -35,7 +35,7 @@ def dequeue(limit: 1, publisher_id: nil, publisher_name: nil,
end
end

LIST_STATUS_OPTIONS = [nil, :queued, :dequeued, :publishing, :published, :failed]
LIST_STATUS_OPTIONS = [nil, :queued, :buffered, :publishing, :published, :failed]
LIST_STATUS_DEFAULT = nil

LIST_SORT_OPTIONS = [:id, :status, :messageable, :created_at, :updated_at, :updated_by_publisher_name]
Expand Down Expand Up @@ -115,7 +115,7 @@ def list(status: LIST_STATUS_DEFAULT,
}
end

REQUEUE_STATUSES = [:dequeued, :publishing, :failed]
REQUEUE_STATUSES = [:buffered, :publishing, :failed]

def can_requeue?(status:)
REQUEUE_STATUSES.include?(status&.to_sym)
Expand Down
6 changes: 3 additions & 3 deletions lib/outboxer/models/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ class Message < ::ActiveRecord::Base

module Status
QUEUED = 'queued'
DEQUEUED = 'dequeued'
BUFFERED = 'buffered'
PUBLISHING = 'publishing'
PUBLISHED = 'published'
FAILED = 'failed'
end

STATUSES = [
Status::QUEUED,
Status::DEQUEUED,
Status::BUFFERED,
Status::PUBLISHING,
Status::PUBLISHED,
Status::FAILED
]

scope :queued, -> { where(status: Status::QUEUED) }
scope :dequeued, -> { where(status: Status::DEQUEUED) }
scope :buffered, -> { where(status: Status::BUFFERED) }
scope :publishing, -> { where(status: Status::PUBLISHING) }
scope :published, -> { where(status: Status::PUBLISHED) }
scope :failed, -> { where(status: Status::FAILED) }
Expand Down
34 changes: 17 additions & 17 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,24 +191,24 @@ def create_publisher_threads(id:, name:,
break if message.nil?

publish_message(
id: id, name: name, dequeued_message: message,
id: id, name: name, buffered_message: message,
logger: logger, process: process, kernel: kernel, &block)
end
end
end
end

def dequeue_messages(id:, name:,
queue:, buffer:, poll:, tick:,
signal_read:, logger:, process:, kernel:)
dequeue_limit = buffer - queue.size
def buffer_messages(id:, name:,
queue:, buffer:, poll:, tick:,
signal_read:, logger:, process:, kernel:)
buffer_limit = buffer - queue.size

if dequeue_limit > 0
dequeued_messages = Messages.dequeue(
limit: dequeue_limit, publisher_id: id, publisher_name: name)
if buffer_limit > 0
buffered_messages = Messages.buffer(
limit: buffer_limit, publisher_id: id, publisher_name: name)

if dequeued_messages.count > 0
dequeued_messages.each { |message| queue.push(message) }
if buffered_messages.count > 0
buffered_messages.each { |message| queue.push(message) }
else
Publisher.sleep(
poll,
Expand Down Expand Up @@ -410,7 +410,7 @@ def publish(
loop do
case @status
when Status::PUBLISHING
dequeue_messages(
buffer_messages(
id: id, name: name,
queue: queue, buffer: buffer,
poll: poll, tick:,
Expand Down Expand Up @@ -445,30 +445,30 @@ def publish(
database.disconnect(logger: logger)
end

def publish_message(id:, name:, dequeued_message:, logger:, kernel:, process:, &block)
dequeued_at = process.clock_gettime(Process::CLOCK_MONOTONIC)
def publish_message(id:, name:, buffered_message:, logger:, kernel:, process:, &block)
buffered_at = process.clock_gettime(Process::CLOCK_MONOTONIC)

message = Message.publishing(
id: dequeued_message[:id], publisher_id: id, publisher_name: name)
id: buffered_message[:id], publisher_id: id, publisher_name: name)
logger.debug "Outboxer publishing message #{message[:id]} for "\
"#{message[:messageable_type]}::#{message[:messageable_id]} "\
"in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - dequeued_at).round(3)}s"
"in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - buffered_at).round(3)}s"

begin
block.call(message)
rescue Exception => e
Message.failed(id: message[:id], exception: e, publisher_id: id, publisher_name: name)
logger.debug "Outboxer failed to publish message #{message[:id]} for "\
"#{message[:messageable_type]}::#{message[:messageable_id]} "\
"in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - dequeued_at).round(3)}s"
"in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - buffered_at).round(3)}s"

raise
end

Message.published(id: message[:id], publisher_id: id, publisher_name: name)
logger.debug "Outboxer published message #{message[:id]} for "\
"#{message[:messageable_type]}::#{message[:messageable_id]} "\
"in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - dequeued_at).round(3)}s"
"in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - buffered_at).round(3)}s"
rescue StandardError => e
logger.error(
"#{e.class}: #{e.message}\n"\
Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/web/views/error.erb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<a class="nav-link" href="<%= outboxer_path('/messages?status=publishing') %>">Publishing</a>
</li>
<li class="nav-item">
<a class="nav-link" href="<%= outboxer_path('/messages?status=dequeued') %>">Dequeued</a>
<a class="nav-link" href="<%= outboxer_path('/messages?status=buffered') %>">Buffered</a>
</li>
<li class="nav-item">
<a class="nav-link" href="<%= outboxer_path('/messages?status=queued') %>">Queued</a>
Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/web/views/home.erb
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
</tr>
</thead>
<tbody>
<% ['queued', 'dequeued', 'publishing', 'published', 'failed'].each do |status| %>
<% ['queued', 'buffered', 'publishing', 'published', 'failed'].each do |status| %>
<tr>
<td class="text-capitalize">
<a class="custom-link" href="<%= outboxer_path("/messages?status=#{status}") %>">
Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/web/views/layout.erb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
<ul class="navbar-nav">
<% statuses = [
{ name: 'Queued', key: 'queued' },
{ name: 'Dequeued', key: 'dequeued' },
{ name: 'Buffered', key: 'buffered' },
{ name: 'Publishing', key: 'publishing' },
{ name: 'Published', key: 'published' },
{ name: 'Failed', key: 'failed' }
Expand Down
4 changes: 2 additions & 2 deletions spec/factories/outboxer_messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
status { Outboxer::Models::Message::Status::QUEUED }
end

trait :dequeued do
status { Outboxer::Models::Message::Status::DEQUEUED }
trait :buffered do
status { Outboxer::Models::Message::Status::BUFFERED }
end

trait :publishing do
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/outboxer/message/can_requeue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module Outboxer
RSpec.describe Message do
describe '.can_requeue?' do
let!(:message_1) { create(:outboxer_message, :queued) }
let!(:message_2) { create(:outboxer_message, :dequeued) }
let!(:message_2) { create(:outboxer_message, :buffered) }
let!(:message_3) { create(:outboxer_message, :failed) }
let!(:message_4) { create(:outboxer_message, :failed) }
let!(:message_5) { create(:outboxer_message, :publishing) }
Expand All @@ -16,9 +16,9 @@ module Outboxer
end
end

context 'when status is dequeued' do
context 'when status is buffered' do
it 'returns true' do
expect(Message.can_requeue?(status: Message::Status::DEQUEUED)).to eq true
expect(Message.can_requeue?(status: Message::Status::BUFFERED)).to eq true
end
end

Expand Down
6 changes: 3 additions & 3 deletions spec/lib/outboxer/message/publishing_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
module Outboxer
RSpec.describe Message do
describe '.publishing' do
context 'when dequeued message' do
let!(:dequeued_message) { create(:outboxer_message, :dequeued) }
let!(:publishing_message) { Message.publishing(id: dequeued_message.id) }
context 'when buffered message' do
let!(:buffered_message) { create(:outboxer_message, :buffered) }
let!(:publishing_message) { Message.publishing(id: buffered_message.id) }

it 'returns publishing message' do
expect(publishing_message[:id]).to eq(Models::Message.publishing.last.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Outboxer
RSpec.describe Messages do
describe '.dequeue' do
describe '.buffer' do
context 'when there are 2 queued messages' do
let!(:queued_messages) do
[
Expand All @@ -12,13 +12,13 @@ module Outboxer
end

context 'when limit is 1' do
let!(:dequeued_messages) { Messages.dequeue(limit: 1) }
let!(:buffered_messages) { Messages.buffer(limit: 1) }

it 'returns first dequeued message' do
expect(dequeued_messages.count).to eq(1)
it 'returns first buffered message' do
expect(buffered_messages.count).to eq(1)

dequeued_message = dequeued_messages.first
expect(dequeued_message[:id]).to eq(queued_messages[0].id)
buffered_message = buffered_messages.first
expect(buffered_message[:id]).to eq(queued_messages[0].id)
end

it 'keeps last queued message' do
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/outboxer/messages/can_requeue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module Outboxer
RSpec.describe Messages do
describe '.can_requeue?' do
let!(:message_1) { create(:outboxer_message, :queued) }
let!(:message_2) { create(:outboxer_message, :dequeued) }
let!(:message_2) { create(:outboxer_message, :buffered) }
let!(:message_3) { create(:outboxer_message, :failed) }
let!(:message_4) { create(:outboxer_message, :failed) }
let!(:message_5) { create(:outboxer_message, :publishing) }
Expand All @@ -16,9 +16,9 @@ module Outboxer
end
end

context 'when status is dequeued' do
context 'when status is buffered' do
it 'returns true' do
expect(Messages.can_requeue?(status: Message::Status::DEQUEUED)).to eq true
expect(Messages.can_requeue?(status: Message::Status::BUFFERED)).to eq true
end
end

Expand Down
6 changes: 3 additions & 3 deletions spec/lib/outboxer/messages/delete_all_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ module Outboxer
end

let!(:message_1) { create(:outboxer_message, :queued) }
let!(:message_2) { create(:outboxer_message, :dequeued) }
let!(:message_2) { create(:outboxer_message, :buffered) }

let!(:message_3) { create(:outboxer_message, :failed) }
let!(:exception_1) { create(:outboxer_exception, message: message_3) }
Expand Down Expand Up @@ -54,9 +54,9 @@ module Outboxer
end
end

context 'when status is dequeued' do
context 'when status is buffered' do
before do
Messages.delete_all(status: Message::Status::DEQUEUED, batch_size: 1)
Messages.delete_all(status: Message::Status::BUFFERED, batch_size: 1)
end

it 'deletes queued messages' do
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/outboxer/messages/delete_by_ids_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Outboxer
let!(:exception_1) { create(:outboxer_exception, message: message_1) }
let!(:frame_1) { create(:outboxer_frame, exception: exception_1) }

let!(:message_2) { create(:outboxer_message, :dequeued) }
let!(:message_2) { create(:outboxer_message, :buffered) }
let!(:exception_2) { create(:outboxer_exception, message: message_2) }
let!(:frame_2) { create(:outboxer_frame, exception: exception_2) }

Expand Down
8 changes: 4 additions & 4 deletions spec/lib/outboxer/messages/list/filter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module Outboxer
end

let!(:message_3) do
create(:outboxer_message, :dequeued,
create(:outboxer_message, :buffered,
messageable_type: 'Event', messageable_id: '3',
updated_by_publisher_id: 43000, updated_by_publisher_name: 'server-03:43000')
end
Expand Down Expand Up @@ -72,9 +72,9 @@ module Outboxer
end
end

context 'with dequeued status' do
it 'returns dequeued messages' do
expect(Messages.list(status: :dequeued)).to eq({
context 'with buffered status' do
it 'returns buffered messages' do
expect(Messages.list(status: :buffered)).to eq({
current_page: 1, limit_value: 100, total_count: 1, total_pages: 1,
messages: [
{
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/outboxer/messages/list/no_arguments_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module Outboxer
end

let!(:message_3) do
create(:outboxer_message, :dequeued,
create(:outboxer_message, :buffered,
messageable_type: 'Event', messageable_id: '3',
updated_by_publisher_id: 43000, updated_by_publisher_name: 'server-03:43000')
end
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/outboxer/messages/list/sort_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module Outboxer
end

let!(:message_3) do
create(:outboxer_message, :dequeued, messageable_type: 'Event', messageable_id: '3')
create(:outboxer_message, :buffered, messageable_type: 'Event', messageable_id: '3')
end

let!(:message_4) do
Expand Down Expand Up @@ -46,15 +46,15 @@ module Outboxer
Messages
.list(sort: :status, order: :asc)[:messages]
.map { |message| message[:status] }
).to eq([:dequeued, :failed, :publishing, :queued, :queued])
).to eq([:buffered, :failed, :publishing, :queued, :queued])
end

it 'sorts messages by status in descending order' do
expect(
Messages
.list(sort: :status, order: :desc)[:messages]
.map { |message| message[:status] }
).to eq([:queued, :queued, :publishing, :failed, :dequeued])
).to eq([:queued, :queued, :publishing, :failed, :buffered])
end
end

Expand Down
Loading

0 comments on commit 0355276

Please sign in to comment.