Skip to content

Commit

Permalink
Less aggressive async pruning for RubyThreadPoolExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuay03 committed Jan 17, 2025
1 parent dbfbc14 commit 6e5e003
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Concurrent
# @!macro thread_pool_executor_constant_default_max_queue_size
# Default maximum number of tasks that may be added to the task queue.

# @!macro thread_pool_executor_constant_default_thread_timeout
# @!macro thread_pool_executor_constant_default_thread_idle_timeout
# Default maximum number of seconds a thread in the pool may remain idle
# before being reclaimed.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class JavaThreadPoolExecutor < JavaExecutorService
# @!macro thread_pool_executor_constant_default_max_queue_size
DEFAULT_MAX_QUEUE_SIZE = 0

# @!macro thread_pool_executor_constant_default_thread_timeout
# @!macro thread_pool_executor_constant_default_thread_idle_timeout
DEFAULT_THREAD_IDLETIMEOUT = 60

# @!macro thread_pool_executor_constant_default_synchronous
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ class RubyThreadPoolExecutor < RubyExecutorService
# @!macro thread_pool_executor_constant_default_max_queue_size
DEFAULT_MAX_QUEUE_SIZE = 0

# @!macro thread_pool_executor_constant_default_thread_timeout
# @!macro thread_pool_executor_constant_default_thread_idle_timeout
DEFAULT_THREAD_IDLETIMEOUT = 60

# @!macro thread_pool_executor_constant_default_pool_prune_timeout
DEFAULT_POOL_PRUNETIMEOUT = 30

# @!macro thread_pool_executor_constant_default_synchronous
DEFAULT_SYNCHRONOUS = false

Expand Down Expand Up @@ -149,6 +152,8 @@ def ns_initialize(opts)

@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
@next_gc_time = Concurrent.monotonic_time + @gc_interval

ns_set_pruner
end

# @!visibility private
Expand All @@ -166,7 +171,6 @@ def ns_execute(*args, &task)
return fallback_action(*args, &task)
end

ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
nil
end

Expand All @@ -183,6 +187,8 @@ def ns_shutdown_execution
# no more tasks will be accepted, just stop all workers
@pool.each(&:stop)
end

ns_pruner&.kill
end

# @!visibility private
Expand Down Expand Up @@ -218,7 +224,7 @@ def ns_assign_worker(*args, &task)
# @!visibility private
def ns_enqueue(*args, &task)
return false if @synchronous

if !ns_limited_queue? || @queue.size < @max_queue
@queue << [task, args]
true
Expand Down Expand Up @@ -303,8 +309,24 @@ def ns_reset_if_forked
@largest_length = 0
@workers_counter = 0
@ruby_pid = $$

ns_set_pruner
end
end

def ns_pruner
return if @min_length == @max_length

return @pruner if @pruner && @pruner.alive?

@pruner = Thread.new do
until stopped_event&.set?
sleep DEFAULT_POOL_PRUNETIMEOUT
ns_prune_pool
end
end
end
alias_method :ns_set_pruner, :ns_pruner

# @!visibility private
class Worker
Expand Down
4 changes: 1 addition & 3 deletions spec/concurrent/executor/cached_thread_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ module Concurrent
latch = Concurrent::CountDownLatch.new(4)
4.times { subject.post { sleep 0.1; latch.count_down } }
expect(latch.wait(1)).to be true
sleep 0.2
subject.post {}
sleep 0.2
sleep 36
expect(subject.length).to be < 4
end

Expand Down
43 changes: 18 additions & 25 deletions spec/concurrent/executor/ruby_thread_pool_executor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,56 +56,49 @@ def wakeup_thread_group(group)
end

before(:each) do
@now = Concurrent.monotonic_time
allow(Concurrent).to receive(:monotonic_time) { @now }

@group1 = prepare_thread_group(5)
@group2 = prepare_thread_group(5)
end

def eventually(mutex: nil, timeout: 5, &block)
start = Time.now
while Time.now - start < timeout
begin
if mutex
mutex.synchronize do
return yield
end
else
start = Time.now
while Time.now - start < timeout
begin
if mutex
mutex.synchronize do
return yield
end
rescue Exception => last_failure
else
return yield
end
Thread.pass
rescue Exception => last_failure
end
raise last_failure
Thread.pass
end
raise last_failure
end

it "triggers pruning when posting work if the last prune happened more than gc_interval ago" do
it "triggers pruning if the thread idletimes have elapsed and the prunetime has elapsed" do
wakeup_thread_group(@group1)
@now += 6
sleep 36
wakeup_thread_group(@group2)
subject.post { }

eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
end

it "does not trigger pruning when posting work if the last prune happened less than gc_interval ago" do
it "does not trigger pruning if the thread idletimes have elapsed but the prunetime has not elapsed" do
wakeup_thread_group(@group1)
@now += 3
subject.prune_pool
@now += 3
sleep 6
wakeup_thread_group(@group2)
subject.post { }

eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
eventually { expect(@group1.threads).to all(have_attributes(status: 'sleep')) }
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
end

it "reclaims threads that have been idle for more than idletime seconds" do
wakeup_thread_group(@group1)
@now += 6
sleep 6
wakeup_thread_group(@group2)
subject.prune_pool

Expand All @@ -116,7 +109,7 @@ def eventually(mutex: nil, timeout: 5, &block)
it "keeps at least min_length workers" do
wakeup_thread_group(@group1)
wakeup_thread_group(@group2)
@now += 12
sleep 12
subject.prune_pool
all_threads = @group1.threads + @group2.threads
eventually do
Expand Down

0 comments on commit 6e5e003

Please sign in to comment.