From 6e5e003c2fb637fd0a934c09139e9728c09f8f8a Mon Sep 17 00:00:00 2001 From: Joshua Young Date: Fri, 17 Jan 2025 09:31:54 +1000 Subject: [PATCH] Less aggressive async pruning for RubyThreadPoolExecutor --- .../concurrent/executor/fixed_thread_pool.rb | 2 +- .../executor/java_thread_pool_executor.rb | 2 +- .../executor/ruby_thread_pool_executor.rb | 28 ++++++++++-- .../executor/cached_thread_pool_spec.rb | 4 +- .../ruby_thread_pool_executor_spec.rb | 43 ++++++++----------- 5 files changed, 46 insertions(+), 33 deletions(-) diff --git a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb index 8324c0673..1cd754872 100644 --- a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb @@ -12,7 +12,7 @@ module Concurrent # @!macro thread_pool_executor_constant_default_max_queue_size # Default maximum number of tasks that may be added to the task queue. - # @!macro thread_pool_executor_constant_default_thread_timeout + # @!macro thread_pool_executor_constant_default_thread_idle_timeout # Default maximum number of seconds a thread in the pool may remain idle # before being reclaimed. diff --git a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb index 598a5f91f..4c1b3bc00 100644 --- a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb @@ -18,7 +18,7 @@ class JavaThreadPoolExecutor < JavaExecutorService # @!macro thread_pool_executor_constant_default_max_queue_size DEFAULT_MAX_QUEUE_SIZE = 0 - # @!macro thread_pool_executor_constant_default_thread_timeout + # @!macro thread_pool_executor_constant_default_thread_idle_timeout DEFAULT_THREAD_IDLETIMEOUT = 60 # @!macro thread_pool_executor_constant_default_synchronous diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index 9375acf38..6afda5585 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -20,9 +20,12 @@ class RubyThreadPoolExecutor < RubyExecutorService # @!macro thread_pool_executor_constant_default_max_queue_size DEFAULT_MAX_QUEUE_SIZE = 0 - # @!macro thread_pool_executor_constant_default_thread_timeout + # @!macro thread_pool_executor_constant_default_thread_idle_timeout DEFAULT_THREAD_IDLETIMEOUT = 60 + # @!macro thread_pool_executor_constant_default_pool_prune_timeout + DEFAULT_POOL_PRUNETIMEOUT = 30 + # @!macro thread_pool_executor_constant_default_synchronous DEFAULT_SYNCHRONOUS = false @@ -149,6 +152,8 @@ def ns_initialize(opts) @gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented @next_gc_time = Concurrent.monotonic_time + @gc_interval + + ns_set_pruner end # @!visibility private @@ -166,7 +171,6 @@ def ns_execute(*args, &task) return fallback_action(*args, &task) end - ns_prune_pool if @next_gc_time < Concurrent.monotonic_time nil end @@ -183,6 +187,8 @@ def ns_shutdown_execution # no more tasks will be accepted, just stop all workers @pool.each(&:stop) end + + ns_pruner&.kill end # @!visibility private @@ -218,7 +224,7 @@ def ns_assign_worker(*args, &task) # @!visibility private def ns_enqueue(*args, &task) return false if @synchronous - + if !ns_limited_queue? || @queue.size < @max_queue @queue << [task, args] true @@ -303,8 +309,24 @@ def ns_reset_if_forked @largest_length = 0 @workers_counter = 0 @ruby_pid = $$ + + ns_set_pruner + end + end + + def ns_pruner + return if @min_length == @max_length + + return @pruner if @pruner && @pruner.alive? + + @pruner = Thread.new do + until stopped_event&.set? + sleep DEFAULT_POOL_PRUNETIMEOUT + ns_prune_pool + end end end + alias_method :ns_set_pruner, :ns_pruner # @!visibility private class Worker diff --git a/spec/concurrent/executor/cached_thread_pool_spec.rb b/spec/concurrent/executor/cached_thread_pool_spec.rb index 513873b92..ede18c15a 100644 --- a/spec/concurrent/executor/cached_thread_pool_spec.rb +++ b/spec/concurrent/executor/cached_thread_pool_spec.rb @@ -158,9 +158,7 @@ module Concurrent latch = Concurrent::CountDownLatch.new(4) 4.times { subject.post { sleep 0.1; latch.count_down } } expect(latch.wait(1)).to be true - sleep 0.2 - subject.post {} - sleep 0.2 + sleep 36 expect(subject.length).to be < 4 end diff --git a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb index bf328a7c6..d619b19eb 100644 --- a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb @@ -56,56 +56,49 @@ def wakeup_thread_group(group) end before(:each) do - @now = Concurrent.monotonic_time - allow(Concurrent).to receive(:monotonic_time) { @now } - @group1 = prepare_thread_group(5) @group2 = prepare_thread_group(5) end def eventually(mutex: nil, timeout: 5, &block) - start = Time.now - while Time.now - start < timeout - begin - if mutex - mutex.synchronize do - return yield - end - else + start = Time.now + while Time.now - start < timeout + begin + if mutex + mutex.synchronize do return yield end - rescue Exception => last_failure + else + return yield end - Thread.pass + rescue Exception => last_failure end - raise last_failure + Thread.pass + end + raise last_failure end - it "triggers pruning when posting work if the last prune happened more than gc_interval ago" do + it "triggers pruning if the thread idletimes have elapsed and the prunetime has elapsed" do wakeup_thread_group(@group1) - @now += 6 + sleep 36 wakeup_thread_group(@group2) - subject.post { } eventually { expect(@group1.threads).to all(have_attributes(status: false)) } eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) } end - it "does not trigger pruning when posting work if the last prune happened less than gc_interval ago" do + it "does not trigger pruning if the thread idletimes have elapsed but the prunetime has not elapsed" do wakeup_thread_group(@group1) - @now += 3 - subject.prune_pool - @now += 3 + sleep 6 wakeup_thread_group(@group2) - subject.post { } - eventually { expect(@group1.threads).to all(have_attributes(status: false)) } + eventually { expect(@group1.threads).to all(have_attributes(status: 'sleep')) } eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) } end it "reclaims threads that have been idle for more than idletime seconds" do wakeup_thread_group(@group1) - @now += 6 + sleep 6 wakeup_thread_group(@group2) subject.prune_pool @@ -116,7 +109,7 @@ def eventually(mutex: nil, timeout: 5, &block) it "keeps at least min_length workers" do wakeup_thread_group(@group1) wakeup_thread_group(@group2) - @now += 12 + sleep 12 subject.prune_pool all_threads = @group1.threads + @group2.threads eventually do