From b57df88c0b511006bea46d4aa3ca731c0ee966fb Mon Sep 17 00:00:00 2001 From: Zach Ahn Date: Sun, 1 Dec 2024 21:08:00 -0500 Subject: [PATCH] Process jobs with expired claims --- app/models/disqualified/record.rb | 8 +++++--- sorbet/rbi/dsl/disqualified/record.rbi | 22 ++++++++++++++++++++-- test/models/disqualified/record_test.rb | 9 +++++++++ 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/app/models/disqualified/record.rb b/app/models/disqualified/record.rb index d2dada0..d832bb4 100644 --- a/app/models/disqualified/record.rb +++ b/app/models/disqualified/record.rb @@ -15,8 +15,11 @@ class Disqualified::Record < Disqualified::BaseRecord joins("LEFT OUTER JOIN disqualified_sequences ds ON ds.uuid = sequence_uuid AND ds.current_step = sequence_step") .where("ds.uuid = sequence_uuid OR (ds.uuid IS NULL AND sequence_uuid IS NULL)") } - scope :pending, -> { where(finished_at: nil, run_at: (..Time.now), locked_by: nil) } - scope :runnable, -> { with_sequence.pending } + scope :ready, -> { where(finished_at: nil, run_at: (..Time.now)) } + scope :pending_without_claim, -> { where(locked_by: nil) } + scope :pending_expired_claim, -> { where(locked_at: (..(10.minutes.ago))) } + scope :ordered, -> { order(run_at: :asc) } + scope :runnable, -> { ready.with_sequence.ordered.and(pending_without_claim.or(pending_expired_claim)) } sig do params(id: T.nilable(T.any(Integer, String))).returns(Disqualified::Record) @@ -26,7 +29,6 @@ def self.claim_one!(id: nil) association = Disqualified::Record .runnable - .order(run_at: :asc) .limit(1) if id diff --git a/sorbet/rbi/dsl/disqualified/record.rbi b/sorbet/rbi/dsl/disqualified/record.rbi index 780ee21..06bfc1f 100644 --- a/sorbet/rbi/dsl/disqualified/record.rbi +++ b/sorbet/rbi/dsl/disqualified/record.rbi @@ -536,7 +536,13 @@ class Disqualified::Record def order(*args, &blk); end sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) } - def pending(*args, &blk); end + def ordered(*args, &blk); end + + sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) } + def pending_expired_claim(*args, &blk); end + + sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) } + def pending_without_claim(*args, &blk); end sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) } def preload(*args, &blk); end @@ -544,6 +550,9 @@ class Disqualified::Record sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) } def readonly(*args, &blk); end + sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) } + def ready(*args, &blk); end + sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) } def references(*args, &blk); end @@ -1509,7 +1518,13 @@ class Disqualified::Record def order(*args, &blk); end sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) } - def pending(*args, &blk); end + def ordered(*args, &blk); end + + sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) } + def pending_expired_claim(*args, &blk); end + + sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) } + def pending_without_claim(*args, &blk); end sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) } def preload(*args, &blk); end @@ -1517,6 +1532,9 @@ class Disqualified::Record sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) } def readonly(*args, &blk); end + sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) } + def ready(*args, &blk); end + sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) } def references(*args, &blk); end diff --git a/test/models/disqualified/record_test.rb b/test/models/disqualified/record_test.rb index 618daba..dcc1dec 100644 --- a/test/models/disqualified/record_test.rb +++ b/test/models/disqualified/record_test.rb @@ -54,6 +54,15 @@ def perform end end + test ".runnable returns non-finished jobs with expired claim" do + travel_to(2.days.ago) do + NoArgJob.perform_async + Disqualified::Record.claim_one! + assert_equal(0, Disqualified::Record.runnable.size) + end + assert_equal(1, Disqualified::Record.runnable.size) + end + test "#run! doesn't run ran jobs" do NoArgJob.perform_async record = Disqualified::Record.runnable.first