Skip to content

Commit

Permalink
update Messages.queue! to set message status to queued (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
bedrock-adam authored Mar 31, 2024
1 parent 7d4e412 commit bf5af55
Show file tree
Hide file tree
Showing 19 changed files with 73 additions and 66 deletions.
6 changes: 3 additions & 3 deletions bin/sidekiq_publishermon
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ loop do
system("clear") || system("cls")

total_backlogged = nil
total_publishing = nil
total_queued = nil
total_failed = nil

ActiveRecord::Base.connection_pool.with_connection do
total_backlogged = Outboxer::Models::Message.backlogged.count
total_publishing = Outboxer::Models::Message.publishing.count
total_queued = Outboxer::Models::Message.queued.count
total_failed = Outboxer::Models::Message.failed.count
end

Expand All @@ -44,7 +44,7 @@ loop do
puts Time.now.utc
puts "\n---- Overview ----"
puts format("%-15s: %-10s", "Backlogged", total_backlogged)
puts format("%-15s: %-10s", "Publishing", total_publishing)
puts format("%-15s: %-10s", "Queued", total_queued)
puts format("%-15s: %-10s", "Failed", total_failed)

if pids.empty?
Expand Down
2 changes: 1 addition & 1 deletion db/seeds.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Outboxer::Models::Message.create!(
messageable_type: 'Event',
messageable_id: i,
status: Outboxer::Models::Message::Status::PUBLISHING)
status: Outboxer::Models::Message::Status::QUEUED)
else
failed_message = Outboxer::Models::Message.create!(
messageable_type: 'Event',
Expand Down
4 changes: 2 additions & 2 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def published!(id:)
ActiveRecord::Base.transaction do
message = Models::Message.lock.find_by!(id: id)

if message.status != Models::Message::Status::PUBLISHING
if message.status != Models::Message::Status::QUEUED
raise InvalidTransition,
"cannot transition message #{message.id} " \
"from #{message.status} to (deleted)"
Expand All @@ -80,7 +80,7 @@ def failed!(id:, exception:)
ActiveRecord::Base.transaction do
message = Models::Message.order(created_at: :asc).lock.find_by!(id: id)

if message.status != Models::Message::Status::PUBLISHING
if message.status != Models::Message::Status::QUEUED
raise InvalidTransition,
"cannot transition outboxer message #{id} " \
"from #{message.status} to #{Models::Message::Status::FAILED}"
Expand Down
4 changes: 2 additions & 2 deletions lib/outboxer/messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ def queue!(limit: 1)
.where(id: ids)
.update_all(
updated_at: Time.current,
status: Models::Message::Status::PUBLISHING)
status: Models::Message::Status::QUEUED)
end

Models::Message
.where(id: ids, status: Models::Message::Status::PUBLISHING)
.where(id: ids, status: Models::Message::Status::QUEUED)
.order(updated_at: :asc)
.to_a
end
Expand Down
4 changes: 3 additions & 1 deletion lib/outboxer/models/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ class Message < ::ActiveRecord::Base

module Status
BACKLOGGED = 'backlogged'
QUEUED = 'queued'
PUBLISHING = 'publishing'
FAILED = 'failed'
end

STATUSES = [Status::BACKLOGGED, Status::PUBLISHING, Status::FAILED]
STATUSES = [Status::BACKLOGGED, Status::QUEUED, Status::PUBLISHING, Status::FAILED]

scope :backlogged, -> { where(status: Status::BACKLOGGED) }
scope :queued, -> { where(status: Status::QUEUED) }
scope :publishing, -> { where(status: Status::PUBLISHING) }
scope :failed, -> { where(status: Status::FAILED) }

Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def publish!(thread_count: 5, queue_size: 8, poll_interval: 1,

logger&.info "Created #{thread_count} worker threads"

logger&.info 'Publishing messages to queue...'
logger&.info 'Publishing messages...'

while @running
begin
Expand Down
6 changes: 5 additions & 1 deletion spec/factories/outboxer_messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
factory :outboxer_message, class: 'Outboxer::Models::Message' do
messageable_type { 'Event' }
messageable_id { 1 }
status { Outboxer::Models::Message::Status::PUBLISHING }
status { Outboxer::Models::Message::Status::QUEUED }
created_at { 1.day.ago }

trait :backlogged do
status { Outboxer::Models::Message::Status::BACKLOGGED }
end

trait :queued do
status { Outboxer::Models::Message::Status::QUEUED }
end

trait :publishing do
status { Outboxer::Models::Message::Status::PUBLISHING }
end
Expand Down
28 changes: 14 additions & 14 deletions spec/lib/outboxer/message/failed_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,36 @@ def raise_exception
raise_exception
end

context 'when publishing message' do
let!(:publishing_message) { create(:outboxer_message, :publishing) }
context 'when queued message' do
let!(:queued_message) { create(:outboxer_message, :queued) }

let!(:failed_message) { Message.failed!(id: publishing_message.id, exception: exception) }
let!(:failed_message) { Message.failed!(id: queued_message.id, exception: exception) }

it 'returns updated message' do
expect(failed_message['id']).to eq(publishing_message.id)
expect(failed_message['id']).to eq(queued_message.id)
end

it 'updates message status to failed' do
publishing_message.reload
queued_message.reload

expect(publishing_message.status).to eq(Models::Message::Status::FAILED)
expect(queued_message.status).to eq(Models::Message::Status::FAILED)
end

it 'creates exception' do
publishing_message.reload
queued_message.reload

expect(publishing_message.exceptions.length).to eq(1)
expect(publishing_message.exceptions[0].class_name).to eq(exception.class.name)
expect(publishing_message.exceptions[0].message_text).to eq(exception.message)
expect(queued_message.exceptions.length).to eq(1)
expect(queued_message.exceptions[0].class_name).to eq(exception.class.name)
expect(queued_message.exceptions[0].message_text).to eq(exception.message)
end

it 'creates frames' do
publishing_message.reload
queued_message.reload

expect(publishing_message.exceptions[0].frames.length).to eq(65)
expect(queued_message.exceptions[0].frames.length).to eq(65)

expect(publishing_message.exceptions[0].frames[0].index).to eq(0)
expect(publishing_message.exceptions[0].frames[0].text)
expect(queued_message.exceptions[0].frames[0].index).to eq(0)
expect(queued_message.exceptions[0].frames[0].text)
.to include('outboxer/spec/lib/outboxer/message/failed_spec.rb:9:in `raise_exception')
end
end
Expand Down
10 changes: 5 additions & 5 deletions spec/lib/outboxer/message/published_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
module Outboxer
RSpec.describe Message do
describe '.published!' do
context 'when publishing message' do
let!(:publishing_message) { create(:outboxer_message, :publishing) }
context 'when queued message' do
let!(:queued_message) { create(:outboxer_message, :queued) }

let!(:published_message) { Message.published!(id: publishing_message.id) }
let!(:published_message) { Message.published!(id: queued_message.id) }

it 'returns nil' do
expect(published_message).to eq({ 'id' => publishing_message.id })
expect(published_message).to eq({ 'id' => queued_message.id })
end

it 'deletes publishing message' do
it 'deletes queued message' do
expect(Models::Message.count).to eq(0)
end
end
Expand Down
15 changes: 8 additions & 7 deletions spec/lib/outboxer/messages/counts_by_status_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,24 @@ module Outboxer
describe '.counts_by_status' do
context 'when no messages exist' do
it 'returns 0 for all statuses' do
expected_counts = { 'backlogged' => 0, 'publishing' => 0, 'failed' => 0 }

expect(Outboxer::Messages.counts_by_status).to eq(expected_counts)
expect(
Outboxer::Messages.counts_by_status
).to eq({ 'backlogged' => 0, 'queued' => 0, 'publishing' => 0, 'failed' => 0 })
end
end

context 'when messages exist' do
before do
create_list(:outboxer_message, 2, :backlogged)
create_list(:outboxer_message, 3, :publishing)
create_list(:outboxer_message, 3, :queued)
create_list(:outboxer_message, 5, :publishing)
create_list(:outboxer_message, 4, :failed)
end

it 'returns correct counts for each status' do
expected_counts = { 'backlogged' => 2, 'publishing' => 3, 'failed' => 4 }

expect(Messages.counts_by_status).to eq(expected_counts)
expect(
Messages.counts_by_status
).to eq({ 'backlogged' => 2, 'queued' => 3, 'publishing' => 5, 'failed' => 4 })
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/outboxer/messages/delete_selected_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module Outboxer
describe '.delete_selected!' do
let!(:message_1) { create(:outboxer_message, :backlogged) }

let!(:message_2) { create(:outboxer_message, :publishing) }
let!(:message_2) { create(:outboxer_message, :queued) }

let!(:message_3) { create(:outboxer_message, :failed) }
let!(:exception_3) { create(:outboxer_exception, message: message_3) }
Expand Down
12 changes: 6 additions & 6 deletions spec/lib/outboxer/messages/list/filter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Outboxer
create(:outboxer_message, :failed, id: 3,
messageable_type: 'Event', messageable_id: 2,
created_at: 4.minutes.ago, updated_at: 3.minutes.ago)
create(:outboxer_message, :publishing, id: 2,
create(:outboxer_message, :queued, id: 2,
messageable_type: 'Event', messageable_id: 3,
created_at: 3.minutes.ago, updated_at: 2.minutes.ago)
create(:outboxer_message, :backlogged, id: 1,
Expand Down Expand Up @@ -41,7 +41,7 @@ module Outboxer
},
{
'id' => 2,
'status' => 'publishing',
'status' => 'queued',
'messageable' => 'Event::3',
'created_at' => 3.minutes.ago.utc.to_s,
'updated_at' => 2.minutes.ago.utc.to_s
Expand Down Expand Up @@ -87,14 +87,14 @@ module Outboxer
end
end

context 'with publishing status' do
it 'returns publishing messages' do
context 'with queued status' do
it 'returns queued messages' do
expect(
Messages.list(status: :publishing, sort: :status, order: :asc)
Messages.list(status: :queued, sort: :status, order: :asc)
).to match_array([
{
'id' => 2,
'status' => 'publishing',
'status' => 'queued',
'messageable' => 'Event::3',
'created_at' => 3.minutes.ago.utc.to_s,
'updated_at' => 2.minutes.ago.utc.to_s
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/outboxer/messages/list/no_arguments_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Outboxer
id: 1, messageable_type: 'Event', messageable_id: 1),
create(:outboxer_message, :failed,
id: 2, messageable_type: 'Event', messageable_id: 2),
create(:outboxer_message, :publishing,
create(:outboxer_message, :queued,
id: 3, messageable_type: 'Event', messageable_id: 3),
create(:outboxer_message, :backlogged,
id: 4, messageable_type: 'Event', messageable_id: 4)
Expand All @@ -22,7 +22,7 @@ module Outboxer
).to match_array([
{ 'id' => 1, 'status' => 'backlogged', 'messageable' => 'Event::1' },
{ 'id' => 2, 'status' => 'failed', 'messageable' => 'Event::2' },
{ 'id' => 3, 'status' => 'publishing', 'messageable' => 'Event::3' },
{ 'id' => 3, 'status' => 'queued', 'messageable' => 'Event::3' },
{ 'id' => 4, 'status' => 'backlogged', 'messageable' => 'Event::4' },
])
end
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/outboxer/messages/list/sort_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Outboxer
create(:outboxer_message, id: 3, status: :failed,
messageable_type: 'Event', messageable_id: 2,
created_at: 4.minutes.ago, updated_at: 3.minutes.ago)
create(:outboxer_message, id: 2, status: :publishing,
create(:outboxer_message, id: 2, status: :queued,
messageable_type: 'Event', messageable_id: 3,
created_at: 3.minutes.ago, updated_at: 2.minutes.ago)
create(:outboxer_message, id: 1, status: :backlogged,
Expand Down Expand Up @@ -41,15 +41,15 @@ module Outboxer
Messages
.list(sort: :status, order: :asc)
.map { |message| message['status'] }
).to eq(['backlogged', 'backlogged', 'failed', 'publishing'])
).to eq(['backlogged', 'backlogged', 'failed', 'queued'])
end

it 'sorts messages by status in descending order' do
expect(
Messages
.list(sort: :status, order: :desc)
.map { |message| message['status'] }
).to eq(['publishing', 'failed', 'backlogged', 'backlogged'])
).to eq(['queued', 'failed', 'backlogged', 'backlogged'])
end
end

Expand Down
8 changes: 4 additions & 4 deletions spec/lib/outboxer/messages/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ module Outboxer
end

context 'when limit is 1' do
let!(:publishing_messages) { Messages.queue!(limit: 1) }
let!(:queued_messages) { Messages.queue!(limit: 1) }

it 'returns first backlogged message' do
expect(publishing_messages.count).to eq(1)
expect(queued_messages.count).to eq(1)

publishing_message = publishing_messages.first
expect(publishing_message.status).to eq(Models::Message::Status::PUBLISHING)
queued_message = queued_messages.first
expect(queued_message.status).to eq(Models::Message::Status::QUEUED)
end

it 'keeps last backlogged message' do
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/outboxer/messages/republish_all_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Outboxer
let!(:exception_1) { create(:outboxer_exception, message: message_1) }
let!(:frame_1) { create(:outboxer_frame, exception: exception_1) }

let!(:message_2) { create(:outboxer_message, :publishing) }
let!(:message_2) { create(:outboxer_message, :queued) }
let!(:exception_2) { create(:outboxer_exception, message: message_2) }
let!(:frame_2) { create(:outboxer_frame, exception: exception_2) }

Expand All @@ -30,9 +30,9 @@ module Outboxer
).to eq(3)
end

it 'does not change messages that are publishing' do
it 'does not change messages that are queued' do
expect(
Models::Message.where(id: [message_2], status: Models::Message::Status::PUBLISHING).count
Models::Message.where(id: [message_2], status: Models::Message::Status::QUEUED).count
).to eq(1)
end
end
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/outboxer/publisher/pop_message_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
module Outboxer
RSpec.describe Publisher do
describe '.pop_message!' do
context 'when a publishing message is in the queue' do
context 'when a queued message is in the queue' do
let(:queue) { Queue.new }
let(:logger) { instance_double(Logger, debug: true, error: true) }

let(:message) { create(:outboxer_message, :publishing) }
let(:message) { create(:outboxer_message, :queued) }

before do
queue.push(message)
Expand All @@ -31,7 +31,7 @@ module Outboxer
let(:queue) { Queue.new }
let(:logger) { instance_double(Logger, debug: true, error: true) }

let(:message) { create(:outboxer_message, :publishing) }
let(:message) { create(:outboxer_message, :queued) }

let(:error) { StandardError.new('processing error') }

Expand Down
Loading

0 comments on commit bf5af55

Please sign in to comment.