diff --git a/app/models/disqualified/base_record.rb b/app/models/disqualified/base_record.rb index b292cbc..1400c52 100644 --- a/app/models/disqualified/base_record.rb +++ b/app/models/disqualified/base_record.rb @@ -1,5 +1,3 @@ -# typed: strict - class Disqualified::BaseRecord < ActiveRecord::Base self.abstract_class = true end diff --git a/app/models/disqualified/record.rb b/app/models/disqualified/record.rb index 0768c8a..f63b918 100644 --- a/app/models/disqualified/record.rb +++ b/app/models/disqualified/record.rb @@ -1,15 +1,8 @@ -# typed: strict - class Disqualified::Record < Disqualified::BaseRecord - extend T::Sig - self.table_name = "disqualified_jobs" scope :runnable, -> { where(finished_at: nil, run_at: (..Time.now), locked_by: nil) } - sig do - params(id: T.nilable(T.any(Integer, String))).returns(Disqualified::Record) - end def self.claim_one!(id: nil) run_id = SecureRandom.uuid association = @@ -34,7 +27,6 @@ def self.claim_one!(id: nil) Disqualified::Record.find_by!(locked_by: run_id) end - sig { returns(Disqualified::Record) } def run! record = self.class.claim_one!(id:) record.send(:instantiate_handler_and_perform_with_args) @@ -42,25 +34,21 @@ def run! record end - sig { void } def finish update!(locked_by: nil, locked_at: nil, finished_at: Time.now) end - sig { void } def requeue retry_count = attempts - 1 sleep = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) unqueue(run_at: Time.now + sleep) end - sig { params(run_at: T.nilable(Time)).void } def unqueue(run_at: nil) run_at ||= Time.now update!(locked_by: nil, locked_at: nil, run_at:) end - sig { void } private def instantiate_handler_and_perform_with_args raise Disqualified::Error::JobAlreadyFinished if !finished_at.nil? raise Disqualified::Error::JobNotClaimed if locked_by.nil? diff --git a/lib/disqualified.rb b/lib/disqualified.rb index 081d933..ddba357 100644 --- a/lib/disqualified.rb +++ b/lib/disqualified.rb @@ -1,5 +1,3 @@ -# typed: strict - module Disqualified end diff --git a/lib/disqualified/active_job.rb b/lib/disqualified/active_job.rb index e44e63d..1835625 100644 --- a/lib/disqualified/active_job.rb +++ b/lib/disqualified/active_job.rb @@ -1,13 +1,9 @@ -# typed: strict - require "active_job" require "disqualified" class Disqualified::ActiveJobAdapter - extend T::Sig include Disqualified::Job - sig { params(serialized_job_data: T::Hash[String, T.untyped]).void } def perform(serialized_job_data) ::ActiveJob::Base.execute(serialized_job_data) end @@ -16,19 +12,14 @@ def perform(serialized_job_data) module ActiveJob module QueueAdapters class DisqualifiedAdapter - extend T::Sig - - sig { returns(T::Boolean) } def enqueue_after_transaction_commit? Disqualified.client_options.enqueue_after_transaction_commit end - sig { params(job_data: ActiveJob::Base).void } def enqueue(job_data) Disqualified::ActiveJobAdapter.perform_async(job_data.serialize) end - sig { params(job_data: ActiveJob::Base, timestamp: Numeric).void } def enqueue_at(job_data, timestamp) timestamp = Time.at(timestamp) Disqualified::ActiveJobAdapter.perform_at(timestamp, job_data.serialize) diff --git a/lib/disqualified/cli.rb b/lib/disqualified/cli.rb index 4799931..b413b25 100644 --- a/lib/disqualified/cli.rb +++ b/lib/disqualified/cli.rb @@ -1,10 +1,6 @@ -# typed: strict - class Disqualified::CLI - extend T::Sig include Disqualified::Logging - sig { void } def self.run cli = new(ARGV) cli.run @@ -16,18 +12,16 @@ class ServerEngine < Rails::Engine end end - sig { params(argv: T::Array[String]).void } def initialize(argv) @original_argv = argv end - sig { void } def run require File.join(Dir.pwd, "config/environment") option_parser.parse(@original_argv) - server_options = T.must(Disqualified.server_options) + server_options = Disqualified.server_options delay_range = server_options.delay_range error_hooks = server_options.error_hooks logger = server_options.logger @@ -57,14 +51,13 @@ def run private - sig { returns(OptionParser) } def option_parser - return T.must(@option_parser) if instance_variable_defined?(:@option_parser) + return @option_parser if instance_variable_defined?(:@option_parser) option_parser = OptionParser.new do |opts| opts.banner = "Usage: #{File.basename($0)} [OPTIONS]" - server_options = T.must(Disqualified.server_options) + server_options = Disqualified.server_options opts.on("--delay-low SECONDS", Numeric, "Default: #{server_options.delay_low}") do |value| server_options.delay_low = value @@ -84,6 +77,6 @@ def option_parser end end - @option_parser ||= T.let(option_parser, T.nilable(OptionParser)) + @option_parser ||= option_parser end end diff --git a/lib/disqualified/configuration.rb b/lib/disqualified/configuration.rb index c4a047b..cf2d431 100644 --- a/lib/disqualified/configuration.rb +++ b/lib/disqualified/configuration.rb @@ -1,29 +1,20 @@ -# typed: strict - module Disqualified - extend T::Sig - - @client_options = T.let(ClientConfiguration.new, ClientConfiguration) + @client_options = ClientConfiguration.new class << self - extend T::Sig - - sig { returns(Disqualified::ClientConfiguration) } attr_accessor :client_options - sig { returns(T.nilable(Disqualified::ServerConfiguration)) } attr_accessor :server_options - sig { params(block: T.proc.params(arg0: Disqualified::ClientConfiguration).void).void } def configure_client(&block) block.call(client_options) end # While client options are always run, server options only run in the "disqualified" process - sig { params(block: T.proc.params(arg0: Disqualified::ServerConfiguration).void).void } + def configure_server(&block) if server_options - block.call(T.must(server_options)) + block.call(server_options) end end end diff --git a/lib/disqualified/configuration_structs.rb b/lib/disqualified/configuration_structs.rb index 0af2c36..a4af935 100644 --- a/lib/disqualified/configuration_structs.rb +++ b/lib/disqualified/configuration_structs.rb @@ -1,56 +1,43 @@ -# typed: strict - class Disqualified::ClientConfiguration - extend T::Sig - - sig { void } def initialize - @enqueue_after_transaction_commit = T.let(false, T::Boolean) + @enqueue_after_transaction_commit = false end - sig { returns(T::Boolean) } attr_accessor :enqueue_after_transaction_commit end class Disqualified::ServerConfiguration - extend T::Sig - - sig { void } def initialize - @delay_high = T.let(5.0, Numeric) - @delay_low = T.let(1.0, Numeric) - @logger = T.let(Rails.logger, T.untyped) - @pool_size = T.let(5, Integer) - @pwd = T.let(Dir.pwd, String) - @error_hooks = T.let([], T::Array[Disqualified::Logging::ERROR_HOOK_TYPE]) + @delay_high = 5.0 + @delay_low = 1.0 + @logger = Rails.logger + @pool_size = 5 + @pwd = Dir.pwd + @error_hooks = [] end - sig { returns(Numeric) } attr_accessor :delay_high - sig { returns(Numeric) } + attr_accessor :delay_low - sig { returns(T::Array[Disqualified::Logging::ERROR_HOOK_TYPE]) } + attr_accessor :error_hooks - sig { returns(T.untyped) } + attr_accessor :logger - sig { returns(Integer) } + attr_accessor :pool_size - sig { returns(String) } + attr_accessor :pwd private :error_hooks= - sig { returns(T::Range[Float]) } def delay_range delay_low.to_f..delay_high.to_f end - sig { params(block: Disqualified::Logging::ERROR_HOOK_TYPE).void } def on_error(&block) error_hooks.push(block) end - sig { returns(String) } def to_s "{ delay: #{delay_range}, pool_size: #{pool_size}, error_hooks_size: #{error_hooks.size} }" end diff --git a/lib/disqualified/engine.rb b/lib/disqualified/engine.rb index a299278..03bad20 100644 --- a/lib/disqualified/engine.rb +++ b/lib/disqualified/engine.rb @@ -1,5 +1,3 @@ -# typed: strict - module Disqualified class Engine < ::Rails::Engine end diff --git a/lib/disqualified/error.rb b/lib/disqualified/error.rb index 51008d2..2485cee 100644 --- a/lib/disqualified/error.rb +++ b/lib/disqualified/error.rb @@ -1,5 +1,3 @@ -# typed: strict - module Disqualified module Error class DisqualifiedError < StandardError diff --git a/lib/disqualified/job.rb b/lib/disqualified/job.rb index 6f3bfe0..ee49f11 100644 --- a/lib/disqualified/job.rb +++ b/lib/disqualified/job.rb @@ -1,31 +1,24 @@ -# typed: strict - module Disqualified::Job - extend T::Helpers - module ClassMethods - extend T::Sig - - sig { params(the_time: T.any(Time, Date, ActiveSupport::TimeWithZone), args: T.untyped).void } def perform_at(the_time, *args) Disqualified::Record.create!( - handler: T.unsafe(self).name, + handler: name, arguments: JSON.dump(args), queue: "default", run_at: the_time ) end - sig { params(args: T.untyped).void } def perform_async(*args) - T.unsafe(self).perform_at(Time.now, *args) + perform_at(Time.now, *args) end - sig { params(delay: T.any(Numeric, ActiveSupport::Duration), args: T.untyped).void } def perform_in(delay, *args) - T.unsafe(self).perform_at(T.unsafe(Time.now) + delay, *args) + perform_at(Time.now + delay, *args) end end - mixes_in_class_methods(ClassMethods) + def self.included(klass) + klass.extend(ClassMethods) + end end diff --git a/lib/disqualified/logging.rb b/lib/disqualified/logging.rb index 29d4c25..5436db0 100644 --- a/lib/disqualified/logging.rb +++ b/lib/disqualified/logging.rb @@ -1,8 +1,4 @@ -# typed: strict - module Disqualified::Logging - extend T::Sig - ERROR_CONTEXT_TYPE = T.type_alias do T::Hash[T.untyped, T.untyped] end @@ -13,7 +9,6 @@ module Disqualified::Logging module_function - sig { params(parts: T.untyped).void } def format_log(*parts) *extras, message = parts @@ -24,7 +19,6 @@ def format_log(*parts) end end - sig { params(error_hooks: T::Array[ERROR_HOOK_TYPE], error: Exception, context: ERROR_CONTEXT_TYPE).void } def handle_error(error_hooks, error, context) error_hooks.each do |hook| hook.call(error, context) diff --git a/lib/disqualified/main.rb b/lib/disqualified/main.rb index 95851e5..8495f53 100644 --- a/lib/disqualified/main.rb +++ b/lib/disqualified/main.rb @@ -1,16 +1,11 @@ -# typed: strict - class Disqualified::Main - extend T::Sig include Disqualified::Logging - sig { params(error_hooks: T::Array[Disqualified::Logging::ERROR_HOOK_TYPE], logger: T.untyped).void } def initialize(error_hooks:, logger:) @error_hooks = error_hooks @logger = logger end - sig { void } def call Rails.application.reloader.wrap do begin diff --git a/lib/disqualified/pool.rb b/lib/disqualified/pool.rb index 0343528..d312db2 100644 --- a/lib/disqualified/pool.rb +++ b/lib/disqualified/pool.rb @@ -1,43 +1,29 @@ -# typed: strict - class Disqualified::Pool - extend T::Sig include Disqualified::Logging CHECK = :check QUIT = :quit RUN = :run - sig do - params( - delay_range: T::Range[Float], - logger: T.untyped, - pool_size: Integer, - error_hooks: T::Array[Disqualified::Logging::ERROR_HOOK_TYPE], - task: T.proc.params(arg0: T::Hash[Symbol, T.untyped]).void - ).void - end def initialize(delay_range:, logger:, pool_size:, error_hooks:, &task) @delay_range = delay_range @logger = logger @pool_size = pool_size @error_hooks = error_hooks @task = task - @running = T.let(Concurrent::AtomicBoolean.new(true), Concurrent::AtomicBoolean) - @command_queue = T.let(Thread::Queue.new, Thread::Queue) + @running = Concurrent::AtomicBoolean.new(true) + @command_queue = Thread::Queue.new end - sig { void } def run! clock.execute - T.unsafe(Concurrent::Promises) + Concurrent::Promises .zip(*pool) .rescue { |error| handle_error(@error_hooks, error, {}) } .run .value! end - sig { void } def shutdown @running.make_false clock.shutdown @@ -46,33 +32,24 @@ def shutdown end end - sig { returns(Concurrent::TimerTask) } def clock - @clock ||= T.let( - Concurrent::TimerTask.new(run_now: true) do |clock_task| - @logger.debug { format_log("Disqualified::Pool#clock", "Starting") } - clock_task.execution_interval = random_interval - @command_queue.push(Disqualified::Pool::CHECK) - @logger.debug { format_log("Disqualified::Pool#clock", "Next run in #{clock_task.execution_interval}") } - rescue => e - handle_error(@error_hooks, e, {}) - end, - T.nilable(Concurrent::TimerTask) - ) + @clock ||= Concurrent::TimerTask.new(run_now: true) do |clock_task| + @logger.debug { format_log("Disqualified::Pool#clock", "Starting") } + clock_task.execution_interval = random_interval + @command_queue.push(Disqualified::Pool::CHECK) + @logger.debug { format_log("Disqualified::Pool#clock", "Next run in #{clock_task.execution_interval}") } + rescue => e + handle_error(@error_hooks, e, {}) + end end - sig { returns(T::Array[T.nilable(Concurrent::Promises::Future)]) } def pool - @pool ||= T.let( - @pool_size.times.map do |promise_index| - repeat(promise_index:) - &.run - end, - T.nilable(T::Array[T.nilable(Concurrent::Promises::Future)]) - ) + @pool ||= @pool_size.times.map do |promise_index| + repeat(promise_index:) + &.run + end end - sig { params(promise_index: Integer).returns(T.nilable(Concurrent::Promises::Future)) } def repeat(promise_index:) if @running.false? return @@ -112,7 +89,6 @@ def repeat(promise_index:) private - sig { returns(Float) } def random_interval rand(@delay_range) end diff --git a/lib/disqualified/version.rb b/lib/disqualified/version.rb index e083606..eab2143 100644 --- a/lib/disqualified/version.rb +++ b/lib/disqualified/version.rb @@ -1,5 +1,3 @@ -# typed: strict - module Disqualified VERSION = "0.4.0" end