diff --git a/.rubocop_todo.yml b/.rubocop_todo.yml
index 2911aae..a3eed25 100644
--- a/.rubocop_todo.yml
+++ b/.rubocop_todo.yml
@@ -9,7 +9,7 @@
 # Offense count: 1
 # Configuration parameters: AllowedMethods, AllowedPatterns, CountRepeatedAttributes.
 Metrics/AbcSize:
-  Max: 18
+  Max: 19
 
 # Offense count: 5
 # Configuration parameters: CountComments, CountAsOne, AllowedMethods, AllowedPatterns.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 053e58c..51ae240 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ### Fixed
 
+## [6.18.0] - 2025-02-04
+
+### Added
+
+- Add failed item caching in case of unrecoverable database connection issues
+- Add `retry_latency` metric to measure the latency of item retries
+
+### Fixed
+
+- Fix item processing cutoff timeout to be less than the generic Redis lock timeout
+
 ## [6.17.0] - 2025-01-30
 
 ### Added
diff --git a/app/interactors/sbmt/outbox/process_item.rb b/app/interactors/sbmt/outbox/process_item.rb
index a3be795..a3915d1 100644
--- a/app/interactors/sbmt/outbox/process_item.rb
+++ b/app/interactors/sbmt/outbox/process_item.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 require "sbmt/outbox/metrics/utils"
+require "sbmt/outbox/v2/redis_item_meta"
 
 module Sbmt
   module Outbox
@@ -8,14 +9,16 @@ class ProcessItem < Sbmt::Outbox::DryInteractor
       param :item_class, reader: :private
       param :item_id, reader: :private
       option :worker_version, reader: :private, optional: true, default: -> { 1 }
+      option :cache_ttl_sec, reader: :private, optional: true, default: -> { 5 * 60 }
+      option :redis, reader: :private, optional: true, default: -> {}
 
       METRICS_COUNTERS = %i[error_counter retry_counter sent_counter fetch_error_counter discarded_counter].freeze
 
-      delegate :log_success, :log_info, :log_failure, to: "Sbmt::Outbox.logger"
+      delegate :log_success, :log_info, :log_failure, :log_debug, to: "Sbmt::Outbox.logger"
       delegate :item_process_middlewares, to: "Sbmt::Outbox"
       delegate :box_type, :box_name, :owner, to: :item_class
 
-      attr_accessor :process_latency
+      attr_accessor :process_latency, :retry_latency
 
       def call
         log_success(
@@ -26,9 +29,23 @@ def call
         item = nil
 
         item_class.transaction do
-          item = yield fetch_item
+          item = yield fetch_item_and_lock_for_update
+
+          cached_item = fetch_redis_item_meta(redis_item_key(item_id))
+          if cached_retries_exceeded?(cached_item)
+            msg = "max retries exceeded: marking item as failed based on cached data: #{cached_item}"
+            item.set_errors_count(cached_item.errors_count)
+            track_failed(msg, item)
+            next Failure(msg)
+          end
+
+          if cached_greater_errors_count?(item, cached_item)
+            log_failure("inconsistent item: cached_errors_count:#{cached_item.errors_count} > db_errors_count:#{item.errors_count}: setting errors_count based on cached data:#{cached_item}")
+            item.set_errors_count(cached_item.errors_count)
+          end
 
           if item.processed_at?
+            self.retry_latency = Time.current - item.created_at
             item.config.retry_strategies.each do |retry_strategy|
               yield check_retry_strategy(item, retry_strategy)
             end
@@ -62,7 +79,48 @@ def call
 
       private
 
-      def fetch_item
+      def cached_retries_exceeded?(cached_item)
+        return false unless cached_item
+
+        item_class.max_retries_exceeded?(cached_item.errors_count)
+      end
+
+      def cached_greater_errors_count?(db_item, cached_item)
+        return false unless cached_item
+
+        cached_item.errors_count > db_item.errors_count
+      end
+
+      def fetch_redis_item_meta(redis_key)
+        return if worker_version < 2
+
+        data = redis.call("GET", redis_key)
+        return if data.blank?
+
+        Sbmt::Outbox::V2::RedisItemMeta.deserialize!(data)
+      rescue => ex
+        log_debug("error while fetching redis meta: #{ex.message}")
+        nil
+      end
+
+      def set_redis_item_meta(item, ex)
+        return if worker_version < 2
+        return if item.nil?
+
+        redis_key = redis_item_key(item.id)
+        error_msg = format_exception_error(ex, extract_cause: false)
+        data = Sbmt::Outbox::V2::RedisItemMeta.new(errors_count: item.errors_count, error_msg: error_msg)
+        redis.call("SET", redis_key, data.to_s, "EX", cache_ttl_sec)
+      rescue => ex
+        log_debug("error while setting redis meta: #{ex.message}")
+        nil
+      end
+
+      def redis_item_key(item_id)
+        "#{box_type}:#{item_class.box_name}:#{item_id}"
+      end
+
+      def fetch_item_and_lock_for_update
         item = item_class
           .lock("FOR UPDATE")
           .find_by(id: item_id)
@@ -171,6 +229,7 @@ def track_failed(ex_or_msg, item = nil)
           item.pending!
         end
       rescue => e
+        set_redis_item_meta(item, e)
        log_error_handling_error(e, item)
       end
 
@@ -259,6 +318,7 @@ def report_metrics(item)
         end
 
         track_process_latency(labels) if process_latency
+        track_retry_latency(labels) if retry_latency
 
         return unless counters[:sent_counter].positive?
 
@@ -279,6 +339,10 @@ def counters
       def track_process_latency(labels)
         Yabeda.outbox.process_latency.measure(labels, process_latency.round(3))
       end
+
+      def track_retry_latency(labels)
+        Yabeda.outbox.retry_latency.measure(labels, retry_latency.round(3))
+      end
     end
   end
 end
diff --git a/app/models/sbmt/outbox/base_item.rb b/app/models/sbmt/outbox/base_item.rb
index 392da3e..ade232e 100644
--- a/app/models/sbmt/outbox/base_item.rb
+++ b/app/models/sbmt/outbox/base_item.rb
@@ -49,6 +49,17 @@ def bucket_partitions
             end
           end
         end
+
+        def max_retries_exceeded?(count)
+          return false if config.strict_order
+          return true unless retriable?
+
+          count > config.max_retries
+        end
+
+        def retriable?
+          config.max_retries > 0
+        end
       end
 
       enum :status, {
@@ -135,20 +146,21 @@ def touch_processed_at
       end
 
       def retriable?
-        config.max_retries > 0
+        self.class.retriable?
       end
 
       def max_retries_exceeded?
-        return false if config.strict_order
-        return true unless retriable?
-
-        errors_count > config.max_retries
+        self.class.max_retries_exceeded?(errors_count)
       end
 
       def increment_errors_counter
         increment(:errors_count)
       end
 
+      def set_errors_count(count)
+        self.errors_count = count
+      end
+
       def add_error(ex_or_msg)
         increment_errors_counter
 
diff --git a/config/initializers/yabeda.rb b/config/initializers/yabeda.rb
index 2539a2c..80d96a6 100644
--- a/config/initializers/yabeda.rb
+++ b/config/initializers/yabeda.rb
@@ -50,6 +50,12 @@
       buckets: [0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 20, 30].freeze,
       comment: "A histogram for outbox/inbox deletion latency"
 
+    histogram :retry_latency,
+      tags: %i[type name partition owner],
+      unit: :seconds,
+      buckets: [1, 10, 20, 50, 120, 300, 900, 1800, 3600].freeze,
+      comment: "A histogram for outbox retry latency"
+
     counter :deleted_counter,
       tags: %i[box_type box_name],
       comment: "A counter for the number of deleted outbox/inbox items"
diff --git a/lib/sbmt/outbox/engine.rb b/lib/sbmt/outbox/engine.rb
index fb4b35f..df3f8a5 100644
--- a/lib/sbmt/outbox/engine.rb
+++ b/lib/sbmt/outbox/engine.rb
@@ -25,8 +25,8 @@ class Engine < Rails::Engine
           c.cdn_url = "https://cdn.jsdelivr.net/npm/sbmt-outbox-ui@0.0.8/dist/assets/index.js"
         end
         c.process_items = ActiveSupport::OrderedOptions.new.tap do |c|
-          c.general_timeout = 120
-          c.cutoff_timeout = 60
+          c.general_timeout = 180
+          c.cutoff_timeout = 90
           c.batch_size = 200
         end
         c.worker = ActiveSupport::OrderedOptions.new.tap do |c|
@@ -54,8 +54,8 @@ class Engine < Rails::Engine
         end
         c.processor = ActiveSupport::OrderedOptions.new.tap do |pc|
           pc.threads_count = 4
-          pc.general_timeout = 120
-          pc.cutoff_timeout = 60
+          pc.general_timeout = 180
+          pc.cutoff_timeout = 90
           pc.brpop_delay = 1
         end
 
diff --git a/lib/sbmt/outbox/v2/processor.rb b/lib/sbmt/outbox/v2/processor.rb
index d485f61..407fd80 100644
--- a/lib/sbmt/outbox/v2/processor.rb
+++ b/lib/sbmt/outbox/v2/processor.rb
@@ -10,7 +10,7 @@ module Outbox
     module V2
       class Processor < BoxProcessor
         delegate :processor_config, :batch_process_middlewares, :logger, to: "Sbmt::Outbox"
-        attr_reader :lock_timeout, :brpop_delay
+        attr_reader :lock_timeout, :cache_ttl, :cutoff_timeout, :brpop_delay
 
         REDIS_BRPOP_MIN_DELAY = 0.1
 
@@ -18,11 +18,16 @@ def initialize(
           boxes,
           threads_count: nil,
           lock_timeout: nil,
+          cache_ttl: nil,
+          cutoff_timeout: nil,
           brpop_delay: nil,
           redis: nil
         )
           @lock_timeout = lock_timeout || processor_config.general_timeout
+          @cache_ttl = cache_ttl || @lock_timeout * 10
+          @cutoff_timeout = cutoff_timeout || processor_config.cutoff_timeout
           @brpop_delay = brpop_delay || redis_brpop_delay(boxes.count, processor_config.brpop_delay)
+          @redis = redis
 
           super(boxes: boxes, threads_count: threads_count || processor_config.threads_count, name: "processor", redis: redis)
         end
@@ -66,14 +71,19 @@ def lock_task(scheduled_task)
         end
 
         def process(task)
-          lock_timer = Cutoff.new(lock_timeout)
+          lock_timer = Cutoff.new(cutoff_timeout)
           last_id = 0
           strict_order = task.item_class.config.strict_order
 
           box_worker.item_execution_runtime.measure(task.yabeda_labels) do
             Outbox.database_switcher.use_master do
               task.ids.each do |id|
-                result = ProcessItem.call(task.item_class, id, worker_version: task.yabeda_labels[:worker_version])
+                result = ProcessItem.call(
+                  task.item_class, id,
+                  worker_version: task.yabeda_labels[:worker_version],
+                  cache_ttl_sec: cache_ttl,
+                  redis: @redis
+                )
 
                 box_worker.job_items_counter.increment(task.yabeda_labels)
                 last_id = id
diff --git a/lib/sbmt/outbox/v2/redis_item_meta.rb b/lib/sbmt/outbox/v2/redis_item_meta.rb
new file mode 100644
index 0000000..ca51e21
--- /dev/null
+++ b/lib/sbmt/outbox/v2/redis_item_meta.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+module Sbmt
+  module Outbox
+    module V2
+      class RedisItemMeta
+        attr_reader :version, :timestamp, :errors_count, :error_msg
+
+        CURRENT_VERSION = 1
+        MAX_ERROR_LEN = 200
+
+        def initialize(errors_count:, error_msg:, timestamp: Time.current.to_i, version: CURRENT_VERSION)
+          @errors_count = errors_count
+          @error_msg = error_msg
+          @timestamp = timestamp
+          @version = version
+        end
+
+        def to_s
+          serialize
+        end
+
+        def serialize
+          JSON.generate({
+            version: version,
+            timestamp: timestamp,
+            errors_count: errors_count,
+            error_msg: error_msg.slice(0, MAX_ERROR_LEN)
+          })
+        end
+
+        def self.deserialize!(value)
+          raise "invalid data type: string is required" unless value.is_a?(String)
+
+          data = JSON.parse!(value, max_nesting: 1)
+          new(
+            version: data["version"],
+            timestamp: data["timestamp"].to_i,
+            errors_count: data["errors_count"].to_i,
+            error_msg: data["error_msg"]
+          )
+        end
+      end
+    end
+  end
+end
diff --git a/lib/sbmt/outbox/version.rb b/lib/sbmt/outbox/version.rb
index b5cc71d..b089199 100644
--- a/lib/sbmt/outbox/version.rb
+++ b/lib/sbmt/outbox/version.rb
@@ -2,6 +2,6 @@
 
 module Sbmt
   module Outbox
-    VERSION = "6.17.0"
+    VERSION = "6.18.0"
   end
 end
diff --git a/spec/interactors/sbmt/outbox/process_item_spec.rb b/spec/interactors/sbmt/outbox/process_item_spec.rb
index c964df8..13386e6 100644
--- a/spec/interactors/sbmt/outbox/process_item_spec.rb
+++ b/spec/interactors/sbmt/outbox/process_item_spec.rb
@@ -4,8 +4,10 @@
 
 describe Sbmt::Outbox::ProcessItem do
   describe "#call" do
-    subject(:result) { described_class.call(OutboxItem, outbox_item.id) }
+    subject(:result) { described_class.call(OutboxItem, outbox_item.id, worker_version: worker_version, redis: redis) }
 
+    let(:redis) { nil }
+    let(:worker_version) { 1 }
     let(:max_retries) { 0 }
     let(:producer) { instance_double(Producer, call: true) }
     let(:dummy_middleware_class) { instance_double(Class, new: dummy_middleware) }
@@ -35,6 +37,97 @@
       end
     end
 
+    context "when outbox item is being processed concurrently" do
+      let(:outbox_item) { create(:outbox_item) }
+      let(:error_msg) { "Mysql2::Error::TimeoutError: Lock wait timeout exceeded; try restarting transaction" }
+
+      before do
+        allow(OutboxItem).to receive(:lock).and_raise(
+          ActiveRecord::LockWaitTimeout.new(error_msg)
+        )
+      end
+
+      it "logs failure" do
+        expect(Sbmt::Outbox.error_tracker).to receive(:error)
+        allow(Sbmt::Outbox.logger).to receive(:log_failure)
+        expect(result.failure).to eq(error_msg)
+        expect(Sbmt::Outbox.logger)
+          .to have_received(:log_failure)
+          .with(/#{error_msg}/, stacktrace: kind_of(String))
+      end
+
+      it "does not call middleware" do
+        result
+        expect(dummy_middleware).not_to have_received(:call)
+      end
+
+      it "tracks Yabeda error counter" do
+        expect { result }.to increment_yabeda_counter(Yabeda.outbox.fetch_error_counter).by(1)
+      end
+    end
+
+    context "when there is cached item data" do
+      let(:redis) { RedisClient.new(url: ENV["REDIS_URL"]) }
+      let(:cached_errors_count) { 99 }
+      let(:db_errors_count) { 1 }
+      let(:max_retries) { 7 }
+
+      before do
+        data = Sbmt::Outbox::V2::RedisItemMeta.new(errors_count: cached_errors_count, error_msg: "Some error")
+        redis.call("SET", "outbox:outbox_item:#{outbox_item.id}", data.to_s)
+
+        allow_any_instance_of(Sbmt::Outbox::OutboxItemConfig).to receive(:max_retries).and_return(max_retries)
+      end
+
+      context "when worker_version is 1" do
+        let(:outbox_item) { create(:outbox_item) }
+        let(:worker_version) { 1 }
+
+        it "does not use cached data" do
+          expect { result }.not_to change { outbox_item.reload.errors_count }
+        end
+      end
+
+      context "when worker_version is 2" do
+        let(:outbox_item) { create(:outbox_item, errors_count: db_errors_count) }
+        let(:worker_version) { 2 }
+
+        before do
+          allow(Sbmt::Outbox.logger).to receive(:log_failure)
+        end
+
+        context "when cached errors_count exceeds max retries" do
+          it "increments cached errors count and marks item as failed" do
+            expect(Sbmt::Outbox.logger).to receive(:log_failure).with(/max retries exceeded: marking item as failed based on cached data/, any_args)
+            expect { result }
+              .to change { outbox_item.reload.errors_count }.from(1).to(100)
+              .and change { outbox_item.reload.status }.from("pending").to("failed")
+          end
+        end
+
+        context "when cached errors_count is greater" do
+          let(:cached_errors_count) { 2 }
+
+          it "sets errors_count based on cached data" do
+            expect(Sbmt::Outbox.logger).to receive(:log_failure).with(/inconsistent item: cached_errors_count:2 > db_errors_count:1: setting errors_count based on cached data/, any_args)
+            expect { result }
+              .to change { outbox_item.reload.errors_count }.from(1).to(2)
+              .and change { outbox_item.reload.status }.from("pending").to("delivered")
+          end
+        end
+
+        context "when cached errors_count is less" do
+          let(:cached_errors_count) { 0 }
+
+          it "sets errors_count based on db data" do
+            expect { result }
+              .to not_change { outbox_item.reload.errors_count }
+              .and change { outbox_item.reload.status }.from("pending").to("delivered")
+          end
+        end
+      end
+    end
+
     context "when outbox item is not in pending state" do
       let(:outbox_item) do
         create(
@@ -172,7 +265,8 @@
         let!(:outbox_item) { create(:outbox_item, processed_at: Time.current) }
 
         it "doesn't track process_latency" do
-          expect { result }.not_to measure_yabeda_histogram(Yabeda.outbox.process_latency)
+          expect { result }.to measure_yabeda_histogram(Yabeda.outbox.retry_latency)
+            .and not_measure_yabeda_histogram(Yabeda.outbox.process_latency)
         end
       end
     end
@@ -214,6 +308,8 @@
     end
 
     context "when error persisting fails" do
+      let(:redis) { RedisClient.new(url: ENV["REDIS_URL"]) }
+
       before do
        allow_any_instance_of(OutboxItem).to receive(:failed!).and_raise("boom")
       end
@@ -239,6 +335,84 @@
      it "tracks Yabeda error counter" do
        expect { result }.to increment_yabeda_counter(Yabeda.outbox.error_counter).by(1)
      end
+
+      context "when worker_version is 1" do
+        let(:worker_version) { 1 }
+
+        it "skips item state caching" do
+          result
+          data = redis.call("GET", "outbox:outbox_item:#{outbox_item.id}")
+          expect(data).to be_nil
+        end
+      end
+
+      context "when worker_version is 2" do
+        let(:worker_version) { 2 }
+
+        context "when there is no cached item state" do
+          it "caches item state in redis" do
+            result
+            data = redis.call("GET", "outbox:outbox_item:#{outbox_item.id}")
+            deserialized = JSON.parse(data)
+            expect(deserialized["timestamp"]).to be_an_integer
+            expect(deserialized).to include(
+              "error_msg" => "RuntimeError boom",
+              "errors_count" => 1,
+              "version" => 1
+            )
+          end
+
+          it "sets ttl for item state data" do
+            result
+            res = redis.call("EXPIRETIME", "outbox:outbox_item:#{outbox_item.id}")
+            expect(res).to be > 0
+          end
+        end
+
+        context "when there is cached item state with greater errors_count" do
+          before do
+            data = Sbmt::Outbox::V2::RedisItemMeta.new(errors_count: 2, error_msg: "Some previous error")
+            redis.call("SET", "outbox:outbox_item:#{outbox_item.id}", data.to_s)
+          end
+
+          it "caches item state in redis based on cached errors_count" do
+            result
+            data = redis.call("GET", "outbox:outbox_item:#{outbox_item.id}")
+            deserialized = JSON.parse(data)
+            expect(deserialized["timestamp"]).to be_an_integer
+            expect(deserialized).to include(
+              "error_msg" => "RuntimeError boom",
+              "errors_count" => 3,
+              "version" => 1
+            )
+          end
+
+          it "sets ttl for item state data" do
+            result
+            res = redis.call("EXPIRETIME", "outbox:outbox_item:#{outbox_item.id}")
+            expect(res).to be > 0
+          end
+        end
+
+        context "when there is cached item state with lower or equal errors_count" do
+          before do
+            data = Sbmt::Outbox::V2::RedisItemMeta.new(errors_count: 0, error_msg: "Some previous error")
+            redis.call("SET", "outbox:outbox_item:#{outbox_item.id}", data.to_s)
+          end
+
+          it "caches item state in redis based on db errors_count" do
+            result
+            data = redis.call("GET", "outbox:outbox_item:#{outbox_item.id}")
+            deserialized = JSON.parse(data)
+            expect(deserialized["timestamp"]).to be_an_integer
+            expect(deserialized).to include(
+              "error_msg" => "RuntimeError boom",
+              "errors_count" => 1,
+              "version" => 1
+            )
+          end
+        end
+      end
    end
  end
end
diff --git a/spec/lib/sbmt/outbox/v2/processor_spec.rb b/spec/lib/sbmt/outbox/v2/processor_spec.rb
index d7dee4b..b7f8b27 100644
--- a/spec/lib/sbmt/outbox/v2/processor_spec.rb
+++ b/spec/lib/sbmt/outbox/v2/processor_spec.rb
@@ -9,12 +9,16 @@
   let(:processor) do
     described_class.new(
       boxes,
-      lock_timeout: 1,
-      threads_count: 1,
+      lock_timeout: lock_timeout,
+      cutoff_timeout: cutoff_timeout,
+      threads_count: threads_count,
       redis: redis
     )
   end
 
+  let(:lock_timeout) { 1 }
+  let(:cutoff_timeout) { nil }
+  let(:threads_count) { 1 }
   let(:redis) { instance_double(RedisClient) }
 
   context "when initialized" do
@@ -49,6 +53,8 @@
     let!(:item2) { create(:inbox_item, bucket: 0) }
 
     before do
+      allow(redis).to receive(:call).with("GET", "inbox:inbox_item:#{item1.id}").and_return(nil)
+      allow(redis).to receive(:call).with("GET", "inbox:inbox_item:#{item2.id}").and_return(nil)
       allow(redis).to receive(:blocking_call).with(1.1, "BRPOP", "inbox_item:job_queue", 0.1).and_return(%W[inbox_item:job_queue 0:#{Time.current.to_i}:#{item1.id},#{item2.id}])
     end
 
@@ -78,6 +84,29 @@
       expect(processor.start).to be(Sbmt::Outbox::V2::ThreadPool::SKIPPED)
     end
 
+    context "when cutoff times out after first processed item in batch" do
+      let(:lock_timeout) { 10 }
+      let(:cutoff_timeout) { 1 }
+      let(:cutoff_exception) { Cutoff::CutoffExceededError.new(Cutoff.new(cutoff_timeout)) }
+
+      before do
+        allow_any_instance_of(Cutoff).to receive(:checkpoint!).and_raise(cutoff_exception)
+        allow(Sbmt::Outbox.logger).to receive(:log_info)
+      end
+
+      it "processes only the first item" do
+        expect(processor.send(:lock_manager)).to receive(:lock)
+          .with("sbmt:outbox:processor:inbox_item:0:lock", 10000)
+          .and_yield(task)
+        expect(Sbmt::Outbox.logger).to receive(:log_info).with(/Lock timeout while processing/)
+
+        expect { processor.start }
+          .to change(InboxItem.delivered, :count).from(0).to(1)
+          .and increment_yabeda_counter(Yabeda.box_worker.job_items_counter).with_tags(name: "inbox_item", type: :inbox, worker_name: "processor", worker_version: 2).by(1)
+          .and increment_yabeda_counter(Yabeda.box_worker.job_timeout_counter).with_tags(name: "inbox_item", type: :inbox, worker_name: "processor", worker_version: 2).by(1)
+      end
+    end
+
     context "when use option strict_order" do
       context "when strict_order is true" do
         before do