Skip to content

Commit

Permalink
Add publisher settings (#162)
Browse files Browse the repository at this point in the history
  • Loading branch information
bedrock-adam authored Nov 10, 2024
1 parent 7e1f8f7 commit fa5af5a
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 15 deletions.
6 changes: 2 additions & 4 deletions db/migrate/create_outboxer_publishers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ class CreateOutboxerPublishers < ActiveRecord::Migration[6.1]
def up
ActiveRecord::Base.transaction do
create_table :outboxer_publishers do |t|
t.string :name, limit: 263, null: false
# 255 (hostname) + 1 (colon) + 7 (pid)

t.string :name, limit: 263, null: false # 255 (hostname) + 1 (colon) + 7 (pid)
t.string :status, limit: 255, null: false

t.json :settings, null: false
t.json :metrics, null: false

t.timestamps
Expand Down
27 changes: 20 additions & 7 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def find_by_id(id:)
id: publisher.id,
name: publisher.name,
status: publisher.status,
settings: publisher.settings,
metrics: publisher.metrics,
created_at: publisher.created_at.utc,
updated_at: publisher.updated_at.utc,
Expand All @@ -36,25 +37,35 @@ def find_by_id(id:)
raise NotFound.new(id: id), cause: error
end

def create(name:, current_time: Time.now)
def create(name:, buffer:, concurrency:,
tick:, poll:, heartbeat:, current_time: Time.now)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
publisher = Models::Publisher.create!(
name: name, status: Status::PUBLISHING, metrics: {
name: name,
status: Status::PUBLISHING,
settings: {
'buffer' => buffer,
'concurrency' => concurrency,
'tick' => tick,
'poll' => poll,
'heartbeat' => heartbeat },
metrics: {
'throughput' => 0,
'latency' => 0,
'cpu' => 0,
'rss ' => 0,
'rtt' => 0
},
created_at: current_time, updated_at: current_time)
'rtt' => 0 },
created_at: current_time,
updated_at: current_time)

@status = Status::PUBLISHING

{
id: publisher.id,
name: publisher.name,
status: publisher.status,
settings: publisher.settings,
metrics: publisher.metrics,
created_at: publisher.created_at,
updated_at: publisher.updated_at
Expand Down Expand Up @@ -355,7 +366,7 @@ def publish(
env: 'development',
db_config_path: ::File.expand_path('config/database.yml', ::Dir.pwd),
buffer: 1000, concurrency: 2,
poll: 5.0, tick: 0.1, heartbeat: 5.0,
tick: 0.1, poll: 5.0, heartbeat: 5.0,
logger: Logger.new($stdout, level: Logger::INFO),
database: Database,
time: ::Time, socket: ::Socket, process: ::Process, kernel: ::Kernel,
Expand All @@ -376,7 +387,9 @@ def publish(

queue = Queue.new

publisher = create(name: name)
publisher = create(
name: name, buffer: buffer, concurrency: concurrency,
tick: tick, poll: poll, heartbeat: heartbeat)
id = publisher[:id]

publisher_threads = create_publisher_threads(
Expand Down
4 changes: 4 additions & 0 deletions lib/outboxer/web/views/home.erb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
href="<%= outboxer_path("/publisher/#{publisher[:id]}#{normalise_query_string(time_zone: denormalised_query_params[:time_zone])}") %>">
<%= publisher.name %>
</a>
<div class="text-muted small mt-1">
buffer: <%= publisher[:settings]['buffer'] %>, concurrency: <%= publisher[:settings]['concurrency'] %><br>
tick: <%= publisher[:settings]['tick'] %>, poll: <%= publisher[:settings]['poll'] %>, heartbeat: <%= publisher[:settings]['heartbeat'] %>
</div>
</td>
<td><%= time_ago_in_words(publisher.created_at) %> ago</td>
<td><%= time_ago_in_words(publisher.updated_at) %> ago</td>
Expand Down
26 changes: 26 additions & 0 deletions lib/outboxer/web/views/publisher.erb
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,32 @@
</tbody>
</table>

<h5>Settings</h5>
<table class="table table-sm">
<tbody>
<tr>
<th scope="row">Buffer</th>
<td><%= publisher[:settings]['buffer'] %></td>
</tr>
<tr>
<th scope="row">Concurrency</th>
<td><%= publisher[:settings]['concurrency'] %></td>
</tr>
<tr>
<th scope="row">Tick</th>
<td><%= publisher[:settings]['tick'] %> seconds</td>
</tr>
<tr>
<th scope="row">Poll</th>
<td><%= publisher[:settings]['poll'] %> seconds</td>
</tr>
<tr>
<th scope="row">Heartbeat</th>
<td><%= publisher[:settings]['heartbeat'] %> seconds</td>
</tr>
</tbody>
</table>

<h5>Metrics</h5>
<table class="table table-sm">
<tbody>
Expand Down
9 changes: 9 additions & 0 deletions spec/factories/outboxer_publishers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
factory :outboxer_publisher, class: 'Outboxer::Models::Publisher' do
name { 'server-01:57000' }
status { Outboxer::Models::Publisher::Status::PUBLISHING }
settings {
{
'buffer' => 1000,
'concurrency' => 3,
'tick' => 0.1,
'poll' => 5.0,
'heartbeat' => 5
}
}
metrics {
{
'throughput' => 1000,
Expand Down
21 changes: 17 additions & 4 deletions spec/lib/outboxer/publisher/find_by_id_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ module Outboxer
create(:outboxer_publisher,
name: 'server-09:67000',
status: Models::Publisher::Status::PUBLISHING,
settings: {
'buffer' => 1000,
'concurrency' => 2,
'tick' => 0.1,
'poll' => 5.0,
'heartbeat' => 5.0 },
metrics: {
'throughput' => 1000,
'throughput' => 700,
'latency' => 0,
'cpu' => '10.5%',
'rss' => '40.95 MB',
'rtt' => '3.35 ms'
},
'rtt' => '3.35 ms' },
created_at: DateTime.parse("2024-11-10T02:00:00"),
updated_at: DateTime.parse("2024-11-10T02:00:10")
)
Expand All @@ -38,8 +43,16 @@ module Outboxer
expect(result[:id]).to eq(publisher.id)
expect(result[:name]).to eq(publisher.name)
expect(result[:status]).to eq('publishing')
expect(result[:settings]).to eq({
'buffer' => 1000,
'concurrency' => 2,
'tick' => 0.1,
'poll' => 5.0,
'heartbeat' => 5
})
expect(result[:created_at]).to eq(publisher.created_at.utc)
expect(result[:metrics]).to eq({
'throughput' => 1000,
'throughput' => 700,
'latency' => 0,
'cpu' => '10.5%',
'rss' => '40.95 MB',
Expand Down

0 comments on commit fa5af5a

Please sign in to comment.