Skip to content

Commit

Permalink
Disqualified::Sequence
Browse files Browse the repository at this point in the history
  • Loading branch information
zachahn committed Nov 20, 2024
1 parent 71ee22d commit 72641b2
Show file tree
Hide file tree
Showing 12 changed files with 1,694 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

* Added `Disqualified::Sequence`

### Changed

* Renamed `Disqualified::Record#unqueue` to `Disqualified::Record#unclaim`
Expand Down
25 changes: 23 additions & 2 deletions app/models/disqualified/record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,18 @@ class Disqualified::Record < Disqualified::BaseRecord

self.table_name = "disqualified_jobs"

scope :runnable, -> { where(finished_at: nil, run_at: (..Time.now), locked_by: nil) }
belongs_to :disqualified_sequence,
class_name: "Disqualified::SequenceRecord",
foreign_key: "sequence_uuid",
primary_key: "uuid",
optional: true

scope :with_sequence, -> {
joins("LEFT OUTER JOIN disqualified_sequences ds ON ds.uuid = sequence_uuid AND ds.current_step = sequence_step")
.where("ds.uuid = sequence_uuid OR (ds.uuid IS NULL AND sequence_uuid IS NULL)")
}
scope :pending, -> { where(finished_at: nil, run_at: (..Time.now), locked_by: nil) }
scope :runnable, -> { with_sequence.pending }

sig do
params(id: T.nilable(T.any(Integer, String))).returns(Disqualified::Record)
Expand Down Expand Up @@ -50,7 +61,17 @@ def run!

sig { void }
def finish
update!(locked_by: nil, locked_at: nil, finished_at: Time.now)
transaction do
update!(locked_by: nil, locked_at: nil, finished_at: Time.now)
if sequence_uuid && sequence_step
Disqualified::SequenceRecord
.where(uuid: sequence_uuid, current_step: sequence_step)
.update_all(
current_step: T.must(sequence_step) + 1,
updated_at: Time.now
)
end
end
end

sig { void }
Expand Down
5 changes: 5 additions & 0 deletions app/models/disqualified/sequence_record.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# typed: strict

class Disqualified::SequenceRecord < Disqualified::BaseRecord
self.table_name = "disqualified_sequences"
end
1 change: 1 addition & 0 deletions lib/disqualified.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ module Disqualified
require_relative "disqualified/job"
require_relative "disqualified/main"
require_relative "disqualified/pool"
require_relative "disqualified/sequence"
require_relative "disqualified/version"
10 changes: 9 additions & 1 deletion lib/disqualified/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ module ClassMethods

sig { params(the_time: T.any(Time, Date, ActiveSupport::TimeWithZone), args: T.untyped).void }
def perform_at(the_time, *args)
if Thread.current[Disqualified::Sequence::UUID]
Thread.current[Disqualified::Sequence::COUNT] += 1
sequence_uuid = Thread.current[Disqualified::Sequence::UUID]
sequence_step = Thread.current[Disqualified::Sequence::COUNT]
end

Disqualified::Record.create!(
handler: T.unsafe(self).name,
arguments: JSON.dump(args),
queue: "default",
run_at: the_time
run_at: the_time,
sequence_uuid:,
sequence_step:
)
end

Expand Down
26 changes: 26 additions & 0 deletions lib/disqualified/sequence.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# typed: strict

class Disqualified::Sequence
extend T::Sig

UUID = :disqualified_sequence_uuid
COUNT = :disqualified_sequence_count

sig { params(description: T.nilable(String), block: T.proc.void).void }
def self.queue(description: nil, &block)
Disqualified::SequenceRecord.transaction do
Thread.current[UUID] = SecureRandom.uuid
Thread.current[COUNT] = 0
yield
Disqualified::SequenceRecord.create!(
uuid: Thread.current[UUID],
current_step: 1,
final_step: Thread.current[COUNT],
description:
)
end
ensure
Thread.current[UUID] = nil
Thread.current[COUNT] = nil
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
class CreateDisqualifiedSequences < ActiveRecord::Migration[7.2]
def change
create_table :disqualified_sequences do |t|
t.text :uuid, null: false
t.integer :current_step, null: false
t.integer :final_step, null: false
t.text :description
t.timestamps
end

add_index :disqualified_sequences, :uuid, unique: true

add_column :disqualified_jobs, :sequence_uuid, :text
add_column :disqualified_jobs, :sequence_step, :integer
end
end
156 changes: 156 additions & 0 deletions sorbet/rbi/dsl/disqualified/record.rbi

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 72641b2

Please sign in to comment.