From fa5af5abab73aceb031fc3e3b16ea9e5f3c02b3f Mon Sep 17 00:00:00 2001 From: Adam Mikulasev Date: Sun, 10 Nov 2024 18:02:30 +1100 Subject: [PATCH] Add publisher settings (#162) --- db/migrate/create_outboxer_publishers.rb | 6 ++--- lib/outboxer/publisher.rb | 27 ++++++++++++++----- lib/outboxer/web/views/home.erb | 4 +++ lib/outboxer/web/views/publisher.erb | 26 ++++++++++++++++++ spec/factories/outboxer_publishers.rb | 9 +++++++ .../lib/outboxer/publisher/find_by_id_spec.rb | 21 ++++++++++++--- 6 files changed, 78 insertions(+), 15 deletions(-) diff --git a/db/migrate/create_outboxer_publishers.rb b/db/migrate/create_outboxer_publishers.rb index 792a58e..f5c7a4e 100644 --- a/db/migrate/create_outboxer_publishers.rb +++ b/db/migrate/create_outboxer_publishers.rb @@ -2,11 +2,9 @@ class CreateOutboxerPublishers < ActiveRecord::Migration[6.1] def up ActiveRecord::Base.transaction do create_table :outboxer_publishers do |t| - t.string :name, limit: 263, null: false - # 255 (hostname) + 1 (colon) + 7 (pid) - + t.string :name, limit: 263, null: false # 255 (hostname) + 1 (colon) + 7 (pid) t.string :status, limit: 255, null: false - + t.json :settings, null: false t.json :metrics, null: false t.timestamps diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 37aca18..e491b9b 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -19,6 +19,7 @@ def find_by_id(id:) id: publisher.id, name: publisher.name, status: publisher.status, + settings: publisher.settings, metrics: publisher.metrics, created_at: publisher.created_at.utc, updated_at: publisher.updated_at.utc, @@ -36,18 +37,27 @@ def find_by_id(id:) raise NotFound.new(id: id), cause: error end - def create(name:, current_time: Time.now) + def create(name:, buffer:, concurrency:, + tick:, poll:, heartbeat:, current_time: Time.now) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do publisher = Models::Publisher.create!( - name: name, status: Status::PUBLISHING, metrics: { + name: name, + status: Status::PUBLISHING, + settings: { + 'buffer' => buffer, + 'concurrency' => concurrency, + 'tick' => tick, + 'poll' => poll, + 'heartbeat' => heartbeat }, + metrics: { 'throughput' => 0, 'latency' => 0, 'cpu' => 0, 'rss ' => 0, - 'rtt' => 0 - }, - created_at: current_time, updated_at: current_time) + 'rtt' => 0 }, + created_at: current_time, + updated_at: current_time) @status = Status::PUBLISHING @@ -55,6 +65,7 @@ def create(name:, current_time: Time.now) id: publisher.id, name: publisher.name, status: publisher.status, + settings: publisher.settings, metrics: publisher.metrics, created_at: publisher.created_at, updated_at: publisher.updated_at @@ -355,7 +366,7 @@ def publish( env: 'development', db_config_path: ::File.expand_path('config/database.yml', ::Dir.pwd), buffer: 1000, concurrency: 2, - poll: 5.0, tick: 0.1, heartbeat: 5.0, + tick: 0.1, poll: 5.0, heartbeat: 5.0, logger: Logger.new($stdout, level: Logger::INFO), database: Database, time: ::Time, socket: ::Socket, process: ::Process, kernel: ::Kernel, @@ -376,7 +387,9 @@ def publish( queue = Queue.new - publisher = create(name: name) + publisher = create( + name: name, buffer: buffer, concurrency: concurrency, + tick: tick, poll: poll, heartbeat: heartbeat) id = publisher[:id] publisher_threads = create_publisher_threads( diff --git a/lib/outboxer/web/views/home.erb b/lib/outboxer/web/views/home.erb index 25afa73..9dd1444 100644 --- a/lib/outboxer/web/views/home.erb +++ b/lib/outboxer/web/views/home.erb @@ -34,6 +34,10 @@ href="<%= outboxer_path("/publisher/#{publisher[:id]}#{normalise_query_string(time_zone: denormalised_query_params[:time_zone])}") %>"> <%= publisher.name %> +
+ buffer: <%= publisher[:settings]['buffer'] %>, concurrency: <%= publisher[:settings]['concurrency'] %>
+ tick: <%= publisher[:settings]['tick'] %>, poll: <%= publisher[:settings]['poll'] %>, heartbeat: <%= publisher[:settings]['heartbeat'] %> +
<%= time_ago_in_words(publisher.created_at) %> ago <%= time_ago_in_words(publisher.updated_at) %> ago diff --git a/lib/outboxer/web/views/publisher.erb b/lib/outboxer/web/views/publisher.erb index 69baec8..858155c 100644 --- a/lib/outboxer/web/views/publisher.erb +++ b/lib/outboxer/web/views/publisher.erb @@ -71,6 +71,32 @@ +
Settings
+ + + + + + + + + + + + + + + + + + + + + + + +
Buffer<%= publisher[:settings]['buffer'] %>
Concurrency<%= publisher[:settings]['concurrency'] %>
Tick<%= publisher[:settings]['tick'] %> seconds
Poll<%= publisher[:settings]['poll'] %> seconds
Heartbeat<%= publisher[:settings]['heartbeat'] %> seconds
+
Metrics
diff --git a/spec/factories/outboxer_publishers.rb b/spec/factories/outboxer_publishers.rb index 8da5a25..bc9196d 100644 --- a/spec/factories/outboxer_publishers.rb +++ b/spec/factories/outboxer_publishers.rb @@ -2,6 +2,15 @@ factory :outboxer_publisher, class: 'Outboxer::Models::Publisher' do name { 'server-01:57000' } status { Outboxer::Models::Publisher::Status::PUBLISHING } + settings { + { + 'buffer' => 1000, + 'concurrency' => 3, + 'tick' => 0.1, + 'poll' => 5.0, + 'heartbeat' => 5 + } + } metrics { { 'throughput' => 1000, diff --git a/spec/lib/outboxer/publisher/find_by_id_spec.rb b/spec/lib/outboxer/publisher/find_by_id_spec.rb index 4f67743..24c804b 100644 --- a/spec/lib/outboxer/publisher/find_by_id_spec.rb +++ b/spec/lib/outboxer/publisher/find_by_id_spec.rb @@ -8,13 +8,18 @@ module Outboxer create(:outboxer_publisher, name: 'server-09:67000', status: Models::Publisher::Status::PUBLISHING, + settings: { + 'buffer' => 1000, + 'concurrency' => 2, + 'tick' => 0.1, + 'poll' => 5.0, + 'heartbeat' => 5.0 }, metrics: { - 'throughput' => 1000, + 'throughput' => 700, 'latency' => 0, 'cpu' => '10.5%', 'rss' => '40.95 MB', - 'rtt' => '3.35 ms' - }, + 'rtt' => '3.35 ms' }, created_at: DateTime.parse("2024-11-10T02:00:00"), updated_at: DateTime.parse("2024-11-10T02:00:10") ) @@ -38,8 +43,16 @@ module Outboxer expect(result[:id]).to eq(publisher.id) expect(result[:name]).to eq(publisher.name) expect(result[:status]).to eq('publishing') + expect(result[:settings]).to eq({ + 'buffer' => 1000, + 'concurrency' => 2, + 'tick' => 0.1, + 'poll' => 5.0, + 'heartbeat' => 5 + }) + expect(result[:created_at]).to eq(publisher.created_at.utc) expect(result[:metrics]).to eq({ - 'throughput' => 1000, + 'throughput' => 700, 'latency' => 0, 'cpu' => '10.5%', 'rss' => '40.95 MB',