From 575f9435fd7b9b27805b7c55b2027ac12f7bf3bb Mon Sep 17 00:00:00 2001 From: Adam Mikulasev Date: Sun, 29 Dec 2024 20:12:31 +1100 Subject: [PATCH] Integrate better with sidekiq (#215) --- .github/workflows/master.yml | 8 +++ .../message/publish_job.rb | 12 +++- .../outboxer_integration/test/started_job.rb | 16 +++++ .../test/completed_event.rb | 6 ++ .../test/started_event.rb | 6 ++ app/models/test_event.rb | 2 - bin/console | 4 +- config/sidekiq.rb | 8 ++- db/migrate/create_events.rb | 13 ++-- generators/install_generator.rb | 22 +++++-- spec/bin/outboxer_publisher_spec.rb | 61 +++++++++++++++++ .../message/publish_job_spec.rb | 66 +++++++++++++++++++ 12 files changed, 207 insertions(+), 17 deletions(-) create mode 100644 app/jobs/outboxer_integration/test/started_job.rb create mode 100644 app/models/outboxer_integration/test/completed_event.rb create mode 100644 app/models/outboxer_integration/test/started_event.rb delete mode 100644 app/models/test_event.rb create mode 100644 spec/bin/outboxer_publisher_spec.rb create mode 100644 spec/jobs/outboxer_integration/message/publish_job_spec.rb diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index df7e1590..9af02b9d 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -41,6 +41,10 @@ jobs: ports: - 5432:5432 options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + redis: + image: redis + ports: + - 6379:6379 steps: - uses: actions/checkout@v3 - name: set up ruby @@ -82,6 +86,10 @@ jobs: --health-interval=10s --health-timeout=5s --health-retries=5 + redis: + image: redis + ports: + - 6379:6379 steps: - uses: actions/checkout@v3 - name: set up ruby diff --git a/app/jobs/outboxer_integration/message/publish_job.rb b/app/jobs/outboxer_integration/message/publish_job.rb index ee87dda8..3ed06505 100644 --- a/app/jobs/outboxer_integration/message/publish_job.rb +++ b/app/jobs/outboxer_integration/message/publish_job.rb @@ -3,8 +3,18 @@ module Message class PublishJob include Sidekiq::Job + MESSAGEABLE_TYPE_REGEX = /\A([A-Za-z]+)::([A-Za-z]+)::([A-Za-z]+)Event\z/ + def perform(args) - Sidekiq.logger.info "OutboxerIntegration::Message::PublishJob#perform: #{args.inspect}" + captures = args['messageable_type'].match(MESSAGEABLE_TYPE_REGEX)&.captures + return if captures.nil? + + begin + "#{captures[0]}::#{captures[1]}::#{captures[2]}Job".constantize + .perform_async('event_id' => args['messageable_id']) + rescue NameError + # no-op if constant not defined + end end end end diff --git a/app/jobs/outboxer_integration/test/started_job.rb b/app/jobs/outboxer_integration/test/started_job.rb new file mode 100644 index 00000000..cd4fd54a --- /dev/null +++ b/app/jobs/outboxer_integration/test/started_job.rb @@ -0,0 +1,16 @@ +module OutboxerIntegration + class Test + class StartedJob + include Sidekiq::Job + + def perform(args) + started_event = Test::StartedEvent.find(args['event_id']) + + Test::CompletedEvent.create!( + body: { + 'test' => { + 'id' => started_event.body['test']['id'] } }) + end + end + end +end diff --git a/app/models/outboxer_integration/test/completed_event.rb b/app/models/outboxer_integration/test/completed_event.rb new file mode 100644 index 00000000..f9cc2d3d --- /dev/null +++ b/app/models/outboxer_integration/test/completed_event.rb @@ -0,0 +1,6 @@ +module OutboxerIntegration + class Test + class CompletedEvent < ::Event + end + end +end diff --git a/app/models/outboxer_integration/test/started_event.rb b/app/models/outboxer_integration/test/started_event.rb new file mode 100644 index 00000000..fd8370f8 --- /dev/null +++ b/app/models/outboxer_integration/test/started_event.rb @@ -0,0 +1,6 @@ +module OutboxerIntegration + class Test + class StartedEvent < ::Event + end + end +end diff --git a/app/models/test_event.rb b/app/models/test_event.rb deleted file mode 100644 index b34f99ce..00000000 --- a/app/models/test_event.rb +++ /dev/null @@ -1,2 +0,0 @@ -class TestEvent < Event -end diff --git a/bin/console b/bin/console index 59f8fddd..6bbe2978 100755 --- a/bin/console +++ b/bin/console @@ -6,7 +6,9 @@ require 'outboxer' require 'sidekiq' require 'irb' -require_relative "../app/models/event" +Dir.glob(File.join(__dir__, '../app/models/**/*.rb')).each do |file| + require_relative file +end env = ENV['OUTBOXER_ENV'] || 'development' config = Outboxer::Database.config(env: env, pool: 1) diff --git a/config/sidekiq.rb b/config/sidekiq.rb index 82c352a6..e59e3a69 100644 --- a/config/sidekiq.rb +++ b/config/sidekiq.rb @@ -1,11 +1,13 @@ require 'bundler/setup' -require 'outboxer' require 'active_record' require 'sidekiq' -require_relative '../app/jobs/event_created_job' +require 'outboxer' + +Dir[File.expand_path('../app/models/**/*.rb', __dir__)].each { |file| require file } +Dir[File.expand_path('../app/jobs/**/*.rb', __dir__)].each { |file| require file } -environment = ENV['APP_ENV'] || 'development' +environment = ENV['RAILS_ENV'] || 'development' db_config_content = File.read('config/database.yml') db_config_erb_result = ERB.new(db_config_content).result diff --git a/db/migrate/create_events.rb b/db/migrate/create_events.rb index 7735eba4..2ab372cf 100644 --- a/db/migrate/create_events.rb +++ b/db/migrate/create_events.rb @@ -1,8 +1,13 @@ class CreateEvents < ActiveRecord::Migration[7.0] def up create_table :events do |t| - t.bigint :user_id - t.bigint :tenant_id + # t.bigint :user_id + # t.bigint :tenant_id + + # t.string :eventable_type, limit: 255 + # t.bigint :eventable_id + # t.index [:eventable_type, :eventable_id] + t.string :type, null: false, limit: 255 t.send(json_column_type, :body) t.datetime :created_at, null: false @@ -19,10 +24,6 @@ def json_column_type case ActiveRecord::Base.connection.adapter_name when /PostgreSQL/ :jsonb - when /MySQL/, /MariaDB/ - :json - when /SQLite/ - :text else :json end diff --git a/generators/install_generator.rb b/generators/install_generator.rb index 44996902..5ed3e26f 100644 --- a/generators/install_generator.rb +++ b/generators/install_generator.rb @@ -42,14 +42,18 @@ def copy_migrations "db/migrate/create_events.rb") end - def copy_model + def copy_models copy_file( "app/models/event.rb", "app/models/event.rb") copy_file( - "app/models/test_event.rb", - "app/models/test_event.rb") + "app/models/outboxer_integration/test/started_event.rb", + "app/models/outboxer_integration/test/started_event.rb") + + copy_file( + "app/models/outboxer_integration/test/completed_event.rb", + "app/models/outboxer_integration/test/completed_event.rb") end def copy_bin_file @@ -57,10 +61,20 @@ def copy_bin_file run "chmod +x bin/outboxer_publisher" end - def copy_job + def copy_jobs copy_file( "app/jobs/outboxer_integration/message/publish_job.rb", "app/jobs/outboxer_integration/message/publish_job.rb") + + copy_file( + "app/jobs/outboxer_integration/test/started_job.rb", + "app/jobs/outboxer_integration/test/started_job.rb") + end + + def copy_specs + copy_file( + "spec/bin/outboxer_publisher_spec.rb", + "spec/bin/outboxer_publisher_spec.rb") end end end diff --git a/spec/bin/outboxer_publisher_spec.rb b/spec/bin/outboxer_publisher_spec.rb new file mode 100644 index 00000000..28a0291e --- /dev/null +++ b/spec/bin/outboxer_publisher_spec.rb @@ -0,0 +1,61 @@ +require 'spec_helper' + +require 'sidekiq' +require 'sidekiq/testing' + +require File.join(Dir.pwd, 'app/models/event') +require File.join(Dir.pwd, 'app/models/outboxer_integration/test/started_event') +require File.join(Dir.pwd, 'app/models/outboxer_integration/test/completed_event') + +RSpec.describe 'bin/outboxer_publisher' do + let(:test_id) { rand(1_000) + 1 } + + it 'performs event job handler async' do + Sidekiq::Testing.disable! + + OutboxerIntegration::Test::StartedEvent.create!( + body: { + 'test' => { + 'id' => test_id } }) + + outboxer_publisher_env = { + "OUTBOXER_ENV" => "test", + "OUTBOXER_REDIS_URL" => "redis://localhost:6379/0" } + outboxer_publisher_cmd = File.join(Dir.pwd, 'bin', 'outboxer_publisher') + outboxer_publisher_pid = spawn(outboxer_publisher_env, outboxer_publisher_cmd) + + sidekiq_env = { + "RAILS_ENV" => "test", + "REDIS_URL" => "redis://localhost:6379/0" } + sidekiq_cmd = "bundle exec sidekiq -c 10 -q default -r ./config/sidekiq.rb" + sidekiq_pid = spawn(sidekiq_env, sidekiq_cmd) + + max_attempts = 10 + + completed_event = nil + + max_attempts.times do |attempt| + completed_event = OutboxerIntegration::Test::CompletedEvent.last + break if completed_event + + sleep 1 + + puts "OutboxerIntegration::Test::CompletedEvent not found. "\ + "Retrying (attempt #{attempt + 1}/#{max_attempts})..." + end + + expect(completed_event.body['test']['id']).to eql(test_id) + ensure + if sidekiq_pid + Process.kill("TERM", sidekiq_pid) + Process.wait(sidekiq_pid) + end + + if outboxer_publisher_pid + Process.kill('TERM', outboxer_publisher_pid) + Process.wait(outboxer_publisher_pid) + end + + Sidekiq::Testing.fake! + end +end diff --git a/spec/jobs/outboxer_integration/message/publish_job_spec.rb b/spec/jobs/outboxer_integration/message/publish_job_spec.rb new file mode 100644 index 00000000..030bb3d4 --- /dev/null +++ b/spec/jobs/outboxer_integration/message/publish_job_spec.rb @@ -0,0 +1,66 @@ +require 'spec_helper' +require 'sidekiq' + +require_relative '../../../../app/jobs/outboxer_integration/message/publish_job' +require_relative '../../../../app/jobs/outboxer_integration/test/started_job' + +require 'sidekiq/testing' + +module OutboxerIntegration + module Message + RSpec.describe PublishJob, type: :job do + describe '#perform' do + context 'when OutboxerIntegration::Test::StartedEvent' do + let(:args) do + { + 'messageable_type' => 'OutboxerIntegration::Test::StartedEvent', + 'messageable_id' => '123' + } + end + + it 'performs OutboxerIntegration::Test::StartedJob async' do + PublishJob.new.perform(args) + + expect(OutboxerIntegration::Test::StartedJob.jobs).to match([ + hash_including('args' => [include('event_id' => '123')]) + ]) + end + end + + context 'with invalid messageable_type' do + let(:args) do + { + 'messageable_type' => 'Wrong::Format::Test', + 'messageable_id' => '123' + } + end + + it 'does not raise an error' do + expect { + PublishJob.new.perform(args) + }.not_to raise_error + end + end + + context 'when job class does not exist' do + let(:args) do + { + 'messageable_type' => 'OutboxerIntegration::Invoice::NonexistentEvent', + 'messageable_id' => '123' + } + end + + it 'does not raise an error' do + allow_any_instance_of(String) + .to receive(:constantize) + .and_raise(NameError.new("uninitialized constant")) + + expect { + PublishJob.new.perform(args) + }.not_to raise_error + end + end + end + end + end +end