diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 9a72bab..ff6ba70 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -27,6 +27,8 @@ tests:
     DATABASE_URL: postgres://postgres:secret@postgres:5432
     REDIS_URL: redis://redis:6379/0
   before_script:
+    - gem sources --remove https://rubygems.org/
+    - gem sources --add https://nexus.sbmt.io/repository/rubygems/
     - gem update --system 3.4.22
     - bin/setup
   script:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f633952..8db8b07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,27 +13,33 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ### Fixed
 
+## [6.10.3] - 2024-10-22
+
+### Fixed
+
+- fix deleting stale items from MySQL and PostgreSQL
+
 ## [6.10.2] - 2024-09-30
 
 ### Fixed
- 
+
 - change `DEFAULT_PARTITION_STRATEGY` to string
 
 ## [6.10.1] - 2024-09-23
 
 ### Fixed
- 
+
 - log OTEL `trace_id`
 
 ## [6.10.0] - 2024-09-19
 
 ### Changed
 
-- Renamed `backtrace` log tag to `stacktrace` 
+- Renamed `backtrace` log tag to `stacktrace`
 
 ### Fixed
 
-- Fixed handling of errors if database is not available 
+- Fixed handling of errors if database is not available
 
 ## [6.9.0] - 2024-09-13
 
diff --git a/Gemfile b/Gemfile
index be173b2..f1c034f 100644
--- a/Gemfile
+++ b/Gemfile
@@ -1,5 +1,5 @@
 # frozen_string_literal: true
 
-source "https://rubygems.org"
+source ENV.fetch("NEXUS_PUBLIC_SOURCE_URL", "https://rubygems.org")
 
 gemspec
diff --git a/app/jobs/sbmt/outbox/base_delete_stale_items_job.rb b/app/jobs/sbmt/outbox/base_delete_stale_items_job.rb
index f7d345a..85bc518 100644
--- a/app/jobs/sbmt/outbox/base_delete_stale_items_job.rb
+++ b/app/jobs/sbmt/outbox/base_delete_stale_items_job.rb
@@ -7,8 +7,8 @@ module Outbox
     class BaseDeleteStaleItemsJob < Outbox.active_job_base_class
       MIN_RETENTION_PERIOD = 1.day
       LOCK_TTL = 10_800_000
-      BATCH_SIZE = 1000
-      SLEEP_TIME = 1
+      BATCH_SIZE = 1_000
+      SLEEP_TIME = 0.5
 
       class << self
         def enqueue
@@ -25,7 +25,7 @@ def item_classes
       delegate :config, :logger, to: "Sbmt::Outbox"
       delegate :box_type, :box_name, to: :item_class
 
-      attr_accessor :item_class
+      attr_accessor :item_class, :lock_timer
 
       def perform(item_class_name)
         self.item_class = item_class_name.constantize
@@ -36,6 +36,7 @@ def perform(item_class_name)
           Redis.new(config.redis)
         end
 
+        self.lock_timer = Cutoff.new(LOCK_TTL / 1000)
         lock_manager = Redlock::Client.new([client], retry_count: 0)
 
         lock_manager.lock("#{self.class.name}:#{item_class_name}:lock", LOCK_TTL) do |locked|
@@ -51,6 +52,8 @@ def perform(item_class_name)
             logger.log_info("Failed to acquire lock #{self.class.name}:#{item_class_name}")
           end
         end
+      rescue Cutoff::CutoffExceededError
+        logger.log_info("Lock timeout while processing #{item_class_name}")
       end
 
       private
@@ -64,17 +67,99 @@ def validate_retention!(duration)
       def delete_stale_items(waterline)
         logger.log_info("Start deleting #{box_type} items for #{box_name} older than #{waterline}")
 
+        case database_type
+        when :postgresql
+          postgres_delete_in_batches(waterline)
+        when :mysql
+          mysql_delete_in_batches(waterline)
+        else
+          raise "Unsupported database type"
+        end
+
+        logger.log_info("Successfully deleted #{box_type} items for #{box_name} older than #{waterline}")
+      end
+
+      # Deletes stale items from PostgreSQL database in batches
+      #
+      # This method efficiently deletes items older than the given waterline
+      # using a subquery approach to avoid locking large portions of the table.
+      #
+      #
+      # Example SQL generated for deletion:
+      #   DELETE FROM "items"
+      #   WHERE "items"."id" IN (
+      #     SELECT "items"."id"
+      #     FROM "items"
+      #     WHERE "items"."created_at" < '2023-05-01 00:00:00'
+      #     LIMIT 1000
+      #   )
+      def postgres_delete_in_batches(waterline)
+        table = item_class.arel_table
+        condition = table[:created_at].lt(waterline)
+        subquery = table
+          .project(table[:id])
+          .where(condition)
+          .take(BATCH_SIZE)
+
+        delete_statement = Arel::Nodes::DeleteStatement.new
+        delete_statement.relation = table
+        delete_statement.wheres = [table[:id].in(subquery)]
+
         loop do
-          ids = Outbox.database_switcher.use_slave do
-            item_class.where("created_at < ?", waterline).limit(BATCH_SIZE).ids
-          end
-          break if ids.empty?
+          deleted_count = item_class
+            .connection
+            .execute(delete_statement.to_sql)
+            .cmd_tuples
+
+          logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name}")
+          break if deleted_count == 0
+          # Raises Cutoff::CutoffExceededError once the LOCK_TTL budget is spent; #perform rescues it.
+          lock_timer.checkpoint!
+          sleep(SLEEP_TIME)
+        end
+      end
 
-          item_class.where(id: ids).delete_all
-          sleep SLEEP_TIME
+      # Deletes stale items from MySQL database in batches
+      #
+      # This method efficiently deletes items older than the given waterline
+      # using MySQL's built-in LIMIT clause for DELETE statements.
+      #
+      # The main difference from the PostgreSQL method is that MySQL allows
+      # direct use of LIMIT in DELETE statements, simplifying the query.
+      # This approach doesn't require a subquery, making it more straightforward.
+      #
+      # Example SQL generated for deletion:
+      #   DELETE FROM `items`
+      #   WHERE `items`.`created_at` < '2023-05-01 00:00:00'
+      #   LIMIT 1000
+      def mysql_delete_in_batches(waterline)
+        loop do
+          deleted_count = item_class
+            .where("created_at < ?", waterline)
+            .limit(BATCH_SIZE)
+            .delete_all
+
+          logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name}")
+          break if deleted_count == 0
+          # Raises Cutoff::CutoffExceededError once the LOCK_TTL budget is spent; #perform rescues it.
+          lock_timer.checkpoint!
+          sleep(SLEEP_TIME)
         end
+      end
 
-        logger.log_info("Successfully deleted #{box_type} items for #{box_name} older than #{waterline}")
+      # Maps the connection's adapter name to the deletion strategy to use:
+      # PostGIS connections take the PostgreSQL path and Trilogy the MySQL path.
+      # Returns :unknown for any other adapter.
+      def database_type
+        adapter_name = item_class.connection.adapter_name.downcase
+        case adapter_name
+        when "postgresql", "postgis"
+          :postgresql
+        when "mysql2", "trilogy"
+          :mysql
+        else
+          :unknown
+        end
       end
     end
   end
diff --git a/lib/sbmt/outbox/version.rb b/lib/sbmt/outbox/version.rb
index 21782b8..92a6d40 100644
--- a/lib/sbmt/outbox/version.rb
+++ b/lib/sbmt/outbox/version.rb
@@ -2,6 +2,6 @@
 
 module Sbmt
   module Outbox
-    VERSION = "6.10.2"
+    VERSION = "6.10.3"
   end
 end
diff --git a/sbmt-outbox.gemspec b/sbmt-outbox.gemspec
index 5a33a62..9c18199 100644
--- a/sbmt-outbox.gemspec
+++ b/sbmt-outbox.gemspec
@@ -53,10 +53,9 @@ Gem::Specification.new do |s|
   s.add_development_dependency "rspec-rails"
   s.add_development_dependency "rspec_junit_formatter"
   s.add_development_dependency "rubocop"
-  s.add_development_dependency "rubocop-rails"
-  s.add_development_dependency "rubocop-rspec"
-  s.add_development_dependency "rubocop-performance"
-  s.add_development_dependency "standard", ">= 1.7"
+  s.add_development_dependency "rubocop-rails", ">= 2.5"
+  s.add_development_dependency "rubocop-rspec", ">= 2.11"
+  s.add_development_dependency "standard", ">= 1.12"
   s.add_development_dependency "schked", ">= 0.3", "< 2"
   s.add_development_dependency "zeitwerk"
   s.add_development_dependency "sentry-rails", "> 5.2.0"
diff --git a/spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb b/spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb
index e9b4b86..a796e64 100644
--- a/spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb
+++ b/spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb
@@ -12,8 +12,13 @@ def item_classes
   end
 
   let!(:item) { create(:outbox_item, created_at: created_at) }
+  let!(:item_2) { create(:outbox_item, created_at: created_at) }
   let(:created_at) { 1.month.ago }
 
+  before do
+    stub_const("Sbmt::Outbox::BaseDeleteStaleItemsJob::BATCH_SIZE", 1)
+  end
+
   describe ".enqueue" do
     it "enqueue all item classes" do
       expect { job_class.enqueue }.to have_enqueued_job(job_class).with("OutboxItem")
@@ -22,7 +27,7 @@ def item_classes
 
   it "deletes item" do
     expect { job_class.perform_now("OutboxItem") }
-      .to change(OutboxItem, :count).by(-1)
+      .to change(OutboxItem, :count).by(-2)
   end
 
   context "when item is too young" do