Skip to content

Commit

Permalink
inject current utc time as dependency (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
bedrock-adam authored Aug 3, 2024
1 parent 02543d4 commit 758ac0f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 19 deletions.
23 changes: 10 additions & 13 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ module Message
Status = Models::Message::Status

def queue(messageable: nil,
messageable_type: nil, messageable_id: nil)
current_time = Time.now.utc

messageable_type: nil, messageable_id: nil,
current_utc_time: Time.now.utc)
message = Models::Message.create!(
messageable_id: messageable&.id || messageable_id,
messageable_type: messageable&.class&.name || messageable_type,
status: Models::Message::Status::QUEUED,
created_at: current_time,
updated_at: current_time)
created_at: current_utc_time,
updated_at: current_utc_time)

{ id: message.id }
end
Expand Down Expand Up @@ -50,7 +49,7 @@ def find_by_id(id:)
end
end

def publishing(id:)
def publishing(id:, current_utc_time: Time.now.utc)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.lock.find_by!(id: id)
Expand All @@ -63,7 +62,7 @@ def publishing(id:)

message.update!(
status: Models::Message::Status::PUBLISHING,
updated_at: Time.now.utc)
updated_at: current_utc_time)

{
id: id,
Expand Down Expand Up @@ -95,7 +94,7 @@ def published(id:)
end
end

def failed(id:, exception:)
def failed(id:, exception:, current_utc_time: Time.now.utc)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.order(created_at: :asc).lock.find_by!(id: id)
Expand All @@ -108,7 +107,7 @@ def failed(id:, exception:)

message.update!(
status: Models::Message::Status::FAILED,
updated_at: Time.now.utc)
updated_at: current_utc_time)

outboxer_exception = message.exceptions.create!(
class_name: exception.class.name, message_text: exception.message)
Expand Down Expand Up @@ -136,14 +135,12 @@ def delete(id:)
end
end

def requeue(id:)
def requeue(id:, current_utc_time: Time.now.utc)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.lock.find_by!(id: id)

message.update!(
status: Models::Message::Status::QUEUED,
updated_at: Time.now.utc)
message.update!(status: Models::Message::Status::QUEUED, updated_at: current_utc_time)

{ id: id }
end
Expand Down
12 changes: 6 additions & 6 deletions lib/outboxer/messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def counts_by_status
end
end

def dequeue(limit: 1)
def dequeue(limit: 1, current_utc_time: Time.now.utc)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
messages = Models::Message
Expand All @@ -30,7 +30,7 @@ def dequeue(limit: 1)
if messages.present?
Models::Message
.where(id: messages.map { |message| message[:id] })
.update_all(updated_at: Time.current, status: Models::Message::Status::DEQUEUED)
.update_all(updated_at: current_utc_time, status: Models::Message::Status::DEQUEUED)
end

messages.map do |message|
Expand Down Expand Up @@ -119,7 +119,7 @@ def can_requeue_all?(status:)
REQUEUE_ALL_STATUSES.include?(status&.to_sym)
end

def requeue_all(status:, batch_size: 100)
def requeue_all(status:, batch_size: 100, time: Time)
if !can_requeue_all?(status: status)
status_formatted = status.nil? ? 'nil' : status

Expand All @@ -143,7 +143,7 @@ def requeue_all(status:, batch_size: 100)

requeued_count_batch = Models::Message
.where(id: locked_ids)
.update_all(status: Models::Message::Status::QUEUED, updated_at: Time.now.utc)
.update_all(status: Models::Message::Status::QUEUED, updated_at: time.now.utc)

requeued_count += requeued_count_batch
end
Expand All @@ -155,7 +155,7 @@ def requeue_all(status:, batch_size: 100)
{ requeued_count: requeued_count }
end

def requeue_by_ids(ids:)
def requeue_by_ids(ids:, time: Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
locked_ids = Models::Message
Expand All @@ -166,7 +166,7 @@ def requeue_by_ids(ids:)

requeued_count = Models::Message
.where(id: locked_ids)
.update_all(status: Models::Message::Status::QUEUED, updated_at: Time.now.utc)
.update_all(status: Models::Message::Status::QUEUED, updated_at: time.now.utc)

{ requeued_count: requeued_count, not_requeued_ids: ids - locked_ids }
end
Expand Down

0 comments on commit 758ac0f

Please sign in to comment.