Skip to content

Commit dc4b4d1

Browse files
committed
[DEX-2722] feat: reduce retention time successful outbox/inbox items
1 parent e5c1d01 commit dc4b4d1

File tree

7 files changed

+74
-30
lines changed

7 files changed

+74
-30
lines changed

CHANGELOG.md

+11
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1313

1414
### Fixed
1515

16+
## [6.12.0] - 2025-01-10
17+
18+
### Added
19+
20+
- Random seconds have been added to the item removal tasks to prevent tasks from running simultaneously
21+
- Add option `retention_delivered_items` to remove successful items. Default equals option `retention`
22+
23+
### Changed
24+
25+
- `retention` removes items with statuses: `failed` and `discarded`
26+
1627
## [6.11.0] - 2024-12-23
1728

1829
### Added

README.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,8 @@ default: &default
267267
outbox_items: # outbox items section
268268
my_outbox_item: # underscored model class name
269269
owner: my_outbox_item_team # optional, used in Yabeda metrics
270-
retention: P1W # retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations
270+
retention: P1W # for statuses: failed and discarded, retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations
271+
retention_delivered_items: PT6H # for statuses: delivered, retention period for delivered items, https://en.wikipedia.org/wiki/ISO_8601#Durations
271272
max_retries: 3 # default 0, the number of retries before the item will be marked as failed
272273
strict_order: false # optional, default
273274
transports: # transports section
@@ -342,7 +343,8 @@ end
342343
inbox_items: # inbox items section
343344
my_inbox_item: # underscored model class name
344345
owner: my_inbox_item_team # optional, used in Yabeda metrics
345-
retention: P1W # retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations
346+
retention: P1W # for statuses: failed and discarded, retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations
347+
retention_delivered_items: PT6H # for statuses: delivered, retention period for delivered items, https://en.wikipedia.org/wiki/ISO_8601#Durations
346348
max_retries: 3 # default 0, the number of retries before the item will be marked as failed
347349
transports: # transports section
348350
import_order: # underscored transport class name

app/jobs/sbmt/outbox/base_delete_stale_items_job.rb

+43-20
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ class BaseDeleteStaleItemsJob < Outbox.active_job_base_class
1313
class << self
1414
def enqueue
1515
item_classes.each do |item_class|
16-
perform_later(item_class.to_s)
16+
delay = rand(15).seconds
17+
set(wait: delay).perform_later(item_class.to_s)
1718
end
1819
end
1920

@@ -41,12 +42,13 @@ def perform(item_class_name)
4142

4243
lock_manager.lock("#{self.class.name}:#{item_class_name}:lock", LOCK_TTL) do |locked|
4344
if locked
44-
duration = item_class.config.retention
45+
duration_failed = item_class.config.retention
46+
duration_delivered = item_class.config.retention_delivered_items
4547

46-
validate_retention!(duration)
48+
validate_retention!(duration_failed)
4749

4850
logger.with_tags(box_type: box_type, box_name: box_name) do
49-
delete_stale_items(Time.current - duration)
51+
delete_stale_items(Time.current - duration_failed, Time.current - duration_delivered)
5052
end
5153
else
5254
logger.log_info("Failed to acquire lock #{self.class.name}:#{item_class_name}")
@@ -58,25 +60,25 @@ def perform(item_class_name)
5860

5961
private
6062

61-
def validate_retention!(duration)
62-
return if duration >= MIN_RETENTION_PERIOD
63+
def validate_retention!(duration_failed)
64+
return if duration_failed >= MIN_RETENTION_PERIOD
6365

6466
raise "Retention period for #{box_name} must be longer than #{MIN_RETENTION_PERIOD.inspect}"
6567
end
6668

67-
def delete_stale_items(waterline)
68-
logger.log_info("Start deleting #{box_type} items for #{box_name} older than #{waterline}")
69+
def delete_stale_items(waterline_failed, waterline_delivered)
70+
logger.log_info("Start deleting #{box_type} items for #{box_name} older than: failed and discarded items #{waterline_failed} and delivered items #{waterline_delivered}")
6971

7072
case database_type
7173
when :postgresql
72-
postgres_delete_in_batches(waterline)
74+
postgres_delete_in_batches(waterline_failed, waterline_delivered)
7375
when :mysql
74-
mysql_delete_in_batches(waterline)
76+
mysql_delete_in_batches(waterline_failed, waterline_delivered)
7577
else
7678
raise "Unsupported database type"
7779
end
7880

79-
logger.log_info("Successfully deleted #{box_type} items for #{box_name} older than #{waterline}")
81+
logger.log_info("Successfully deleted #{box_type} items for #{box_name} older than: failed and discarded items #{waterline_failed} and delivered items #{waterline_delivered}")
8082
end
8183

8284
# Deletes stale items from PostgreSQL database in batches
@@ -90,12 +92,22 @@ def delete_stale_items(waterline)
9092
# WHERE "items"."id" IN (
9193
# SELECT "items"."id"
9294
# FROM "items"
93-
# WHERE "items"."created_at" < '2023-05-01 00:00:00'
95+
# WHERE (
96+
# "items"."status" = 1 AND "items"."created_at" < '2023-05-01 00:00:00'
97+
# )
9498
# LIMIT 1000
9599
# )
96-
def postgres_delete_in_batches(waterline)
100+
def postgres_delete_in_batches(waterline_failed, waterline_delivered)
97101
table = item_class.arel_table
98-
condition = table[:created_at].lt(waterline)
102+
103+
status_delivered = item_class.statuses[:delivered]
104+
status_failed_discarded = item_class.statuses.values_at(:failed, :discarded)
105+
106+
delete_items_in_batches(table, table[:status].eq(status_delivered).and(table[:created_at].lt(waterline_delivered)))
107+
delete_items_in_batches(table, table[:status].in(status_failed_discarded).and(table[:created_at].lt(waterline_failed)))
108+
end
109+
110+
def delete_items_in_batches(table, condition)
99111
subquery = table
100112
.project(table[:id])
101113
.where(condition)
@@ -129,14 +141,25 @@ def postgres_delete_in_batches(waterline)
129141
#
130142
# Example SQL generated for deletion:
131143
# DELETE FROM `items`
132-
# WHERE `items`.`created_at` < '2023-05-01 00:00:00'
144+
# WHERE (
145+
# `items`.`status` = 1 AND `items`.`created_at` < '2023-05-01 00:00:00'
146+
# )
133147
# LIMIT 1000
134-
def mysql_delete_in_batches(waterline)
148+
def mysql_delete_in_batches(waterline_failed, waterline_delivered)
149+
status_delivered = item_class.statuses[:delivered]
150+
status_failed_discarded = [item_class.statuses.values_at(:failed, :discarded)]
151+
152+
delete_items_in_batches_mysql(
153+
item_class.where(status: status_delivered, created_at: ...waterline_delivered)
154+
)
155+
delete_items_in_batches_mysql(
156+
item_class.where(status: status_failed_discarded).where(created_at: ...waterline_failed)
157+
)
158+
end
159+
160+
def delete_items_in_batches_mysql(query)
135161
loop do
136-
deleted_count = item_class
137-
.where(created_at: ...waterline)
138-
.limit(BATCH_SIZE)
139-
.delete_all
162+
deleted_count = query.limit(BATCH_SIZE).delete_all
140163

141164
logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name} items")
142165
break if deleted_count == 0

app/models/sbmt/outbox/base_item_config.rb

+7
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ def retention
3737
@retention ||= ActiveSupport::Duration.parse(options[:retention] || "P1W")
3838
end
3939

40+
def retention_delivered_items
41+
@retention_delivered_items ||= begin
42+
value = options[:retention_delivered_items] || retention
43+
value.is_a?(String) ? ActiveSupport::Duration.parse(value) : value
44+
end
45+
end
46+
4047
def max_retries
4148
@max_retries ||= (options[:max_retries] || 0).to_i
4249
end

lib/sbmt/outbox/version.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
module Sbmt
44
module Outbox
5-
VERSION = "6.11.0"
5+
VERSION = "6.12.0"
66
end
77
end

spec/internal/config/outbox.yml

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ test:
88
partition_size: 2
99
partition_strategy: number
1010
retention: P1W
11+
retention_delivered_items: PT6H
1112
retry_strategies:
1213
- exponential_backoff
1314
- compacted_log

spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb

+7-7
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ def item_classes
1111
end
1212
end
1313

14-
let!(:item) { create(:outbox_item, created_at: created_at) }
15-
let!(:item_2) { create(:outbox_item, created_at: created_at) }
14+
let!(:item_delivered) { create(:outbox_item, created_at: created_at, status: 2) }
15+
let!(:item_failed) { create(:outbox_item, created_at: created_at, status: 1) }
1616
let(:created_at) { 1.month.ago }
1717

1818
before do
@@ -25,17 +25,17 @@ def item_classes
2525
end
2626
end
2727

28-
it "deletes item" do
28+
it "deletes items with status 2 and old items with status 1" do
2929
expect { job_class.perform_now("OutboxItem") }
3030
.to change(OutboxItem, :count).by(-2)
3131
end
3232

33-
context "when item is too young" do
34-
let(:created_at) { 1.hour.ago }
33+
context "when an element with status 1 does not retention" do
34+
let(:created_at) { 6.hours.ago }
3535

36-
it "doesn't delete item" do
36+
it "doesn't delete item with status 1 but deletes item with status 2" do
3737
expect { job_class.perform_now("OutboxItem") }
38-
.not_to change(OutboxItem, :count)
38+
.to change(OutboxItem, :count).by(-1)
3939
end
4040
end
4141
end

0 commit comments

Comments
 (0)