From 0355276524ac14403e0932ecf7c6c1801b7cb7f4 Mon Sep 17 00:00:00 2001 From: Adam Mikulasev Date: Mon, 11 Nov 2024 21:43:54 +1100 Subject: [PATCH] rename dequeued status to buffered (#174) --- lib/outboxer/message.rb | 4 +-- lib/outboxer/messages.rb | 10 +++--- lib/outboxer/models/message.rb | 6 ++-- lib/outboxer/publisher.rb | 34 +++++++++---------- lib/outboxer/web/views/error.erb | 2 +- lib/outboxer/web/views/home.erb | 2 +- lib/outboxer/web/views/layout.erb | 2 +- spec/factories/outboxer_messages.rb | 4 +-- spec/lib/outboxer/message/can_requeue_spec.rb | 6 ++-- spec/lib/outboxer/message/publishing_spec.rb | 6 ++-- .../{dequeue_spec.rb => buffer_spec.rb} | 12 +++---- .../lib/outboxer/messages/can_requeue_spec.rb | 6 ++-- spec/lib/outboxer/messages/delete_all_spec.rb | 6 ++-- .../outboxer/messages/delete_by_ids_spec.rb | 2 +- .../lib/outboxer/messages/list/filter_spec.rb | 8 ++--- .../messages/list/no_arguments_spec.rb | 2 +- spec/lib/outboxer/messages/list/sort_spec.rb | 6 ++-- spec/lib/outboxer/messages/metrics_spec.rb | 16 ++++----- .../lib/outboxer/messages/requeue_all_spec.rb | 8 ++--- spec/lib/outboxer/publisher/publish_spec.rb | 10 +++--- 20 files changed, 76 insertions(+), 76 deletions(-) rename spec/lib/outboxer/messages/{dequeue_spec.rb => buffer_spec.rb} (71%) diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 8a2a90ad..9dc87fca 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -59,7 +59,7 @@ def publishing(id:, publisher_id: nil, publisher_name: nil, ActiveRecord::Base.transaction do message = Models::Message.lock.find_by!(id: id) - if message.status != Models::Message::Status::DEQUEUED + if message.status != Models::Message::Status::BUFFERED raise ArgumentError, "cannot transition outboxer message #{message.id} " \ "from #{message.status} to #{Models::Message::Status::PUBLISHING}" @@ -159,7 +159,7 @@ def delete(id:) end end - REQUEUE_STATUSES = [:dequeued, :publishing, :failed] + REQUEUE_STATUSES = [:buffered, :publishing, :failed] def can_requeue?(status:) REQUEUE_STATUSES.include?(status&.to_sym) diff --git a/lib/outboxer/messages.rb b/lib/outboxer/messages.rb index 6072493f..6c67e4f0 100644 --- a/lib/outboxer/messages.rb +++ b/lib/outboxer/messages.rb @@ -2,8 +2,8 @@ module Outboxer module Messages extend self - def dequeue(limit: 1, publisher_id: nil, publisher_name: nil, - current_utc_time: Time.now.utc) + def buffer(limit: 1, publisher_id: nil, publisher_name: nil, + current_utc_time: Time.now.utc) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do messages = Models::Message @@ -17,7 +17,7 @@ def dequeue(limit: 1, publisher_id: nil, publisher_name: nil, Models::Message .where(id: messages.map { |message| message[:id] }) .update_all( - status: Models::Message::Status::DEQUEUED, + status: Models::Message::Status::BUFFERED, updated_at: current_utc_time, updated_by_publisher_id: publisher_id, updated_by_publisher_name: publisher_name) @@ -35,7 +35,7 @@ def dequeue(limit: 1, publisher_id: nil, publisher_name: nil, end end - LIST_STATUS_OPTIONS = [nil, :queued, :dequeued, :publishing, :published, :failed] + LIST_STATUS_OPTIONS = [nil, :queued, :buffered, :publishing, :published, :failed] LIST_STATUS_DEFAULT = nil LIST_SORT_OPTIONS = [:id, :status, :messageable, :created_at, :updated_at, :updated_by_publisher_name] @@ -115,7 +115,7 @@ def list(status: LIST_STATUS_DEFAULT, } end - REQUEUE_STATUSES = [:dequeued, :publishing, :failed] + REQUEUE_STATUSES = [:buffered, :publishing, :failed] def can_requeue?(status:) REQUEUE_STATUSES.include?(status&.to_sym) diff --git a/lib/outboxer/models/message.rb b/lib/outboxer/models/message.rb index 9f549532..daeaca76 100644 --- a/lib/outboxer/models/message.rb +++ b/lib/outboxer/models/message.rb @@ -19,7 +19,7 @@ class Message < ::ActiveRecord::Base module Status QUEUED = 'queued' - DEQUEUED = 'dequeued' + BUFFERED = 'buffered' PUBLISHING = 'publishing' PUBLISHED = 'published' FAILED = 'failed' @@ -27,14 +27,14 @@ module Status STATUSES = [ Status::QUEUED, - Status::DEQUEUED, + Status::BUFFERED, Status::PUBLISHING, Status::PUBLISHED, Status::FAILED ] scope :queued, -> { where(status: Status::QUEUED) } - scope :dequeued, -> { where(status: Status::DEQUEUED) } + scope :buffered, -> { where(status: Status::BUFFERED) } scope :publishing, -> { where(status: Status::PUBLISHING) } scope :published, -> { where(status: Status::PUBLISHED) } scope :failed, -> { where(status: Status::FAILED) } diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 26fd9b6a..ce1378b3 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -191,24 +191,24 @@ def create_publisher_threads(id:, name:, break if message.nil? publish_message( - id: id, name: name, dequeued_message: message, + id: id, name: name, buffered_message: message, logger: logger, process: process, kernel: kernel, &block) end end end end - def dequeue_messages(id:, name:, - queue:, buffer:, poll:, tick:, - signal_read:, logger:, process:, kernel:) - dequeue_limit = buffer - queue.size + def buffer_messages(id:, name:, + queue:, buffer:, poll:, tick:, + signal_read:, logger:, process:, kernel:) + buffer_limit = buffer - queue.size - if dequeue_limit > 0 - dequeued_messages = Messages.dequeue( - limit: dequeue_limit, publisher_id: id, publisher_name: name) + if buffer_limit > 0 + buffered_messages = Messages.buffer( + limit: buffer_limit, publisher_id: id, publisher_name: name) - if dequeued_messages.count > 0 - dequeued_messages.each { |message| queue.push(message) } + if buffered_messages.count > 0 + buffered_messages.each { |message| queue.push(message) } else Publisher.sleep( poll, @@ -410,7 +410,7 @@ def publish( loop do case @status when Status::PUBLISHING - dequeue_messages( + buffer_messages( id: id, name: name, queue: queue, buffer: buffer, poll: poll, tick:, @@ -445,14 +445,14 @@ def publish( database.disconnect(logger: logger) end - def publish_message(id:, name:, dequeued_message:, logger:, kernel:, process:, &block) - dequeued_at = process.clock_gettime(Process::CLOCK_MONOTONIC) + def publish_message(id:, name:, buffered_message:, logger:, kernel:, process:, &block) + buffered_at = process.clock_gettime(Process::CLOCK_MONOTONIC) message = Message.publishing( - id: dequeued_message[:id], publisher_id: id, publisher_name: name) + id: buffered_message[:id], publisher_id: id, publisher_name: name) logger.debug "Outboxer publishing message #{message[:id]} for "\ "#{message[:messageable_type]}::#{message[:messageable_id]} "\ - "in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - dequeued_at).round(3)}s" + "in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - buffered_at).round(3)}s" begin block.call(message) @@ -460,7 +460,7 @@ def publish_message(id:, name:, dequeued_message:, logger:, kernel:, process:, & Message.failed(id: message[:id], exception: e, publisher_id: id, publisher_name: name) logger.debug "Outboxer failed to publish message #{message[:id]} for "\ "#{message[:messageable_type]}::#{message[:messageable_id]} "\ - "in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - dequeued_at).round(3)}s" + "in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - buffered_at).round(3)}s" raise end @@ -468,7 +468,7 @@ def publish_message(id:, name:, dequeued_message:, logger:, kernel:, process:, & Message.published(id: message[:id], publisher_id: id, publisher_name: name) logger.debug "Outboxer published message #{message[:id]} for "\ "#{message[:messageable_type]}::#{message[:messageable_id]} "\ - "in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - dequeued_at).round(3)}s" + "in #{(process.clock_gettime(Process::CLOCK_MONOTONIC) - buffered_at).round(3)}s" rescue StandardError => e logger.error( "#{e.class}: #{e.message}\n"\ diff --git a/lib/outboxer/web/views/error.erb b/lib/outboxer/web/views/error.erb index 8cf33e67..3f334f5d 100644 --- a/lib/outboxer/web/views/error.erb +++ b/lib/outboxer/web/views/error.erb @@ -26,7 +26,7 @@ Publishing