Skip to content

Commit

Permalink
add Message.backlog! (and Message.queue!) (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
bedrock-adam authored Mar 31, 2024
1 parent 0ca062e commit 7d4e412
Show file tree
Hide file tree
Showing 26 changed files with 153 additions and 171 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ bin/rails g outboxer:schema
bin/rake db:migrate
```

### include messageable into your event model
### create an outboxer message in the same transaction as the event record

```ruby
class Event < ActiveRecord::Base
include Outboxer::Messageable

# ...

after_create do |event|
Outboxer::Message.backlog!(messageable_type: event.class.name, messageable_id: event.id)
end
end
```

Expand Down
6 changes: 3 additions & 3 deletions bin/sidekiq_publishermon
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ end
loop do
system("clear") || system("cls")

total_unpublished = nil
total_backlogged = nil
total_publishing = nil
total_failed = nil

ActiveRecord::Base.connection_pool.with_connection do
total_unpublished = Outboxer::Models::Message.unpublished.count
total_backlogged = Outboxer::Models::Message.backlogged.count
total_publishing = Outboxer::Models::Message.publishing.count
total_failed = Outboxer::Models::Message.failed.count
end
Expand All @@ -43,7 +43,7 @@ loop do
puts "Outboxer Report"
puts Time.now.utc
puts "\n---- Overview ----"
puts format("%-15s: %-10s", "Unpublished", total_unpublished)
puts format("%-15s: %-10s", "Backlogged", total_backlogged)
puts format("%-15s: %-10s", "Publishing", total_publishing)
puts format("%-15s: %-10s", "Failed", total_failed)

Expand Down
2 changes: 1 addition & 1 deletion db/seeds.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
Outboxer::Models::Message.create!(
messageable_type: 'Event',
messageable_id: i,
status: Outboxer::Models::Message::Status::UNPUBLISHED)
status: Outboxer::Models::Message::Status::BACKLOGGED)
when 1
Outboxer::Models::Message.create!(
messageable_type: 'Event',
Expand Down
1 change: 0 additions & 1 deletion lib/outboxer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

require_relative "outboxer/message"
require_relative "outboxer/messages"
require_relative "outboxer/messageable"

require_relative "outboxer/database"
require_relative "outboxer/publisher"
Expand Down
17 changes: 15 additions & 2 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@ class Error < Outboxer::Error; end;
class NotFound < Error; end
class InvalidTransition < Error; end

def backlog!(messageable_type:, messageable_id:)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.create!(
messageable_id: messageable_id,
messageable_type: messageable_type,
status: Models::Message::Status::BACKLOGGED)

{ 'id' => message.id }
end
end
end

def find_by_id!(id:)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
Expand Down Expand Up @@ -109,10 +122,10 @@ def republish!(id:)
if message.status != Models::Message::Status::FAILED
raise InvalidTransition,
"cannot transition outboxer message #{id} " \
"from #{message.status} to #{Models::Message::Status::UNPUBLISHED}"
"from #{message.status} to #{Models::Message::Status::BACKLOGGED}"
end

message.update!(status: Models::Message::Status::UNPUBLISHED)
message.update!(status: Models::Message::Status::BACKLOGGED)

{ 'id' => id }
end
Expand Down
28 changes: 0 additions & 28 deletions lib/outboxer/messageable.rb

This file was deleted.

18 changes: 11 additions & 7 deletions lib/outboxer/messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,28 @@ def counts_by_status
end
end

def unpublished!(limit: 1, order: :asc)
def queue!(limit: 1)
ActiveRecord::Base.connection_pool.with_connection do
ids = []

ActiveRecord::Base.transaction do
ids = Models::Message
.where(status: Models::Message::Status::UNPUBLISHED)
.order(created_at: order)
.where(status: Models::Message::Status::BACKLOGGED)
.order(updated_at: :asc)
.lock('FOR UPDATE SKIP LOCKED')
.limit(limit)
.pluck(:id)

Models::Message.where(id: ids).update_all(status: Models::Message::Status::PUBLISHING)
Models::Message
.where(id: ids)
.update_all(
updated_at: Time.current,
status: Models::Message::Status::PUBLISHING)
end

Models::Message
.where(id: ids, status: Models::Message::Status::PUBLISHING)
.order(created_at: order)
.order(updated_at: :asc)
.to_a
end
end
Expand Down Expand Up @@ -109,7 +113,7 @@ def republish_all!(batch_size: 100)
.pluck(:id)

updated_count = Models::Message.where(id: locked_ids).update_all(
status: Models::Message::Status::UNPUBLISHED, updated_at: DateTime.now.utc)
status: Models::Message::Status::BACKLOGGED, updated_at: DateTime.now.utc)
end

updated_total_count += updated_count
Expand Down Expand Up @@ -138,7 +142,7 @@ def republish_selected!(ids:)
end

updated_count = Models::Message.where(id: locked_ids).update_all(
status: Models::Message::Status::UNPUBLISHED, updated_at: DateTime.now.utc)
status: Models::Message::Status::BACKLOGGED, updated_at: DateTime.now.utc)
end
end

Expand Down
8 changes: 4 additions & 4 deletions lib/outboxer/models/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ class Message < ::ActiveRecord::Base
self.table_name = :outboxer_messages

module Status
UNPUBLISHED = 'unpublished'
BACKLOGGED = 'backlogged'
PUBLISHING = 'publishing'
FAILED = 'failed'
end

STATUSES = [Status::UNPUBLISHED, Status::PUBLISHING, Status::FAILED]
STATUSES = [Status::BACKLOGGED, Status::PUBLISHING, Status::FAILED]

scope :unpublished, -> { where(status: Status::UNPUBLISHED) }
scope :backlogged, -> { where(status: Status::BACKLOGGED) }
scope :publishing, -> { where(status: Status::PUBLISHING) }
scope :failed, -> { where(status: Status::FAILED) }

attribute :status, default: -> { Status::UNPUBLISHED }
attribute :status, default: -> { Status::BACKLOGGED }
validates :status, inclusion: { in: STATUSES }, length: { maximum: 255 }

belongs_to :messageable, polymorphic: true
Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def push_messages!(threads:, queue:, queue_size:, poll_interval:, logger:, kerne
messages = []

queue_remaining = queue_size - queue.length
messages = (queue_remaining > 0) ? Outboxer::Messages.unpublished!(limit: queue_remaining) : []
messages = (queue_remaining > 0) ? Messages.queue!(limit: queue_remaining) : []

if messages.empty?
debug_log("Sleeping for #{poll_interval} seconds because there are no messages",
Expand Down
4 changes: 2 additions & 2 deletions spec/factories/outboxer_messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
status { Outboxer::Models::Message::Status::PUBLISHING }
created_at { 1.day.ago }

trait :unpublished do
status { Outboxer::Models::Message::Status::UNPUBLISHED }
trait :backlogged do
status { Outboxer::Models::Message::Status::BACKLOGGED }
end

trait :publishing do
Expand Down
15 changes: 15 additions & 0 deletions spec/lib/outboxer/message/backlog_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
require 'spec_helper'

module Outboxer
RSpec.describe Message do
describe '.backlog!' do
let!(:backlogged_message) do
Message.backlog!(messageable_type: 'Event', messageable_id: '1')
end

it 'returns backlogged message' do
expect(backlogged_message['id']).to eq(Models::Message.backlogged.first.id)
end
end
end
end
16 changes: 8 additions & 8 deletions spec/lib/outboxer/message/failed_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,27 @@ def raise_exception
end
end

context 'when unpublished messaged' do
let!(:unpublished_message) { create(:outboxer_message, :unpublished) }
context 'when backlogged message' do
let!(:backlogged_message) { create(:outboxer_message, :backlogged) }

it 'raises invalid transition error' do
expect do
Message.failed!(id: unpublished_message.id, exception: exception)
Message.failed!(id: backlogged_message.id, exception: exception)
end.to raise_error(
Message::InvalidTransition,
"cannot transition outboxer message #{unpublished_message.id} " +
"from unpublished to failed")
"cannot transition outboxer message #{backlogged_message.id} " +
"from backlogged to failed")
end

it 'does not delete unpublished message' do
it 'does not delete backlogged message' do
begin
Message.failed!(id: unpublished_message.id, exception: exception)
Message.failed!(id: backlogged_message.id, exception: exception)
rescue Message::InvalidTransition
# ignore
end

expect(Models::Message.count).to eq(1)
expect(Models::Message.first).to eq(unpublished_message)
expect(Models::Message.first).to eq(backlogged_message)
end
end
end
Expand Down
16 changes: 8 additions & 8 deletions spec/lib/outboxer/message/published_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ module Outboxer
end
end

context 'when unpublished messaged' do
let(:unpublished_message) { create(:outboxer_message, :unpublished) }
context 'when backlogged messaged' do
let(:backlogged_message) { create(:outboxer_message, :backlogged) }

it 'raises invalid transition error' do
expect do
Message.published!(id: unpublished_message.id)
Message.published!(id: backlogged_message.id)
end.to raise_error(
Message::InvalidTransition,
"cannot transition message #{unpublished_message.id} " +
"from unpublished to (deleted)")
"cannot transition message #{backlogged_message.id} " +
"from backlogged to (deleted)")
end

it 'does not delete unpublished message' do
it 'does not delete backlogged message' do
begin
Message.published!(id: unpublished_message.id)
Message.published!(id: backlogged_message.id)
rescue Message::InvalidTransition
# ignore
end

expect(Models::Message.count).to eq(1)
expect(Models::Message.first).to eq(unpublished_message)
expect(Models::Message.first).to eq(backlogged_message)
end
end
end
Expand Down
26 changes: 13 additions & 13 deletions spec/lib/outboxer/message/republish_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,40 @@ module Outboxer
context 'when failed message' do
let!(:failed_message) { create(:outboxer_message, :failed) }

let!(:unpublished_message) { Message.republish!(id: failed_message.id) }
let!(:backlogged_message) { Message.republish!(id: failed_message.id) }

it 'returns unpublished message' do
expect(unpublished_message['id']).to eq(failed_message.id)
it 'returns backlogged message' do
expect(backlogged_message['id']).to eq(failed_message.id)
end

it 'updates failed message status to unpublishied' do
it 'updates failed message status to backlogged' do
failed_message.reload

expect(failed_message.status).to eq(Models::Message::Status::UNPUBLISHED)
expect(failed_message.status).to eq(Models::Message::Status::BACKLOGGED)
end
end

context 'when unpublished messaged' do
let(:unpublished_message) { create(:outboxer_message, :unpublished) }
context 'when backlogged messaged' do
let(:backlogged_message) { create(:outboxer_message, :backlogged) }

it 'raises invalid transition error' do
expect do
Message.republish!(id: unpublished_message.id)
Message.republish!(id: backlogged_message.id)
end.to raise_error(
Message::InvalidTransition,
"cannot transition outboxer message #{unpublished_message.id} " +
"from unpublished to unpublished")
"cannot transition outboxer message #{backlogged_message.id} " +
"from backlogged to backlogged")
end

it 'does not delete unpublished message' do
it 'does not delete backlogged message' do
begin
Message.published!(id: unpublished_message.id)
Message.published!(id: backlogged_message.id)
rescue Message::InvalidTransition
# ignore
end

expect(Models::Message.count).to eq(1)
expect(Models::Message.first).to eq(unpublished_message)
expect(Models::Message.first).to eq(backlogged_message)
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/outboxer/messages/counts_by_status_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ module Outboxer
describe '.counts_by_status' do
context 'when no messages exist' do
it 'returns 0 for all statuses' do
expected_counts = { 'unpublished' => 0, 'publishing' => 0, 'failed' => 0 }
expected_counts = { 'backlogged' => 0, 'publishing' => 0, 'failed' => 0 }

expect(Outboxer::Messages.counts_by_status).to eq(expected_counts)
end
end

context 'when messages exist' do
before do
create_list(:outboxer_message, 2, :unpublished)
create_list(:outboxer_message, 2, :backlogged)
create_list(:outboxer_message, 3, :publishing)
create_list(:outboxer_message, 4, :failed)
end

it 'returns correct counts for each status' do
expected_counts = { 'unpublished' => 2, 'publishing' => 3, 'failed' => 4 }
expected_counts = { 'backlogged' => 2, 'publishing' => 3, 'failed' => 4 }

expect(Messages.counts_by_status).to eq(expected_counts)
end
Expand Down
Loading

0 comments on commit 7d4e412

Please sign in to comment.