Skip to content

Commit

Permalink
Ensure publisher throughput is based on messages published (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
bedrock-adam authored Nov 11, 2024
1 parent c20244c commit d12d58b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
20 changes: 14 additions & 6 deletions db/migrate/create_outboxer_messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,24 @@ def up

t.timestamps

t.string :updated_by_publisher_name, limit: 263
# 255 (hostname) + 1 (colon) + 7 (pid)

t.string :updated_by_publisher_name, limit: 263 # 255 (hostname) + 1 (colon) + 7 (pid)
t.bigint :updated_by_publisher_id
end

add_index :outboxer_messages, :status
add_index :outboxer_messages, [:status, :updated_at]
# messages by status count
add_index :outboxer_messages, :status, name: 'idx_outboxer_status'

# messages by status latency
add_index :outboxer_messages, [:status, :updated_at],
name: 'idx_outboxer_status_updated_at'

# publisher latency
add_index :outboxer_messages, [:updated_by_publisher_id, :updated_at],
name: 'idx_outboxer_pub_id_updated_at'

add_index :outboxer_messages, :updated_by_publisher_id
# publisher throughput
add_index :outboxer_messages, [:status, :updated_by_publisher_id, :updated_at],
name: 'idx_outboxer_status_pub_id_updated_at'
end

def down
Expand Down
9 changes: 5 additions & 4 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -258,17 +258,18 @@ def create_heartbeat_thread(id:, name:,
raise NotFound.new(id: id), cause: error
end

end_rtt = process.clock_gettime(process::CLOCK_MONOTONIC)
rtt = end_rtt - start_rtt

signal = publisher.signals.order(created_at: :asc).first

if !signal.nil?
handle_signal(id: id, name: signal.name, logger: logger, process: process)
signal.destroy
end

end_rtt = process.clock_gettime(process::CLOCK_MONOTONIC)
rtt = end_rtt - start_rtt

throughput = messages = Models::Message
throughput = Models::Message
.where(status: Models::Message::Status::PUBLISHED)
.where(updated_by_publisher_id: id)
.where('updated_at >= ?', 1.second.ago)
.count
Expand Down

0 comments on commit d12d58b

Please sign in to comment.