From d12d58b744edac237941a97472fa7ed4318d2b82 Mon Sep 17 00:00:00 2001 From: Adam Mikulasev Date: Mon, 11 Nov 2024 15:20:25 +1100 Subject: [PATCH] Ensure publisher throughput is based on messages published (#169) --- db/migrate/create_outboxer_messages.rb | 20 ++++++++++++++------ lib/outboxer/publisher.rb | 9 +++++---- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/db/migrate/create_outboxer_messages.rb b/db/migrate/create_outboxer_messages.rb index 69f4dc3c..e26d6cb9 100644 --- a/db/migrate/create_outboxer_messages.rb +++ b/db/migrate/create_outboxer_messages.rb @@ -8,16 +8,24 @@ def up t.timestamps - t.string :updated_by_publisher_name, limit: 263 - # 255 (hostname) + 1 (colon) + 7 (pid) - + t.string :updated_by_publisher_name, limit: 263 # 255 (hostname) + 1 (colon) + 7 (pid) t.bigint :updated_by_publisher_id end - add_index :outboxer_messages, :status - add_index :outboxer_messages, [:status, :updated_at] + # messages by status count + add_index :outboxer_messages, :status, name: 'idx_outboxer_status' + + # messages by status latency + add_index :outboxer_messages, [:status, :updated_at], + name: 'idx_outboxer_status_updated_at' + + # publisher latency + add_index :outboxer_messages, [:updated_by_publisher_id, :updated_at], + name: 'idx_outboxer_pub_id_updated_at' - add_index :outboxer_messages, :updated_by_publisher_id + # publisher throughput + add_index :outboxer_messages, [:status, :updated_by_publisher_id, :updated_at], + name: 'idx_outboxer_status_pub_id_updated_at' end def down diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index e491b9bf..472f06ac 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -258,6 +258,9 @@ def create_heartbeat_thread(id:, name:, raise NotFound.new(id: id), cause: error end + end_rtt = process.clock_gettime(process::CLOCK_MONOTONIC) + rtt = end_rtt - start_rtt + signal = publisher.signals.order(created_at: :asc).first if !signal.nil? @@ -265,10 +268,8 @@ def create_heartbeat_thread(id:, name:, signal.destroy end - end_rtt = process.clock_gettime(process::CLOCK_MONOTONIC) - rtt = end_rtt - start_rtt - - throughput = messages = Models::Message + throughput = Models::Message + .where(status: Models::Message::Status::PUBLISHED) .where(updated_by_publisher_id: id) .where('updated_at >= ?', 1.second.ago) .count