Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Message.backlog! (and Message.queue!) #30

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ bin/rails g outboxer:schema
bin/rake db:migrate
```

### include messageable into your event model
### create an outboxer message in the same transaction as the event record

```ruby
class Event < ActiveRecord::Base
include Outboxer::Messageable

# ...

after_create do |event|
Outboxer::Message.backlog!(messageable_type: event.class.name, messageable_id: event.id)
end
end
```

Expand Down
6 changes: 3 additions & 3 deletions bin/sidekiq_publishermon
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ end
loop do
system("clear") || system("cls")

total_unpublished = nil
total_backlogged = nil
total_publishing = nil
total_failed = nil

ActiveRecord::Base.connection_pool.with_connection do
total_unpublished = Outboxer::Models::Message.unpublished.count
total_backlogged = Outboxer::Models::Message.backlogged.count
total_publishing = Outboxer::Models::Message.publishing.count
total_failed = Outboxer::Models::Message.failed.count
end
Expand All @@ -43,7 +43,7 @@ loop do
puts "Outboxer Report"
puts Time.now.utc
puts "\n---- Overview ----"
puts format("%-15s: %-10s", "Unpublished", total_unpublished)
puts format("%-15s: %-10s", "Backlogged", total_backlogged)
puts format("%-15s: %-10s", "Publishing", total_publishing)
puts format("%-15s: %-10s", "Failed", total_failed)

Expand Down
2 changes: 1 addition & 1 deletion db/seeds.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
Outboxer::Models::Message.create!(
messageable_type: 'Event',
messageable_id: i,
status: Outboxer::Models::Message::Status::UNPUBLISHED)
status: Outboxer::Models::Message::Status::BACKLOGGED)
when 1
Outboxer::Models::Message.create!(
messageable_type: 'Event',
Expand Down
1 change: 0 additions & 1 deletion lib/outboxer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

require_relative "outboxer/message"
require_relative "outboxer/messages"
require_relative "outboxer/messageable"

require_relative "outboxer/database"
require_relative "outboxer/publisher"
Expand Down
17 changes: 15 additions & 2 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@ class Error < Outboxer::Error; end;
class NotFound < Error; end
class InvalidTransition < Error; end

def backlog!(messageable_type:, messageable_id:)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.create!(
messageable_id: messageable_id,
messageable_type: messageable_type,
status: Models::Message::Status::BACKLOGGED)

{ 'id' => message.id }
end
end
end

def find_by_id!(id:)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
Expand Down Expand Up @@ -109,10 +122,10 @@ def republish!(id:)
if message.status != Models::Message::Status::FAILED
raise InvalidTransition,
"cannot transition outboxer message #{id} " \
"from #{message.status} to #{Models::Message::Status::UNPUBLISHED}"
"from #{message.status} to #{Models::Message::Status::BACKLOGGED}"
end

message.update!(status: Models::Message::Status::UNPUBLISHED)
message.update!(status: Models::Message::Status::BACKLOGGED)

{ 'id' => id }
end
Expand Down
28 changes: 0 additions & 28 deletions lib/outboxer/messageable.rb

This file was deleted.

18 changes: 11 additions & 7 deletions lib/outboxer/messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,28 @@ def counts_by_status
end
end

def unpublished!(limit: 1, order: :asc)
def queue!(limit: 1)
ActiveRecord::Base.connection_pool.with_connection do
ids = []

ActiveRecord::Base.transaction do
ids = Models::Message
.where(status: Models::Message::Status::UNPUBLISHED)
.order(created_at: order)
.where(status: Models::Message::Status::BACKLOGGED)
.order(updated_at: :asc)
.lock('FOR UPDATE SKIP LOCKED')
.limit(limit)
.pluck(:id)

Models::Message.where(id: ids).update_all(status: Models::Message::Status::PUBLISHING)
Models::Message
.where(id: ids)
.update_all(
updated_at: Time.current,
status: Models::Message::Status::PUBLISHING)
end

Models::Message
.where(id: ids, status: Models::Message::Status::PUBLISHING)
.order(created_at: order)
.order(updated_at: :asc)
.to_a
end
end
Expand Down Expand Up @@ -109,7 +113,7 @@ def republish_all!(batch_size: 100)
.pluck(:id)

updated_count = Models::Message.where(id: locked_ids).update_all(
status: Models::Message::Status::UNPUBLISHED, updated_at: DateTime.now.utc)
status: Models::Message::Status::BACKLOGGED, updated_at: DateTime.now.utc)
end

updated_total_count += updated_count
Expand Down Expand Up @@ -138,7 +142,7 @@ def republish_selected!(ids:)
end

updated_count = Models::Message.where(id: locked_ids).update_all(
status: Models::Message::Status::UNPUBLISHED, updated_at: DateTime.now.utc)
status: Models::Message::Status::BACKLOGGED, updated_at: DateTime.now.utc)
end
end

Expand Down
8 changes: 4 additions & 4 deletions lib/outboxer/models/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ class Message < ::ActiveRecord::Base
self.table_name = :outboxer_messages

module Status
UNPUBLISHED = 'unpublished'
BACKLOGGED = 'backlogged'
PUBLISHING = 'publishing'
FAILED = 'failed'
end

STATUSES = [Status::UNPUBLISHED, Status::PUBLISHING, Status::FAILED]
STATUSES = [Status::BACKLOGGED, Status::PUBLISHING, Status::FAILED]

scope :unpublished, -> { where(status: Status::UNPUBLISHED) }
scope :backlogged, -> { where(status: Status::BACKLOGGED) }
scope :publishing, -> { where(status: Status::PUBLISHING) }
scope :failed, -> { where(status: Status::FAILED) }

attribute :status, default: -> { Status::UNPUBLISHED }
attribute :status, default: -> { Status::BACKLOGGED }
validates :status, inclusion: { in: STATUSES }, length: { maximum: 255 }

belongs_to :messageable, polymorphic: true
Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def push_messages!(threads:, queue:, queue_size:, poll_interval:, logger:, kerne
messages = []

queue_remaining = queue_size - queue.length
messages = (queue_remaining > 0) ? Outboxer::Messages.unpublished!(limit: queue_remaining) : []
messages = (queue_remaining > 0) ? Messages.queue!(limit: queue_remaining) : []

if messages.empty?
debug_log("Sleeping for #{poll_interval} seconds because there are no messages",
Expand Down
4 changes: 2 additions & 2 deletions spec/factories/outboxer_messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
status { Outboxer::Models::Message::Status::PUBLISHING }
created_at { 1.day.ago }

trait :unpublished do
status { Outboxer::Models::Message::Status::UNPUBLISHED }
trait :backlogged do
status { Outboxer::Models::Message::Status::BACKLOGGED }
end

trait :publishing do
Expand Down
15 changes: 15 additions & 0 deletions spec/lib/outboxer/message/backlog_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
require 'spec_helper'

module Outboxer
RSpec.describe Message do
describe '.backlog!' do
let!(:backlogged_message) do
Message.backlog!(messageable_type: 'Event', messageable_id: '1')
end

it 'returns backlogged message' do
expect(backlogged_message['id']).to eq(Models::Message.backlogged.first.id)
end
end
end
end
16 changes: 8 additions & 8 deletions spec/lib/outboxer/message/failed_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,27 @@ def raise_exception
end
end

context 'when unpublished messaged' do
let!(:unpublished_message) { create(:outboxer_message, :unpublished) }
context 'when backlogged message' do
let!(:backlogged_message) { create(:outboxer_message, :backlogged) }

it 'raises invalid transition error' do
expect do
Message.failed!(id: unpublished_message.id, exception: exception)
Message.failed!(id: backlogged_message.id, exception: exception)
end.to raise_error(
Message::InvalidTransition,
"cannot transition outboxer message #{unpublished_message.id} " +
"from unpublished to failed")
"cannot transition outboxer message #{backlogged_message.id} " +
"from backlogged to failed")
end

it 'does not delete unpublished message' do
it 'does not delete backlogged message' do
begin
Message.failed!(id: unpublished_message.id, exception: exception)
Message.failed!(id: backlogged_message.id, exception: exception)
rescue Message::InvalidTransition
# ignore
end

expect(Models::Message.count).to eq(1)
expect(Models::Message.first).to eq(unpublished_message)
expect(Models::Message.first).to eq(backlogged_message)
end
end
end
Expand Down
16 changes: 8 additions & 8 deletions spec/lib/outboxer/message/published_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ module Outboxer
end
end

context 'when unpublished messaged' do
let(:unpublished_message) { create(:outboxer_message, :unpublished) }
context 'when backlogged messaged' do
let(:backlogged_message) { create(:outboxer_message, :backlogged) }

it 'raises invalid transition error' do
expect do
Message.published!(id: unpublished_message.id)
Message.published!(id: backlogged_message.id)
end.to raise_error(
Message::InvalidTransition,
"cannot transition message #{unpublished_message.id} " +
"from unpublished to (deleted)")
"cannot transition message #{backlogged_message.id} " +
"from backlogged to (deleted)")
end

it 'does not delete unpublished message' do
it 'does not delete backlogged message' do
begin
Message.published!(id: unpublished_message.id)
Message.published!(id: backlogged_message.id)
rescue Message::InvalidTransition
# ignore
end

expect(Models::Message.count).to eq(1)
expect(Models::Message.first).to eq(unpublished_message)
expect(Models::Message.first).to eq(backlogged_message)
end
end
end
Expand Down
26 changes: 13 additions & 13 deletions spec/lib/outboxer/message/republish_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,40 @@ module Outboxer
context 'when failed message' do
let!(:failed_message) { create(:outboxer_message, :failed) }

let!(:unpublished_message) { Message.republish!(id: failed_message.id) }
let!(:backlogged_message) { Message.republish!(id: failed_message.id) }

it 'returns unpublished message' do
expect(unpublished_message['id']).to eq(failed_message.id)
it 'returns backlogged message' do
expect(backlogged_message['id']).to eq(failed_message.id)
end

it 'updates failed message status to unpublishied' do
it 'updates failed message status to backlogged' do
failed_message.reload

expect(failed_message.status).to eq(Models::Message::Status::UNPUBLISHED)
expect(failed_message.status).to eq(Models::Message::Status::BACKLOGGED)
end
end

context 'when unpublished messaged' do
let(:unpublished_message) { create(:outboxer_message, :unpublished) }
context 'when backlogged messaged' do
let(:backlogged_message) { create(:outboxer_message, :backlogged) }

it 'raises invalid transition error' do
expect do
Message.republish!(id: unpublished_message.id)
Message.republish!(id: backlogged_message.id)
end.to raise_error(
Message::InvalidTransition,
"cannot transition outboxer message #{unpublished_message.id} " +
"from unpublished to unpublished")
"cannot transition outboxer message #{backlogged_message.id} " +
"from backlogged to backlogged")
end

it 'does not delete unpublished message' do
it 'does not delete backlogged message' do
begin
Message.published!(id: unpublished_message.id)
Message.published!(id: backlogged_message.id)
rescue Message::InvalidTransition
# ignore
end

expect(Models::Message.count).to eq(1)
expect(Models::Message.first).to eq(unpublished_message)
expect(Models::Message.first).to eq(backlogged_message)
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/outboxer/messages/counts_by_status_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ module Outboxer
describe '.counts_by_status' do
context 'when no messages exist' do
it 'returns 0 for all statuses' do
expected_counts = { 'unpublished' => 0, 'publishing' => 0, 'failed' => 0 }
expected_counts = { 'backlogged' => 0, 'publishing' => 0, 'failed' => 0 }

expect(Outboxer::Messages.counts_by_status).to eq(expected_counts)
end
end

context 'when messages exist' do
before do
create_list(:outboxer_message, 2, :unpublished)
create_list(:outboxer_message, 2, :backlogged)
create_list(:outboxer_message, 3, :publishing)
create_list(:outboxer_message, 4, :failed)
end

it 'returns correct counts for each status' do
expected_counts = { 'unpublished' => 2, 'publishing' => 3, 'failed' => 4 }
expected_counts = { 'backlogged' => 2, 'publishing' => 3, 'failed' => 4 }

expect(Messages.counts_by_status).to eq(expected_counts)
end
Expand Down
Loading
Loading