Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issues in existing services #24

Merged
merged 3 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 56 additions & 46 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,102 +10,112 @@ class NotFound < Error; end
class InvalidTransition < Error; end

def find_by_id!(id:)
message = Models::Message.includes(exceptions: :frames).find_by!(id: id)

{
'id' => message.id,
'status' => message.status,
'messageable' => "#{message.messageable_type}::#{message.messageable_id}",
'created_at' => message.created_at.utc.to_s,
'updated_at' => message.updated_at.utc.to_s,
'exceptions' => message.exceptions.map do |exception|
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.includes(exceptions: :frames).find_by!(id: id)

{
'id' => exception.id,
'class_name' => exception.class_name,
'message_text' => exception.message_text,
'created_at' => exception.created_at.utc.to_s,
'frames' => exception.frames.map do |frame|
'id' => message.id,
'status' => message.status,
'messageable' => "#{message.messageable_type}::#{message.messageable_id}",
'created_at' => message.created_at.utc.to_s,
'updated_at' => message.updated_at.utc.to_s,
'exceptions' => message.exceptions.map do |exception|
{
'id' => frame.id,
'index' => frame.index,
'text' => frame.text
'id' => exception.id,
'class_name' => exception.class_name,
'message_text' => exception.message_text,
'created_at' => exception.created_at.utc.to_s,
'frames' => exception.frames.map do |frame|
{
'id' => frame.id,
'index' => frame.index,
'text' => frame.text
}
end
}
end
}
end
}
end
rescue ActiveRecord::RecordNotFound
raise NotFound, "Couldn't find Outboxer::Models::Message with id #{id}"
end

def published!(id:)
ActiveRecord::Base.connection_pool.with_connection do
outboxer_message = Models::Message.order(created_at: :asc).lock.find_by!(id: id)
ActiveRecord::Base.transaction do
message = Models::Message.lock.find_by!(id: id)

if outboxer_message.status != Models::Message::Status::PUBLISHING
raise InvalidTransition,
"cannot transition outboxer message #{outboxer_message.id} " \
"from #{outboxer_message.status} to (deleted)"
end
if message.status != Models::Message::Status::PUBLISHING
raise InvalidTransition,
"cannot transition message #{message.id} " \
"from #{message.status} to (deleted)"
end

outboxer_message.destroy!
message.exceptions.each { |exception| exception.frames.each(&:delete) }
message.exceptions.delete_all
message.delete

nil
{ 'id' => id }
end
end
end

def failed!(id:, exception:)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
outboxer_message = Models::Message.order(created_at: :asc).lock.find_by!(id: id)
message = Models::Message.order(created_at: :asc).lock.find_by!(id: id)

if outboxer_message.status != Models::Message::Status::PUBLISHING
if message.status != Models::Message::Status::PUBLISHING
raise InvalidTransition,
"cannot transition outboxer message #{id} " \
"from #{outboxer_message.status} to #{Models::Message::Status::FAILED}"
"from #{message.status} to #{Models::Message::Status::FAILED}"
end

outboxer_message.update!(status: Models::Message::Status::FAILED)
message.update!(status: Models::Message::Status::FAILED)

outboxer_exception = outboxer_message.exceptions.create!(
outboxer_exception = message.exceptions.create!(
class_name: exception.class.name, message_text: exception.message)

exception.backtrace.each_with_index do |frame, index|
outboxer_exception.frames.create!(index: index, text: frame)
end

outboxer_message
{ 'id' => id }
end
end
end

def delete!(id:)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
outboxer_message = Models::Message.includes(exceptions: :frames).find_by!(id: id)
message = Models::Message.includes(exceptions: :frames).lock.find_by!(id: id)

outboxer_message.exceptions.each { |exception| exception.frames.destroy_all }
outboxer_message.exceptions.destroy_all
outboxer_message.destroy
message.exceptions.each { |exception| exception.frames.each(&:delete) }
message.exceptions.delete_all
message.delete

{ 'id' => id }
end
end

{ 'id' => id }
end

def republish!(id:)
ActiveRecord::Base.connection_pool.with_connection do
outboxer_message = Models::Message.lock.find_by!(id: id)
ActiveRecord::Base.transaction do
message = Models::Message.lock.find_by!(id: id)

if outboxer_message.status != Models::Message::Status::FAILED
raise InvalidTransition,
"cannot transition outboxer message #{id} " \
"from #{outboxer_message.status} to #{Models::Message::Status::UNPUBLISHED}"
end
if message.status != Models::Message::Status::FAILED
raise InvalidTransition,
"cannot transition outboxer message #{id} " \
"from #{message.status} to #{Models::Message::Status::UNPUBLISHED}"
end

outboxer_message.update!(status: Models::Message::Status::UNPUBLISHED)
message.update!(status: Models::Message::Status::UNPUBLISHED)

outboxer_message
{ 'id' => id }
end
end
end
end
Expand Down
138 changes: 104 additions & 34 deletions lib/outboxer/messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,77 +9,147 @@ class Error < Outboxer::Error; end;
class InvalidTransition < Error; end

def counts_by_status
status_counts = Models::Message::STATUSES.each_with_object({}) do |status, hash|
hash[status.to_s] = 0
end
ActiveRecord::Base.connection_pool.with_connection do
status_counts = Models::Message::STATUSES.each_with_object({}) do |status, hash|
hash[status.to_s] = 0
end

Models::Message.group(:status).count.each do |status, count|
status_counts[status.to_s] = count
end
Models::Message.group(:status).count.each do |status, count|
status_counts[status.to_s] = count
end

status_counts
status_counts
end
end

def unpublished!(limit: 1, order: :asc)
ActiveRecord::Base.connection_pool.with_connection do
message_ids = ActiveRecord::Base.transaction do
ids = []

ActiveRecord::Base.transaction do
ids = Models::Message
.where(status: Models::Message::Status::UNPUBLISHED)
.order(created_at: order)
.lock('FOR UPDATE SKIP LOCKED')
.limit(limit)
.pluck(:id)

if !ids.empty?
Models::Message.where(id: ids).update_all(status: Models::Message::Status::PUBLISHING)
end

ids
Models::Message.where(id: ids).update_all(status: Models::Message::Status::PUBLISHING)
end

Models::Message
.where(id: message_ids, status: Models::Message::Status::PUBLISHING)
.where(id: ids, status: Models::Message::Status::PUBLISHING)
.order(created_at: order)
.to_a
end
end

def republish_all_failed!
def republish_all!(batch_size: 100)
updated_total_count = 0

ActiveRecord::Base.connection_pool.with_connection do
Models::Message
.where(status: Models::Message::Status::FAILED)
.order(updated_at: :asc)
.pluck(:id)
.each_slice(1000) { |message_ids| republish_batch_failed!(message_ids) }
updated_count = 0

loop do
ActiveRecord::Base.transaction do
locked_ids = Models::Message
.where(status: Models::Message::Status::FAILED)
.order(updated_at: :asc)
.limit(batch_size)
.lock('FOR UPDATE')
.pluck(:id)

updated_count = Models::Message.where(id: locked_ids).update_all(
status: Models::Message::Status::UNPUBLISHED, updated_at: DateTime.now.utc)
end

updated_total_count += updated_count

break if updated_count < batch_size
end
end

{ 'count' => updated_total_count }
end

def republish_selected!(ids:)
updated_count = 0

ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
locked_ids = Models::Message
.where(id: ids, status: Models::Message::Status::FAILED)
.order(updated_at: :asc)
.lock('FOR UPDATE')
.pluck(:id)

missing_ids = ids - locked_ids
if missing_ids.any?
raise NotFound, "Some IDs could not be found: #{missing_ids.join(', ')}"
end

updated_count = Models::Message.where(id: locked_ids).update_all(
status: Models::Message::Status::UNPUBLISHED, updated_at: DateTime.now.utc)
end
end

{ 'count' => updated_count }
end

def republish_batch_failed!(message_ids)
ActiveRecord::Base.transaction do
message_ids.each do |message_id|
message = Models::Message.lock.find(message_id)
def delete_all!(batch_size: 100)
deleted_total_count = 0

ActiveRecord::Base.connection_pool.with_connection do
loop do
deleted_count = 0

ActiveRecord::Base.transaction do
locked_ids = Models::Message
.order(:updated_at).limit(batch_size).lock('FOR UPDATE').pluck(:id)

Models::Frame
.joins(:exception)
.where(exception: { message_id: locked_ids })
.delete_all

Models::Exception.where(message_id: locked_ids).delete_all

if message.status != Models::Message::Status::FAILED
raise InvalidTransition,
"cannot transition outboxer message #{message.id} " \
"from #{message.status} to #{Models::Message::Status::UNPUBLISHED}"
deleted_count = Models::Message.where(id: locked_ids).delete_all
end

message.update!(status: Models::Message::Status::UNPUBLISHED)
deleted_total_count += deleted_count

break if deleted_count < batch_size
end
end

{ 'count' => deleted_total_count }
end

def delete_all!
def delete_selected!(ids:)
deleted_count = 0

ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
Models::Frame.delete_all
Models::Exception.delete_all
Models::Message.delete_all
locked_ids = Models::Message.where(id: ids).lock('FOR UPDATE').pluck(:id)

missing_ids = ids - locked_ids
if missing_ids.any?
raise NotFound, "Some IDs could not be found: #{missing_ids.join(', ')}"
end

Models::Frame
.joins(:exception)
.where(exception: { message_id: locked_ids })
.delete_all

Models::Exception.where(message_id: locked_ids).delete_all

deleted_count = Models::Message.where(id: locked_ids).delete_all
end
end

nil
{ 'count' => deleted_count }
end
end
end
2 changes: 2 additions & 0 deletions lib/outboxer/models.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
require "active_record"

require_relative "not_found"

require_relative "models/frame"
require_relative "models/exception"
require_relative "models/message"
4 changes: 4 additions & 0 deletions lib/outboxer/not_found.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module Outboxer
class NotFound < Error
end
end
3 changes: 1 addition & 2 deletions spec/lib/outboxer/message/failed_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ def raise_exception
let!(:failed_message) { Message.failed!(id: publishing_message.id, exception: exception) }

it 'returns updated message' do
expect(failed_message.id).to eq(publishing_message.id)
expect(failed_message.status).to eq(Models::Message::Status::FAILED)
expect(failed_message['id']).to eq(publishing_message.id)
end

it 'updates message status to failed' do
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/outboxer/message/published_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Outboxer
let!(:published_message) { Message.published!(id: publishing_message.id) }

it 'returns nil' do
expect(published_message).to be_nil
expect(published_message).to eq({ 'id' => publishing_message.id })
end

it 'deletes publishing message' do
Expand All @@ -25,7 +25,7 @@ module Outboxer
Message.published!(id: unpublished_message.id)
end.to raise_error(
Message::InvalidTransition,
"cannot transition outboxer message #{unpublished_message.id} " +
"cannot transition message #{unpublished_message.id} " +
"from unpublished to (deleted)")
end

Expand Down
3 changes: 1 addition & 2 deletions spec/lib/outboxer/message/republish_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ module Outboxer
let!(:unpublished_message) { Message.republish!(id: failed_message.id) }

it 'returns unpublished message' do
expect(unpublished_message.id).to eq(failed_message.id)
expect(unpublished_message.status).to eq(Models::Message::Status::UNPUBLISHED)
expect(unpublished_message['id']).to eq(failed_message.id)
end

it 'updates failed message status to unpublishied' do
Expand Down
Loading
Loading