Skip to content

Commit

Permalink
complete implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
bedrock-adam committed Mar 16, 2024
1 parent 653a2ef commit c46ce35
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
26 changes: 26 additions & 0 deletions lib/outboxer/messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,32 @@ def unpublished!(limit: 1, order: :asc)
end
end

def republish_all_failed!
ActiveRecord::Base.connection_pool.with_connection do
Models::Message
.where(status: Models::Message::Status::FAILED)
.order(updated_at: :asc)
.pluck(:id)
.each_slice(1000) { |message_ids| republish_batch_failed!(message_ids) }
end
end

def republish_batch_failed!(message_ids)
ActiveRecord::Base.transaction do
message_ids.each do |message_id|
message = Models::Message.lock.find(message_id)

if message.status != Models::Message::Status::FAILED
raise InvalidTransition,
"cannot transition outboxer message #{message.id} " \
"from #{message.status} to #{Models::Message::Status::UNPUBLISHED}"
end

message.update!(status: Models::Message::Status::UNPUBLISHED)
end
end
end

def delete_all!
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
Expand Down
40 changes: 40 additions & 0 deletions spec/lib/outboxer/messages/republish_all_failed_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
require 'spec_helper'

module Outboxer
RSpec.describe Messages do
describe '.republish_all_failed!' do
let!(:message_1) { create(:outboxer_message, :unpublished) }
let!(:exception_1) { create(:outboxer_exception, message: message_1) }
let!(:frame_1) { create(:outboxer_frame, exception: exception_1) }

let!(:message_2) { create(:outboxer_message, :publishing) }
let!(:exception_2) { create(:outboxer_exception, message: message_2) }
let!(:frame_2) { create(:outboxer_frame, exception: exception_2) }

let!(:message_3) { create(:outboxer_message, :failed) }
let!(:exception_3) { create(:outboxer_exception, message: message_3) }
let!(:frame_3) { create(:outboxer_frame, exception: exception_3) }

let!(:message_4) { create(:outboxer_message, :failed) }
let!(:exception_4) { create(:outboxer_exception, message: message_4) }
let!(:frame_4) { create(:outboxer_frame, exception: exception_4) }

let!(:result) { Messages.republish_all_failed! }

it 'sets failed messages to unpublished' do
expect(
Models::Message.where(
id: [message_1, message_3.id, message_4.id],
status: Models::Message::Status::UNPUBLISHED
).count
).to eq(3)
end

it 'does not change messages that are publishing' do
expect(
Models::Message.where(id: [message_2], status: Models::Message::Status::PUBLISHING).count
).to eq(1)
end
end
end
end

0 comments on commit c46ce35

Please sign in to comment.