diff --git a/CHANGELOG.md b/CHANGELOG.md index 88a18d2..3f80d4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [6.15.0] - 2025-01-23 + +### Added + +- Add rake tasks: + - rake outbox:delete_items + - rake outbox:update_status_items + + ## [6.14.0] - 2025-01-20 ### Added diff --git a/README.md b/README.md index 3ff53c0..0917ab9 100644 --- a/README.md +++ b/README.md @@ -439,6 +439,42 @@ outbox_items: partition_strategy: hash ``` +## Rake tasks + +```shell +rake outbox:delete_items +rake outbox:update_status_items +``` + +Example run: +```shell +rake outbox:delete_items[OutboxItem,1] # Mandatory parameters box class and status +rake outbox:update_status_items[OutboxItem,0,3] # Mandatory parameters box class, current status and new status + +``` + +Both tasks have optional parameters: +```ruby +- start_time # boxes are younger than the specified time, by default nil, time is specified in the format "2025-01-05T23:59:59" +- end_time # boxes are older than the specified time, by default 6.hours.ago, time is specified in the format "2025-01-05T23:59:59" +- batch_size # batch size, by default 1_000 +- sleep_time # sleep time between batches, by default 0.5 +``` + +Example with optional parameters: + - format optional parameters: + ```shell + rake outbox:delete_items[klass_name,status,start_time,end_time,batch_size,sleep_time] + + rake outbox:update_status_items[klass_name,status,new_status,start_time,end_time,batch_size,sleep_time] + ``` + - example: + ```shell + rake outbox:delete_items[OutboxItem,1,"2025-01-05T23:59:59","2025-01-05T00:00:00",10_000,5] + + rake outbox:update_status_items[OutboxItem,0,3,"2025-01-05T23:59:59","2025-01-05T00:00:00",10_000,5] + ``` + ## Concurrency The worker process consists of a poller and a processor, each of which has its own thread pool. diff --git a/lib/sbmt/outbox/engine.rb b/lib/sbmt/outbox/engine.rb index 4a9fe2d..13e25f1 100644 --- a/lib/sbmt/outbox/engine.rb +++ b/lib/sbmt/outbox/engine.rb @@ -79,6 +79,8 @@ class Engine < Rails::Engine rake_tasks do load "sbmt/outbox/tasks/retry_failed_items.rake" load "sbmt/outbox/tasks/delete_failed_items.rake" + load "sbmt/outbox/tasks/delete_items.rake" + load "sbmt/outbox/tasks/update_status_items.rake" end end end diff --git a/lib/sbmt/outbox/tasks/delete_items.rake b/lib/sbmt/outbox/tasks/delete_items.rake new file mode 100644 index 0000000..bb9ca37 --- /dev/null +++ b/lib/sbmt/outbox/tasks/delete_items.rake @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +namespace :outbox do + desc "Delete outbox/inbox items" + task :delete_items, [:klass_name, :status, :start_time, :end_time, :batch_size, :sleep_time] => :environment do |_, args| + args.with_defaults(start_time: nil, end_time: 6.hours.ago, batch_size: 1000, sleep_time: 0.5) + + klass_name = args[:klass_name] + status = args[:status] + start_time = args[:start_time] + end_time = args[:end_time] + batch_size = args[:batch_size] + sleep_time = args[:sleep_time] + + unless klass_name && status + raise "Error: Class and status must be specified. Example: rake outbox:delete_items[OutboxItem,1]" + end + + klass_name = klass_name.constantize + query = klass_name.where(status: status) + + if start_time && end_time + query = query.where(created_at: start_time..end_time) + elsif start_time + query = query.where(created_at: start_time..) + elsif end_time + query = query.where(created_at: ..end_time) + end + + total_deleted = 0 + query.in_batches(of: batch_size) do |batch| + deleted_count = batch.delete_all + + Rails.logger.info("Batch items deleted: #{deleted_count}") + + total_deleted += deleted_count + + sleep sleep_time + end + + Rails.logger.info("Total items deleted: #{total_deleted}") + end +end diff --git a/lib/sbmt/outbox/tasks/update_status_items.rake b/lib/sbmt/outbox/tasks/update_status_items.rake new file mode 100644 index 0000000..e6a805a --- /dev/null +++ b/lib/sbmt/outbox/tasks/update_status_items.rake @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +namespace :outbox do + desc "Update status of outbox/inbox items" + task :update_status_items, [:klass_name, :status, :new_status, :start_time, :end_time, :batch_size, :sleep_time] => :environment do |_, args| + args.with_defaults(start_time: nil, end_time: 6.hours.ago, batch_size: 1000, sleep_time: 0.5) + + klass_name = args[:klass_name] + status = args[:status] + new_status = args[:new_status] + start_time = args[:start_time] + end_time = args[:end_time] + batch_size = args[:batch_size] + sleep_time = args[:sleep_time] + + unless klass_name && status && new_status + raise "Error: Class, current status, and new status must be specified. Example: rake outbox:update_status_items[OutboxItem,0,3]" + end + + klass_name = klass_name.constantize + query = klass_name.where(status: status) + + if start_time && end_time + query = query.where(created_at: start_time..end_time) + elsif start_time + query = query.where(created_at: start_time..) + elsif end_time + query = query.where(created_at: ..end_time) + end + + total_updated = 0 + query.in_batches(of: batch_size) do |batch| + updated_count = batch.update_all(status: new_status) + + Rails.logger.info("Batch items updated: #{updated_count}") + + total_updated += updated_count + sleep sleep_time + end + + Rails.logger.info("Total items updated: #{total_updated}") + end +end diff --git a/lib/sbmt/outbox/version.rb b/lib/sbmt/outbox/version.rb index 660618b..77acc6b 100644 --- a/lib/sbmt/outbox/version.rb +++ b/lib/sbmt/outbox/version.rb @@ -2,6 +2,6 @@ module Sbmt module Outbox - VERSION = "6.14.0" + VERSION = "6.15.0" end end diff --git a/spec/lib/sbmt/outbox/tasks/delete_items_spec.rb b/spec/lib/sbmt/outbox/tasks/delete_items_spec.rb new file mode 100644 index 0000000..cb96bd4 --- /dev/null +++ b/spec/lib/sbmt/outbox/tasks/delete_items_spec.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +describe "rake outbox:delete_items" do + subject(:task) { Rake::Task["outbox:delete_items"] } + + let(:klass) { "OutboxItem" } + let(:status) { 1 } + + let(:created_at_a) { 6.hours.ago } + let(:created_at_b) { 8.hours.ago } + let(:created_at_c) { 4.hours.ago } + + let!(:outbox_item_a) { create(:outbox_item, status: :failed, errors_count: 1, created_at: created_at_a) } + let!(:outbox_item_b) { create(:outbox_item, status: :failed, errors_count: 1, created_at: created_at_b) } + let!(:outbox_item_c) { create(:outbox_item, status: :delivered, errors_count: 0, created_at: created_at_c) } + + before do + task.reenable + allow(Rails.logger).to receive(:info) + end + + context "when filtering records by status" do + let(:created_at_a) { Time.zone.now } + let(:created_at_b) { 6.hours.ago } + let(:created_at_c) { Time.zone.now } + + it "deletes records matching the given status" do + expect { + task.invoke(klass, status) + }.to change(OutboxItem, :count).by(-1) + + expect(Rails.logger).to have_received(:info).with(/Batch items deleted: 1/) + expect(Rails.logger).to have_received(:info).with(/Total items deleted: 1/) + end + end + + context "when filtering records by time range" do + let(:start_time) { 7.hours.ago } + let(:end_time) { 5.hours.ago } + + it "deletes records within the specified time range" do + expect { + task.invoke(klass, status, start_time, end_time) + }.to change(OutboxItem, :count).by(-1) + + expect(Rails.logger).to have_received(:info).with(/Batch items deleted: 1/) + expect(Rails.logger).to have_received(:info).with(/Total items deleted: 1/) + end + end + + context "when required parameters are missing" do + it "raises an error" do + expect { + task.invoke(nil, status) + }.to raise_error("Error: Class and status must be specified. Example: rake outbox:delete_items[OutboxItem,1]") + + expect(Rails.logger).not_to have_received(:info) + end + end +end diff --git a/spec/lib/sbmt/outbox/tasks/update_status_items_spec.rb b/spec/lib/sbmt/outbox/tasks/update_status_items_spec.rb new file mode 100644 index 0000000..8780548 --- /dev/null +++ b/spec/lib/sbmt/outbox/tasks/update_status_items_spec.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true + +describe "rake outbox:update_status_items" do + subject(:task) { Rake::Task["outbox:update_status_items"] } + + let(:klass) { "OutboxItem" } + let(:status) { 1 } + let(:new_status) { 3 } + + let(:created_at_a) { 6.hours.ago } + let(:created_at_b) { 8.hours.ago } + let(:created_at_c) { 4.hours.ago } + + let!(:outbox_item_a) { create(:outbox_item, status: :failed, errors_count: 1, created_at: created_at_a) } + let!(:outbox_item_b) { create(:outbox_item, status: :failed, errors_count: 1, created_at: created_at_b) } + let!(:outbox_item_c) { create(:outbox_item, status: :delivered, errors_count: 0, created_at: created_at_c) } + + before do + task.reenable + allow(Rails.logger).to receive(:info) + end + + context "when filtering records by status" do + let(:created_at_a) { Time.zone.now } + let(:created_at_b) { 6.hours.ago } + let(:created_at_c) { Time.zone.now } + + it "updates records matching the given status" do + expect { + task.invoke(klass, status, new_status) + outbox_item_a.reload + outbox_item_b.reload + outbox_item_c.reload + }.to change(outbox_item_b, :status).from("failed").to("discarded") + .and not_change { outbox_item_a.status } + .and not_change { outbox_item_c.status } + + expect(Rails.logger).to have_received(:info).with(/Batch items updated: 1/) + expect(Rails.logger).to have_received(:info).with(/Total items updated: 1/) + end + end + + context "when filtering records by time range" do + let(:start_time) { 7.hours.ago } + let(:end_time) { 5.hours.ago } + + it "updates records within the specified time range" do + expect { + task.invoke(klass, status, new_status, start_time, end_time) + outbox_item_a.reload + outbox_item_b.reload + outbox_item_c.reload + }.to change(outbox_item_a, :status).from("failed").to("discarded") + .and not_change { outbox_item_b.status } + .and not_change { outbox_item_c.status } + + expect(Rails.logger).to have_received(:info).with(/Batch items updated: 1/) + expect(Rails.logger).to have_received(:info).with(/Total items updated: 1/) + end + end + + context "when required parameters are missing" do + it "raises an error" do + expect { + task.invoke(nil, status, new_status) + }.to raise_error("Error: Class, current status, and new status must be specified. Example: rake outbox:update_status_items[OutboxItem,0,3]") + + expect(Rails.logger).not_to have_received(:info) + end + end +end