Skip to content

Commit

Permalink
add Publisher stats (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
bedrock-adam authored Nov 3, 2024
1 parent 206cfe1 commit 12f6eab
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 106 deletions.
28 changes: 16 additions & 12 deletions bin/outboxer_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ options = {
concurrency: ENV.fetch('OUTBOXER_CONCURRENCY', 1).to_i,
poll_interval: ENV.fetch('OUTBOXER_POLL_INTERVAL', 10).to_f,
tick_interval: ENV.fetch('OUTBOXER_TICK_INTERVAL', 0.1).to_f,
heartbeat_interval: ENV.fetch('OUTBOXER_HEARTBEAT_INTERVAL', 5).to_f,
log_level: ENV.fetch('OUTBOXER_LOG_LEVEL', 'info').downcase,
sidekiq_redis_url: ENV.fetch('SIDEKIQ_REDIS_URL', 'redis://localhost:6379/0')
}
Expand All @@ -28,19 +29,22 @@ logger.level = Logger.const_get(options[:log_level].upcase)

Outboxer::Database.connect(config: db_config, logger: logger)

Outboxer::Publisher.publish(
batch_size: options[:batch_size],
concurrency: options[:concurrency],
poll_interval: options[:poll_interval],
tick_interval: options[:tick_interval],
logger: logger
) do |message|
case message[:messageable_type]
when 'Event'
EventCreatedJob.perform_async({ 'id' => message[:messageable_id] })
begin
Outboxer::Publisher.publish(
batch_size: options[:batch_size],
concurrency: options[:concurrency],
poll_interval: options[:poll_interval],
tick_interval: options[:tick_interval],
heartbeat_interval: options[:heartbeat_interval],
logger: logger
) do |message|
case message[:messageable_type]
when 'Event'
EventCreatedJob.perform_async({ 'id' => message[:messageable_id] })
end
end
ensure
Outboxer::Database.disconnect(logger: logger)
end

Outboxer::Database.disconnect(logger: logger)

# bin/outboxer_publisher
16 changes: 16 additions & 0 deletions config/brakeman.ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"ignored_warnings": [
{
"fingerprint": "1c5ef0e65c33f7623a6ee8ad25776d3ff04ad14854fd09191147088740c4afb1",
"warning_type": "Command Injection",
"file": "lib/outboxer/publisher.rb",
"line": 187
},
{
"fingerprint": "df8ee41d7c5028a1429c07753f227d3a6d50128f04364d957988e9a60863b375",
"warning_type": "Command Injection",
"file": "lib/outboxer/publisher.rb",
"line": 188
}
]
}
6 changes: 2 additions & 4 deletions db/migrate/create_outboxer_publishers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@ class CreateOutboxerPublishers < ActiveRecord::Migration[6.1]
def up
ActiveRecord::Base.transaction do
create_table :outboxer_publishers do |t|
t.string :key, limit: 279, null: false
# 255 (hostname) + 1 (colon) + 10 (PID) + 1 (colon) + 12 (unique ID) = 279 characters
t.string :name, limit: 263, null: false
# 255 (hostname) + 1 (colon) + 7 (pid)

t.string :status, limit: 255, null: false

t.json :info, null: false

t.timestamps
end

add_index :outboxer_publishers, :key, unique: true
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def published(id:, current_utc_time: Time.now.utc,
end

def failed(id:, exception:, current_utc_time: Time.now.utc,
hostname: Socket.gethostname, process_id: Process.pid)
hostname: Socket.gethostname, process_id: Process.pid)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.order(created_at: :asc).lock.find_by!(id: id)
Expand Down
4 changes: 2 additions & 2 deletions lib/outboxer/models/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module Models
class Publisher < ::ActiveRecord::Base
self.table_name = :outboxer_publishers

validates :key, presence: true, length: { maximum: 279 }
# 255 (hostname) + 1 (colon) + 10 (PID) + 1 (colon) + 12 (unique ID) = 279 characters
validates :name, presence: true, length: { maximum: 263 }
# 255 (hostname) + 1 (colon) + 7 (pid)

module Status
PUBLISHING = 'publishing'
Expand Down
Loading

0 comments on commit 12f6eab

Please sign in to comment.