From 20a369ba0a39435d8911f20c17634c84fcc4e28a Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 31 Mar 2024 16:27:50 +1100 Subject: [PATCH 1/6] commit changes --- bin/sidekiq_publishermon | 6 ++-- db/seeds.rb | 2 +- lib/outboxer/message.rb | 4 +-- lib/outboxer/messages.rb | 8 ++--- lib/outboxer/models/message.rb | 8 ++--- lib/outboxer/publisher.rb | 2 +- spec/factories/outboxer_messages.rb | 4 +-- spec/lib/outboxer/message/failed_spec.rb | 16 +++++----- spec/lib/outboxer/message/published_spec.rb | 16 +++++----- spec/lib/outboxer/message/republish_spec.rb | 26 +++++++-------- .../messages/counts_by_status_spec.rb | 6 ++-- .../outboxer/messages/delete_selected_spec.rb | 4 +-- .../lib/outboxer/messages/list/filter_spec.rb | 18 +++++------ .../messages/list/no_arguments_spec.rb | 8 ++--- .../outboxer/messages/list/pagination_spec.rb | 2 +- spec/lib/outboxer/messages/list/sort_spec.rb | 8 ++--- .../outboxer/messages/republish_all_spec.rb | 6 ++-- .../messages/republish_selected_spec.rb | 4 +-- .../lib/outboxer/messages/unpublished_spec.rb | 32 +++++++++---------- spec/lib/outboxer/publisher/publish_spec.rb | 2 +- .../outboxer/publisher/push_messages_spec.rb | 4 +-- 21 files changed, 93 insertions(+), 93 deletions(-) diff --git a/bin/sidekiq_publishermon b/bin/sidekiq_publishermon index 1db59314..66172d48 100755 --- a/bin/sidekiq_publishermon +++ b/bin/sidekiq_publishermon @@ -28,12 +28,12 @@ end loop do system("clear") || system("cls") - total_unpublished = nil + total_backlogged = nil total_publishing = nil total_failed = nil ActiveRecord::Base.connection_pool.with_connection do - total_unpublished = Outboxer::Models::Message.unpublished.count + total_backlogged = Outboxer::Models::Message.backlogged.count total_publishing = Outboxer::Models::Message.publishing.count total_failed = Outboxer::Models::Message.failed.count end @@ -43,7 +43,7 @@ loop do puts "Outboxer Report" puts Time.now.utc puts "\n---- Overview ----" - puts format("%-15s: %-10s", "Unpublished", total_unpublished) + puts format("%-15s: %-10s", "Backlogged", total_backlogged) puts format("%-15s: %-10s", "Publishing", total_publishing) puts format("%-15s: %-10s", "Failed", total_failed) diff --git a/db/seeds.rb b/db/seeds.rb index d0ce521f..1388cfa4 100644 --- a/db/seeds.rb +++ b/db/seeds.rb @@ -7,7 +7,7 @@ Outboxer::Models::Message.create!( messageable_type: 'Event', messageable_id: i, - status: Outboxer::Models::Message::Status::UNPUBLISHED) + status: Outboxer::Models::Message::Status::BACKLOGGED) when 1 Outboxer::Models::Message.create!( messageable_type: 'Event', diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 008b97e5..e343f1d6 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -109,10 +109,10 @@ def republish!(id:) if message.status != Models::Message::Status::FAILED raise InvalidTransition, "cannot transition outboxer message #{id} " \ - "from #{message.status} to #{Models::Message::Status::UNPUBLISHED}" + "from #{message.status} to #{Models::Message::Status::BACKLOGGED}" end - message.update!(status: Models::Message::Status::UNPUBLISHED) + message.update!(status: Models::Message::Status::BACKLOGGED) { 'id' => id } end diff --git a/lib/outboxer/messages.rb b/lib/outboxer/messages.rb index d899755e..4114a958 100644 --- a/lib/outboxer/messages.rb +++ b/lib/outboxer/messages.rb @@ -22,13 +22,13 @@ def counts_by_status end end - def unpublished!(limit: 1, order: :asc) + def queue!(limit: 1, order: :asc) ActiveRecord::Base.connection_pool.with_connection do ids = [] ActiveRecord::Base.transaction do ids = Models::Message - .where(status: Models::Message::Status::UNPUBLISHED) + .where(status: Models::Message::Status::BACKLOGGED) .order(created_at: order) .lock('FOR UPDATE SKIP LOCKED') .limit(limit) @@ -109,7 +109,7 @@ def republish_all!(batch_size: 100) .pluck(:id) updated_count = Models::Message.where(id: locked_ids).update_all( - status: Models::Message::Status::UNPUBLISHED, updated_at: DateTime.now.utc) + status: Models::Message::Status::BACKLOGGED, updated_at: DateTime.now.utc) end updated_total_count += updated_count @@ -138,7 +138,7 @@ def republish_selected!(ids:) end updated_count = Models::Message.where(id: locked_ids).update_all( - status: Models::Message::Status::UNPUBLISHED, updated_at: DateTime.now.utc) + status: Models::Message::Status::BACKLOGGED, updated_at: DateTime.now.utc) end end diff --git a/lib/outboxer/models/message.rb b/lib/outboxer/models/message.rb index b70df92f..d565e013 100644 --- a/lib/outboxer/models/message.rb +++ b/lib/outboxer/models/message.rb @@ -20,18 +20,18 @@ class Message < ::ActiveRecord::Base self.table_name = :outboxer_messages module Status - UNPUBLISHED = 'unpublished' + BACKLOGGED = 'backlogged' PUBLISHING = 'publishing' FAILED = 'failed' end - STATUSES = [Status::UNPUBLISHED, Status::PUBLISHING, Status::FAILED] + STATUSES = [Status::BACKLOGGED, Status::PUBLISHING, Status::FAILED] - scope :unpublished, -> { where(status: Status::UNPUBLISHED) } + scope :backlogged, -> { where(status: Status::BACKLOGGED) } scope :publishing, -> { where(status: Status::PUBLISHING) } scope :failed, -> { where(status: Status::FAILED) } - attribute :status, default: -> { Status::UNPUBLISHED } + attribute :status, default: -> { Status::BACKLOGGED } validates :status, inclusion: { in: STATUSES }, length: { maximum: 255 } belongs_to :messageable, polymorphic: true diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 016fbf9a..aa53efc8 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -41,7 +41,7 @@ def push_messages!(threads:, queue:, queue_size:, poll_interval:, logger:, kerne messages = [] queue_remaining = queue_size - queue.length - messages = (queue_remaining > 0) ? Outboxer::Messages.unpublished!(limit: queue_remaining) : [] + messages = (queue_remaining > 0) ? Outboxer::Messages.queue!(limit: queue_remaining) : [] if messages.empty? debug_log("Sleeping for #{poll_interval} seconds because there are no messages", diff --git a/spec/factories/outboxer_messages.rb b/spec/factories/outboxer_messages.rb index b81dd35c..c8cc46aa 100644 --- a/spec/factories/outboxer_messages.rb +++ b/spec/factories/outboxer_messages.rb @@ -5,8 +5,8 @@ status { Outboxer::Models::Message::Status::PUBLISHING } created_at { 1.day.ago } - trait :unpublished do - status { Outboxer::Models::Message::Status::UNPUBLISHED } + trait :backlogged do + status { Outboxer::Models::Message::Status::BACKLOGGED } end trait :publishing do diff --git a/spec/lib/outboxer/message/failed_spec.rb b/spec/lib/outboxer/message/failed_spec.rb index c32a60d5..6d30d659 100644 --- a/spec/lib/outboxer/message/failed_spec.rb +++ b/spec/lib/outboxer/message/failed_spec.rb @@ -49,27 +49,27 @@ def raise_exception end end - context 'when unpublished messaged' do - let!(:unpublished_message) { create(:outboxer_message, :unpublished) } + context 'when backlogged message' do + let!(:backlogged_message) { create(:outboxer_message, :backlogged) } it 'raises invalid transition error' do expect do - Message.failed!(id: unpublished_message.id, exception: exception) + Message.failed!(id: backlogged_message.id, exception: exception) end.to raise_error( Message::InvalidTransition, - "cannot transition outboxer message #{unpublished_message.id} " + - "from unpublished to failed") + "cannot transition outboxer message #{backlogged_message.id} " + + "from backlogged to failed") end - it 'does not delete unpublished message' do + it 'does not delete backlogged message' do begin - Message.failed!(id: unpublished_message.id, exception: exception) + Message.failed!(id: backlogged_message.id, exception: exception) rescue Message::InvalidTransition # ignore end expect(Models::Message.count).to eq(1) - expect(Models::Message.first).to eq(unpublished_message) + expect(Models::Message.first).to eq(backlogged_message) end end end diff --git a/spec/lib/outboxer/message/published_spec.rb b/spec/lib/outboxer/message/published_spec.rb index 174f9503..3a5a3d74 100644 --- a/spec/lib/outboxer/message/published_spec.rb +++ b/spec/lib/outboxer/message/published_spec.rb @@ -17,27 +17,27 @@ module Outboxer end end - context 'when unpublished messaged' do - let(:unpublished_message) { create(:outboxer_message, :unpublished) } + context 'when backlogged messaged' do + let(:backlogged_message) { create(:outboxer_message, :backlogged) } it 'raises invalid transition error' do expect do - Message.published!(id: unpublished_message.id) + Message.published!(id: backlogged_message.id) end.to raise_error( Message::InvalidTransition, - "cannot transition message #{unpublished_message.id} " + - "from unpublished to (deleted)") + "cannot transition message #{backlogged_message.id} " + + "from backlogged to (deleted)") end - it 'does not delete unpublished message' do + it 'does not delete backlogged message' do begin - Message.published!(id: unpublished_message.id) + Message.published!(id: backlogged_message.id) rescue Message::InvalidTransition # ignore end expect(Models::Message.count).to eq(1) - expect(Models::Message.first).to eq(unpublished_message) + expect(Models::Message.first).to eq(backlogged_message) end end end diff --git a/spec/lib/outboxer/message/republish_spec.rb b/spec/lib/outboxer/message/republish_spec.rb index 3194c28b..11230c3b 100644 --- a/spec/lib/outboxer/message/republish_spec.rb +++ b/spec/lib/outboxer/message/republish_spec.rb @@ -6,40 +6,40 @@ module Outboxer context 'when failed message' do let!(:failed_message) { create(:outboxer_message, :failed) } - let!(:unpublished_message) { Message.republish!(id: failed_message.id) } + let!(:backlogged_message) { Message.republish!(id: failed_message.id) } - it 'returns unpublished message' do - expect(unpublished_message['id']).to eq(failed_message.id) + it 'returns backlogged message' do + expect(backlogged_message['id']).to eq(failed_message.id) end - it 'updates failed message status to unpublishied' do + it 'updates failed message status to backlogged' do failed_message.reload - expect(failed_message.status).to eq(Models::Message::Status::UNPUBLISHED) + expect(failed_message.status).to eq(Models::Message::Status::BACKLOGGED) end end - context 'when unpublished messaged' do - let(:unpublished_message) { create(:outboxer_message, :unpublished) } + context 'when backlogged messaged' do + let(:backlogged_message) { create(:outboxer_message, :backlogged) } it 'raises invalid transition error' do expect do - Message.republish!(id: unpublished_message.id) + Message.republish!(id: backlogged_message.id) end.to raise_error( Message::InvalidTransition, - "cannot transition outboxer message #{unpublished_message.id} " + - "from unpublished to unpublished") + "cannot transition outboxer message #{backlogged_message.id} " + + "from backlogged to backlogged") end - it 'does not delete unpublished message' do + it 'does not delete backlogged message' do begin - Message.published!(id: unpublished_message.id) + Message.published!(id: backlogged_message.id) rescue Message::InvalidTransition # ignore end expect(Models::Message.count).to eq(1) - expect(Models::Message.first).to eq(unpublished_message) + expect(Models::Message.first).to eq(backlogged_message) end end end diff --git a/spec/lib/outboxer/messages/counts_by_status_spec.rb b/spec/lib/outboxer/messages/counts_by_status_spec.rb index 42a11c46..e2e113d8 100644 --- a/spec/lib/outboxer/messages/counts_by_status_spec.rb +++ b/spec/lib/outboxer/messages/counts_by_status_spec.rb @@ -5,7 +5,7 @@ module Outboxer describe '.counts_by_status' do context 'when no messages exist' do it 'returns 0 for all statuses' do - expected_counts = { 'unpublished' => 0, 'publishing' => 0, 'failed' => 0 } + expected_counts = { 'backlogged' => 0, 'publishing' => 0, 'failed' => 0 } expect(Outboxer::Messages.counts_by_status).to eq(expected_counts) end @@ -13,13 +13,13 @@ module Outboxer context 'when messages exist' do before do - create_list(:outboxer_message, 2, :unpublished) + create_list(:outboxer_message, 2, :backlogged) create_list(:outboxer_message, 3, :publishing) create_list(:outboxer_message, 4, :failed) end it 'returns correct counts for each status' do - expected_counts = { 'unpublished' => 2, 'publishing' => 3, 'failed' => 4 } + expected_counts = { 'backlogged' => 2, 'publishing' => 3, 'failed' => 4 } expect(Messages.counts_by_status).to eq(expected_counts) end diff --git a/spec/lib/outboxer/messages/delete_selected_spec.rb b/spec/lib/outboxer/messages/delete_selected_spec.rb index c08adf3f..de2d085b 100644 --- a/spec/lib/outboxer/messages/delete_selected_spec.rb +++ b/spec/lib/outboxer/messages/delete_selected_spec.rb @@ -3,7 +3,7 @@ module Outboxer RSpec.describe Messages do describe '.delete_selected!' do - let!(:message_1) { create(:outboxer_message, :unpublished) } + let!(:message_1) { create(:outboxer_message, :backlogged) } let!(:message_2) { create(:outboxer_message, :publishing) } @@ -11,7 +11,7 @@ module Outboxer let!(:exception_3) { create(:outboxer_exception, message: message_3) } let!(:frame_3) { create(:outboxer_frame, exception: exception_3) } - let!(:message_4) { create(:outboxer_message, :unpublished) } + let!(:message_4) { create(:outboxer_message, :backlogged) } describe 'when ids exist' do let!(:ids) { [message_1.id, message_2.id, message_3.id] } diff --git a/spec/lib/outboxer/messages/list/filter_spec.rb b/spec/lib/outboxer/messages/list/filter_spec.rb index 5e28142e..ee207a97 100644 --- a/spec/lib/outboxer/messages/list/filter_spec.rb +++ b/spec/lib/outboxer/messages/list/filter_spec.rb @@ -3,7 +3,7 @@ module Outboxer RSpec.describe Messages do before do - create(:outboxer_message, :unpublished, id: 4, + create(:outboxer_message, :backlogged, id: 4, messageable_type: 'Event', messageable_id: 1, created_at: 5.minutes.ago, updated_at: 4.minutes.ago) create(:outboxer_message, :failed, id: 3, @@ -12,7 +12,7 @@ module Outboxer create(:outboxer_message, :publishing, id: 2, messageable_type: 'Event', messageable_id: 3, created_at: 3.minutes.ago, updated_at: 2.minutes.ago) - create(:outboxer_message, :unpublished, id: 1, + create(:outboxer_message, :backlogged, id: 1, messageable_type: 'Event', messageable_id: 4, created_at: 2.minutes.ago, updated_at: 1.minute.ago) end @@ -48,14 +48,14 @@ module Outboxer }, { 'id' => 4, - 'status' => 'unpublished', + 'status' => 'backlogged', 'messageable' => 'Event::1', 'created_at' => 5.minutes.ago.utc.to_s, 'updated_at' => 4.minutes.ago.utc.to_s }, { 'id' => 1, - 'status' => 'unpublished', + 'status' => 'backlogged', 'messageable' => 'Event::4', 'created_at' => 2.minutes.ago.utc.to_s, 'updated_at' => 1.minute.ago.utc.to_s @@ -64,21 +64,21 @@ module Outboxer end end - context 'with unpublished status' do - it 'returns unpublished messages' do + context 'with backlogged status' do + it 'returns backlogged messages' do expect( - Messages.list(status: :unpublished, sort: :status, order: :asc) + Messages.list(status: :backlogged, sort: :status, order: :asc) ).to match_array([ { 'id' => 4, - 'status' => 'unpublished', + 'status' => 'backlogged', 'messageable' => 'Event::1', 'created_at' => 5.minutes.ago.utc.to_s, 'updated_at' => 4.minutes.ago.utc.to_s }, { 'id' => 1, - 'status' => 'unpublished', + 'status' => 'backlogged', 'messageable' => 'Event::4', 'created_at' => 2.minutes.ago.utc.to_s, 'updated_at' => 1.minute.ago.utc.to_s diff --git a/spec/lib/outboxer/messages/list/no_arguments_spec.rb b/spec/lib/outboxer/messages/list/no_arguments_spec.rb index 692b0722..b9b07919 100644 --- a/spec/lib/outboxer/messages/list/no_arguments_spec.rb +++ b/spec/lib/outboxer/messages/list/no_arguments_spec.rb @@ -5,13 +5,13 @@ module Outboxer describe '.list' do let!(:messages) do [ - create(:outboxer_message, :unpublished, + create(:outboxer_message, :backlogged, id: 1, messageable_type: 'Event', messageable_id: 1), create(:outboxer_message, :failed, id: 2, messageable_type: 'Event', messageable_id: 2), create(:outboxer_message, :publishing, id: 3, messageable_type: 'Event', messageable_id: 3), - create(:outboxer_message, :unpublished, + create(:outboxer_message, :backlogged, id: 4, messageable_type: 'Event', messageable_id: 4) ] end @@ -20,10 +20,10 @@ module Outboxer expect( Messages.list.map { |message| message.slice('id', 'status', 'messageable') } ).to match_array([ - { 'id' => 1, 'status' => 'unpublished', 'messageable' => 'Event::1' }, + { 'id' => 1, 'status' => 'backlogged', 'messageable' => 'Event::1' }, { 'id' => 2, 'status' => 'failed', 'messageable' => 'Event::2' }, { 'id' => 3, 'status' => 'publishing', 'messageable' => 'Event::3' }, - { 'id' => 4, 'status' => 'unpublished', 'messageable' => 'Event::4' }, + { 'id' => 4, 'status' => 'backlogged', 'messageable' => 'Event::4' }, ]) end end diff --git a/spec/lib/outboxer/messages/list/pagination_spec.rb b/spec/lib/outboxer/messages/list/pagination_spec.rb index 335ba3fe..b4f88cb6 100644 --- a/spec/lib/outboxer/messages/list/pagination_spec.rb +++ b/spec/lib/outboxer/messages/list/pagination_spec.rb @@ -3,7 +3,7 @@ module Outboxer RSpec.describe Messages do before do - create_list(:outboxer_message, 101, status: :unpublished, + create_list(:outboxer_message, 101, status: :backlogged, messageable_type: 'Event', messageable_id: 1) end diff --git a/spec/lib/outboxer/messages/list/sort_spec.rb b/spec/lib/outboxer/messages/list/sort_spec.rb index 0f6c4ee7..d00f9ed5 100644 --- a/spec/lib/outboxer/messages/list/sort_spec.rb +++ b/spec/lib/outboxer/messages/list/sort_spec.rb @@ -3,7 +3,7 @@ module Outboxer RSpec.describe Messages do before do - create(:outboxer_message, id: 4, status: :unpublished, + create(:outboxer_message, id: 4, status: :backlogged, messageable_type: 'Event', messageable_id: 1, created_at: 5.minutes.ago, updated_at: 4.minutes.ago) create(:outboxer_message, id: 3, status: :failed, @@ -12,7 +12,7 @@ module Outboxer create(:outboxer_message, id: 2, status: :publishing, messageable_type: 'Event', messageable_id: 3, created_at: 3.minutes.ago, updated_at: 2.minutes.ago) - create(:outboxer_message, id: 1, status: :unpublished, + create(:outboxer_message, id: 1, status: :backlogged, messageable_type: 'Event', messageable_id: 4, created_at: 2.minutes.ago, updated_at: 1.minute.ago) end @@ -41,7 +41,7 @@ module Outboxer Messages .list(sort: :status, order: :asc) .map { |message| message['status'] } - ).to eq(['failed', 'publishing', 'unpublished', 'unpublished']) + ).to eq(['failed', 'publishing', 'backlogged', 'backlogged']) end it 'sorts messages by status in descending order' do @@ -49,7 +49,7 @@ module Outboxer Messages .list(sort: :status, order: :desc) .map { |message| message['status'] } - ).to eq(['unpublished', 'unpublished', 'publishing', 'failed']) + ).to eq(['backlogged', 'backlogged', 'publishing', 'failed']) end end diff --git a/spec/lib/outboxer/messages/republish_all_spec.rb b/spec/lib/outboxer/messages/republish_all_spec.rb index 292732ae..a051c924 100644 --- a/spec/lib/outboxer/messages/republish_all_spec.rb +++ b/spec/lib/outboxer/messages/republish_all_spec.rb @@ -3,7 +3,7 @@ module Outboxer RSpec.describe Messages do describe '.republish_all!' do - let!(:message_1) { create(:outboxer_message, :unpublished) } + let!(:message_1) { create(:outboxer_message, :backlogged) } let!(:exception_1) { create(:outboxer_exception, message: message_1) } let!(:frame_1) { create(:outboxer_frame, exception: exception_1) } @@ -21,11 +21,11 @@ module Outboxer let!(:result) { Messages.republish_all!(batch_size: 1) } - it 'sets failed messages to unpublished' do + it 'sets failed messages to backlogged' do expect( Models::Message.where( id: [message_1, message_3.id, message_4.id], - status: Models::Message::Status::UNPUBLISHED + status: Models::Message::Status::BACKLOGGED ).count ).to eq(3) end diff --git a/spec/lib/outboxer/messages/republish_selected_spec.rb b/spec/lib/outboxer/messages/republish_selected_spec.rb index 2d9433f1..731e7841 100644 --- a/spec/lib/outboxer/messages/republish_selected_spec.rb +++ b/spec/lib/outboxer/messages/republish_selected_spec.rb @@ -15,10 +15,10 @@ module Outboxer let!(:result) { Messages.republish_selected!(ids: ids) } describe 'when ids exist' do - it 'sets message status to unpublished' do + it 'sets message status to backlogged' do expect( Models::Message - .where(status: Models::Message::Status::UNPUBLISHED) + .where(status: Models::Message::Status::BACKLOGGED) .order(id: :asc) .pluck(:id) ).to eq(ids) diff --git a/spec/lib/outboxer/messages/unpublished_spec.rb b/spec/lib/outboxer/messages/unpublished_spec.rb index 71c4bb98..465cf248 100644 --- a/spec/lib/outboxer/messages/unpublished_spec.rb +++ b/spec/lib/outboxer/messages/unpublished_spec.rb @@ -2,54 +2,54 @@ module Outboxer RSpec.describe Messages do - describe '.unpublished!' do - context 'when there are 2 unpublished messages' do - let!(:unpublished_messages) do + describe '.backlogged!' do + context 'when there are 2 backlogged messages' do + let!(:backlogged_messages) do [ - create(:outboxer_message, :unpublished, created_at: 2.minutes.ago), - create(:outboxer_message, :unpublished, created_at: 1.minute.ago) + create(:outboxer_message, :backlogged, created_at: 2.minutes.ago), + create(:outboxer_message, :backlogged, created_at: 1.minute.ago) ] end context 'when order asc' do context 'when limit is 1' do - let!(:publishing_messages) { Messages.unpublished!(limit: 1) } + let!(:publishing_messages) { Messages.queue!(limit: 1) } - it 'returns first unpublished message' do + it 'returns first backlogged message' do expect(publishing_messages.count).to eq(1) publishing_message = publishing_messages.first expect(publishing_message.status).to eq(Models::Message::Status::PUBLISHING) end - it 'keeps last unpublished message' do - remaining_messages = Models::Message.where(status: Models::Message::Status::UNPUBLISHED) + it 'keeps last backlogged message' do + remaining_messages = Models::Message.where(status: Models::Message::Status::BACKLOGGED) expect(remaining_messages.count).to eq(1) remaining_message = remaining_messages.last - expect(remaining_message).to eq(unpublished_messages.last) + expect(remaining_message).to eq(backlogged_messages.last) end end end context 'when order desc' do context 'when limit is 1' do - let!(:publishing_messages) { Messages.unpublished!(limit: 1, order: :desc) } + let!(:publishing_messages) { Messages.queue!(limit: 1, order: :desc) } - it 'returns first unpublished message' do + it 'returns first backlogged message' do expect(publishing_messages.count).to eq(1) publishing_message = publishing_messages.last - expect(publishing_message).to eq(unpublished_messages.last) + expect(publishing_message).to eq(backlogged_messages.last) end - it 'keeps first unpublished message' do - remaining_messages = Models::Message.where(status: Models::Message::Status::UNPUBLISHED) + it 'keeps first backlogged message' do + remaining_messages = Models::Message.where(status: Models::Message::Status::BACKLOGGED) expect(remaining_messages.count).to eq(1) remaining_message = remaining_messages.first - expect(remaining_message).to eq(unpublished_messages.first) + expect(remaining_message).to eq(backlogged_messages.first) end end end diff --git a/spec/lib/outboxer/publisher/publish_spec.rb b/spec/lib/outboxer/publisher/publish_spec.rb index d350fdfc..a659b957 100644 --- a/spec/lib/outboxer/publisher/publish_spec.rb +++ b/spec/lib/outboxer/publisher/publish_spec.rb @@ -13,7 +13,7 @@ module Outboxer allow(logger).to receive(:level=) end - let!(:message) { create(:outboxer_message, :unpublished) } + let!(:message) { create(:outboxer_message, :backlogged) } context 'when message published successfully' do before do diff --git a/spec/lib/outboxer/publisher/push_messages_spec.rb b/spec/lib/outboxer/publisher/push_messages_spec.rb index 58d1158f..6195753f 100644 --- a/spec/lib/outboxer/publisher/push_messages_spec.rb +++ b/spec/lib/outboxer/publisher/push_messages_spec.rb @@ -3,7 +3,7 @@ module Outboxer RSpec.describe Publisher do describe '.push_messages!' do - context 'when an unpublished message has been created' do + context 'when an backlogged message has been created' do let(:queue) { Queue.new } let(:logger) { instance_double(Logger, debug: true, error: true) } let(:threads) { [] } @@ -11,7 +11,7 @@ module Outboxer let(:poll_interval) { 1 } let(:kernel) { class_double(Kernel, sleep: nil) } - let!(:message) { create(:outboxer_message, :unpublished) } + let!(:message) { create(:outboxer_message, :backlogged) } before do Publisher.push_messages!( From aa644c33fe25ec4b413eaadc5f4a27fc66653f40 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 31 Mar 2024 16:30:18 +1100 Subject: [PATCH 2/6] fix specs --- spec/lib/outboxer/messages/list/sort_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/lib/outboxer/messages/list/sort_spec.rb b/spec/lib/outboxer/messages/list/sort_spec.rb index d00f9ed5..c3b965d3 100644 --- a/spec/lib/outboxer/messages/list/sort_spec.rb +++ b/spec/lib/outboxer/messages/list/sort_spec.rb @@ -41,7 +41,7 @@ module Outboxer Messages .list(sort: :status, order: :asc) .map { |message| message['status'] } - ).to eq(['failed', 'publishing', 'backlogged', 'backlogged']) + ).to eq(['backlogged', 'backlogged', 'failed', 'publishing']) end it 'sorts messages by status in descending order' do @@ -49,7 +49,7 @@ module Outboxer Messages .list(sort: :status, order: :desc) .map { |message| message['status'] } - ).to eq(['backlogged', 'backlogged', 'publishing', 'failed']) + ).to eq(['publishing', 'failed', 'backlogged', 'backlogged']) end end From 1c304f8e3ebb6d52a87b2f33a7a7962df943d50c Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 31 Mar 2024 18:16:15 +1100 Subject: [PATCH 3/6] add Messages.queue spec --- lib/outboxer/messageable.rb | 2 + lib/outboxer/messages.rb | 12 ++-- spec/lib/outboxer/messages/queue_spec.rb | 36 +++++++++++ .../lib/outboxer/messages/unpublished_spec.rb | 59 ------------------- 4 files changed, 46 insertions(+), 63 deletions(-) create mode 100644 spec/lib/outboxer/messages/queue_spec.rb delete mode 100644 spec/lib/outboxer/messages/unpublished_spec.rb diff --git a/lib/outboxer/messageable.rb b/lib/outboxer/messageable.rb index d834a15d..1e1ad39e 100644 --- a/lib/outboxer/messageable.rb +++ b/lib/outboxer/messageable.rb @@ -22,6 +22,8 @@ def has_outboxer_message( as: as after_create :create_outboxer_message! + + # after_create { |event| Outboxer::Message.backlog(messageable: event) } end end end diff --git a/lib/outboxer/messages.rb b/lib/outboxer/messages.rb index 4114a958..7fae43a7 100644 --- a/lib/outboxer/messages.rb +++ b/lib/outboxer/messages.rb @@ -22,24 +22,28 @@ def counts_by_status end end - def queue!(limit: 1, order: :asc) + def queue!(limit: 1) ActiveRecord::Base.connection_pool.with_connection do ids = [] ActiveRecord::Base.transaction do ids = Models::Message .where(status: Models::Message::Status::BACKLOGGED) - .order(created_at: order) + .order(updated_at: :asc) .lock('FOR UPDATE SKIP LOCKED') .limit(limit) .pluck(:id) - Models::Message.where(id: ids).update_all(status: Models::Message::Status::PUBLISHING) + Models::Message + .where(id: ids) + .update_all( + updated_at: Time.current, + status: Models::Message::Status::PUBLISHING) end Models::Message .where(id: ids, status: Models::Message::Status::PUBLISHING) - .order(created_at: order) + .order(updated_at: :asc) .to_a end end diff --git a/spec/lib/outboxer/messages/queue_spec.rb b/spec/lib/outboxer/messages/queue_spec.rb new file mode 100644 index 00000000..e7ee51df --- /dev/null +++ b/spec/lib/outboxer/messages/queue_spec.rb @@ -0,0 +1,36 @@ +require 'spec_helper' + +module Outboxer + RSpec.describe Messages do + describe '.queue!' do + context 'when there are 2 backlogged messages' do + let!(:backlogged_messages) do + [ + create(:outboxer_message, :backlogged, updated_at: 2.minutes.ago), + create(:outboxer_message, :backlogged, updated_at: 1.minute.ago) + ] + end + + context 'when limit is 1' do + let!(:publishing_messages) { Messages.queue!(limit: 1) } + + it 'returns first backlogged message' do + expect(publishing_messages.count).to eq(1) + + publishing_message = publishing_messages.first + expect(publishing_message.status).to eq(Models::Message::Status::PUBLISHING) + end + + it 'keeps last backlogged message' do + remaining_messages = Models::Message.where(status: Models::Message::Status::BACKLOGGED) + + expect(remaining_messages.count).to eq(1) + + remaining_message = remaining_messages.last + expect(remaining_message).to eq(backlogged_messages.last) + end + end + end + end + end +end diff --git a/spec/lib/outboxer/messages/unpublished_spec.rb b/spec/lib/outboxer/messages/unpublished_spec.rb deleted file mode 100644 index 465cf248..00000000 --- a/spec/lib/outboxer/messages/unpublished_spec.rb +++ /dev/null @@ -1,59 +0,0 @@ -require 'spec_helper' - -module Outboxer - RSpec.describe Messages do - describe '.backlogged!' do - context 'when there are 2 backlogged messages' do - let!(:backlogged_messages) do - [ - create(:outboxer_message, :backlogged, created_at: 2.minutes.ago), - create(:outboxer_message, :backlogged, created_at: 1.minute.ago) - ] - end - - context 'when order asc' do - context 'when limit is 1' do - let!(:publishing_messages) { Messages.queue!(limit: 1) } - - it 'returns first backlogged message' do - expect(publishing_messages.count).to eq(1) - - publishing_message = publishing_messages.first - expect(publishing_message.status).to eq(Models::Message::Status::PUBLISHING) - end - - it 'keeps last backlogged message' do - remaining_messages = Models::Message.where(status: Models::Message::Status::BACKLOGGED) - - expect(remaining_messages.count).to eq(1) - - remaining_message = remaining_messages.last - expect(remaining_message).to eq(backlogged_messages.last) - end - end - end - - context 'when order desc' do - context 'when limit is 1' do - let!(:publishing_messages) { Messages.queue!(limit: 1, order: :desc) } - - it 'returns first backlogged message' do - expect(publishing_messages.count).to eq(1) - - publishing_message = publishing_messages.last - expect(publishing_message).to eq(backlogged_messages.last) - end - - it 'keeps first backlogged message' do - remaining_messages = Models::Message.where(status: Models::Message::Status::BACKLOGGED) - expect(remaining_messages.count).to eq(1) - - remaining_message = remaining_messages.first - expect(remaining_message).to eq(backlogged_messages.first) - end - end - end - end - end - end -end From e1bca18b51f0ef73aa1dc73c2c7d8e994f6c3e98 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 31 Mar 2024 18:33:09 +1100 Subject: [PATCH 4/6] finish refactoring --- README.md | 8 +++--- lib/outboxer.rb | 1 - lib/outboxer/message.rb | 12 +++++++++ lib/outboxer/messageable.rb | 30 ----------------------- spec/lib/outboxer/message/backlog_spec.rb | 15 ++++++++++++ 5 files changed, 32 insertions(+), 34 deletions(-) delete mode 100644 lib/outboxer/messageable.rb create mode 100644 spec/lib/outboxer/message/backlog_spec.rb diff --git a/README.md b/README.md index a1c3b1fa..55021a9e 100644 --- a/README.md +++ b/README.md @@ -37,13 +37,15 @@ bin/rails g outboxer:schema bin/rake db:migrate ``` -### include messageable into your event model +### create an outboxer message in the same transaction as the event record ```ruby class Event < ActiveRecord::Base - include Outboxer::Messageable - # ... + + after_create do |event| + Outboxer::Message.backlog!(messageable_type: event.class.name, messageable_id: event.id) + end end ``` diff --git a/lib/outboxer.rb b/lib/outboxer.rb index d3c1562f..1e247bd8 100644 --- a/lib/outboxer.rb +++ b/lib/outboxer.rb @@ -13,7 +13,6 @@ require_relative "outboxer/message" require_relative "outboxer/messages" -require_relative "outboxer/messageable" require_relative "outboxer/database" require_relative "outboxer/publisher" diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index e343f1d6..06e11d11 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -9,6 +9,18 @@ class Error < Outboxer::Error; end; class NotFound < Error; end class InvalidTransition < Error; end + def backlog!(messageable_type:, messageable_id:) + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + message = Models::Message.create!( + messageable_id: messageable_id, + messageable_type: messageable_type) + + { 'id' => message.id } + end + end + end + def find_by_id!(id:) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do diff --git a/lib/outboxer/messageable.rb b/lib/outboxer/messageable.rb deleted file mode 100644 index 1e1ad39e..00000000 --- a/lib/outboxer/messageable.rb +++ /dev/null @@ -1,30 +0,0 @@ -module Outboxer - module Messageable - # The Messageable module, when included in a class, provides the ability to associate - # that class with an Outboxer message. This association is set up through a series - # of class methods defined in the included ClassMethods module. - - def self.included(base) - base.extend ClassMethods - - base.has_outboxer_message - end - - module ClassMethods - def has_outboxer_message( - dependent: nil, - class_name: 'Outboxer::Models::Message', - as: :messageable - ) - has_one :outboxer_message, - dependent: dependent, - class_name: class_name, - as: as - - after_create :create_outboxer_message! - - # after_create { |event| Outboxer::Message.backlog(messageable: event) } - end - end - end -end diff --git a/spec/lib/outboxer/message/backlog_spec.rb b/spec/lib/outboxer/message/backlog_spec.rb new file mode 100644 index 00000000..8347b271 --- /dev/null +++ b/spec/lib/outboxer/message/backlog_spec.rb @@ -0,0 +1,15 @@ +require 'spec_helper' + +module Outboxer + RSpec.describe Message do + describe '.backlog!' do + let!(:backlogged_message) do + Message.backlog!(messageable_type: 'Event', messageable_id: '1') + end + + it 'returns backlogged message' do + expect(backlogged_message['id']).to eq(Models::Message.backlogged.first.id) + end + end + end +end From bd4a27ab2c8d9f9805769dbba640cd9aae31f3a7 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 31 Mar 2024 18:37:11 +1100 Subject: [PATCH 5/6] set status explicitly --- lib/outboxer/message.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 06e11d11..987002e2 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -14,7 +14,8 @@ def backlog!(messageable_type:, messageable_id:) ActiveRecord::Base.transaction do message = Models::Message.create!( messageable_id: messageable_id, - messageable_type: messageable_type) + messageable_type: messageable_type, + status: Models::Message::Status::BACKLOGGED) { 'id' => message.id } end From 2d921225d51a2352689fd5e24c7bf59c77f37a7c Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 31 Mar 2024 18:39:07 +1100 Subject: [PATCH 6/6] remove overqualified namespace resolution --- lib/outboxer/publisher.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index aa53efc8..f9e2e9b2 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -41,7 +41,7 @@ def push_messages!(threads:, queue:, queue_size:, poll_interval:, logger:, kerne messages = [] queue_remaining = queue_size - queue.length - messages = (queue_remaining > 0) ? Outboxer::Messages.queue!(limit: queue_remaining) : [] + messages = (queue_remaining > 0) ? Messages.queue!(limit: queue_remaining) : [] if messages.empty? debug_log("Sleeping for #{poll_interval} seconds because there are no messages",