Add threadpool support to runtime #42302
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,62 @@ | ||
# This file is a part of Julia. License is MIT: https://julialang.org/license | ||
|
||
export threadid, nthreads, @threads, @spawn | ||
export threadid, nthreads, @threads, @spawn, | ||
threadpool, nthreadpools | ||
|
||
""" | ||
Threads.threadid() | ||
Threads.threadid() -> Int | ||
|
||
Get the ID number of the current thread of execution. The master thread has ID `1`. | ||
Get the ID number of the current thread of execution. The master thread has | ||
ID `1`. | ||
""" | ||
threadid() = Int(ccall(:jl_threadid, Int16, ())+1) | ||
|
||
# Inclusive upper bound on threadid() | ||
""" | ||
Threads.nthreads() | ||
Threads.nthreads([:default|:interactive]) -> Int | ||
|
||
Get the number of threads available to the Julia process. This is the inclusive upper bound | ||
on [`threadid()`](@ref). | ||
Get the number of threads (across all thread pools or within the specified | ||
thread pool) available to Julia. The number of threads across all thread | ||
pools is the inclusive upper bound on [`threadid()`](@ref). | ||
|
||
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the | ||
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the | ||
[`Distributed`](@ref man-distributed) standard library. | ||
""" | ||
function nthreads end | ||
|
||
nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint))) | ||
function nthreads(pool::Symbol) | ||
if pool == :default | ||
tpid = Int8(0) | ||
elseif pool == :interactive | ||
tpid = Int8(1) | ||
else | ||
error("invalid threadpool specified") | ||
end | ||
return _nthreads_in_pool(tpid) | ||
end | ||
function _nthreads_in_pool(tpid::Int8) | ||
p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint})) | ||
return Int(unsafe_load(p, tpid + 1)) | ||
end | ||
|
||
""" | ||
Threads.threadpool(tid = threadid()) -> Symbol | ||
|
||
Returns the specified thread's threadpool; either `:default` or `:interactive`. | ||
""" | ||
function threadpool(tid = threadid()) | ||
tpid = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1) | ||
return tpid == 0 ? :default : :interactive | ||
end | ||
|
||
""" | ||
Threads.nthreadpools() -> Int | ||
|
||
Returns the number of threadpools currently configured. | ||
""" | ||
nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint))) | ||
|
||
|
||
|
||
function threading_run(fun, static) | ||
ccall(:jl_enter_threaded_region, Cvoid, ()) | ||
|
@@ -48,7 +84,7 @@ function _threadsfor(iter, lbody, schedule) | |
quote | ||
local threadsfor_fun | ||
let range = $(esc(range)) | ||
function threadsfor_fun(tid=1; onethread=false) | ||
function threadsfor_fun(tid = 1; onethread = false) | ||
r = range # Load into local variable | ||
lenr = length(r) | ||
# divide loop iterations among threads | ||
|
@@ -232,35 +268,63 @@ macro threads(args...) | |
end | ||
|
||
""" | ||
Threads.@spawn expr | ||
Threads.@spawn [:default|:interactive] expr | ||
The name
I find The fact is that there is really no difference between the threadpools -- the term
I personally prefer I don't think
|
||
|
||
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available thread. | ||
The task is allocated to a thread after it becomes available. To wait for the task | ||
to finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref) to | ||
wait and then obtain its return value. | ||
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available | ||
thread in the specified threadpool (`:default` if unspecified). The task is | ||
allocated to a thread once one becomes available. To wait for the task to | ||
finish, call [`wait`](@ref) on the result of this macro, or call | ||
[`fetch`](@ref) to wait and then obtain its return value. | ||
|
||
Values can be interpolated into `@spawn` via `\$`, which copies the value directly into the | ||
constructed underlying closure. This allows you to insert the _value_ of a variable, | ||
isolating the asynchronous code from changes to the variable's value in the current task. | ||
Values can be interpolated into `@spawn` via `\$`, which copies the value | ||
directly into the constructed underlying closure. This allows you to insert | ||
the _value_ of a variable, isolating the asynchronous code from changes to | ||
the variable's value in the current task. | ||
|
||
!!! note | ||
See the manual chapter on threading for important caveats. | ||
See the manual chapter on [multi-threading](@ref man-multithreading) | ||
for important caveats. See also the chapter on [threadpools](@ref man-threadpools). | ||
|
||
!!! compat "Julia 1.3" | ||
This macro is available as of Julia 1.3. | ||
|
||
!!! compat "Julia 1.4" | ||
Interpolating values via `\$` is available as of Julia 1.4. | ||
|
||
!!! compat "Julia 1.9" | ||
A threadpool may be specified as of Julia 1.9. | ||
""" | ||
macro spawn(expr) | ||
letargs = Base._lift_one_interp!(expr) | ||
macro spawn(args...) | ||
tpid = Int8(0) | ||
na = length(args) | ||
if na == 2 | ||
ttype, ex = args | ||
if ttype isa QuoteNode | ||
ttype = ttype.value | ||
elseif ttype isa Symbol | ||
# TODO: allow unquoted symbols | ||
ttype = nothing | ||
end | ||
if ttype === :interactive | ||
tpid = Int8(1) | ||
elseif ttype !== :default | ||
throw(ArgumentError("unsupported threadpool in @spawn: $ttype")) | ||
end | ||
elseif na == 1 | ||
ex = args[1] | ||
else | ||
throw(ArgumentError("wrong number of arguments in @spawn")) | ||
end | ||
|
||
letargs = Base._lift_one_interp!(ex) | ||
|
||
thunk = esc(:(()->($expr))) | ||
thunk = esc(:(()->($ex))) | ||
var = esc(Base.sync_varname) | ||
quote | ||
let $(letargs...) | ||
local task = Task($thunk) | ||
task.sticky = false | ||
ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid) | ||
if $(Expr(:islocal, var)) | ||
put!($var, task) | ||
end | ||
|
For the wait-free property of the multiqueue, we need `nt = nthreads()`, since all worker threads from all worker pools are possible enqueuers. We probably don't need to worry too much about this for now, since most enqueues happen inside each pool and a theoretical guarantee does not imply practical performance in general. But I think it's something to watch for in practice, especially when there is intensive inter-pool synchronization. For example, a very conservative but wasteful approach is `heap_p = max(heap_c * _nthreads_in_pool(tpid), nthreads() + 1)`.

@vtjnash Why is `+=` used in `heap_p += heap_c * nt` instead of `=`, by the way? We only need `heap_p / nt > 1` here, right? The heap vector is either empty or fully initialized, so I suppose it doesn't matter at the moment.
It's a good point. In particular, to retain interactivity, it should be a common pattern for an interactive task to enqueue work to the default threadpool. However, as you say, it may not be a problem in practice, and in any case we have many problems with the use of multi-queues in general; IMO the way forward is an evolution of your #43366.
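The pattern described here, an interactive task enqueueing work to the default threadpool, might look like the following minimal sketch. It assumes a Julia version in which the `@spawn [:default|:interactive]` form from this PR is available (1.9 per the compat note); `handle_request` and the workload are hypothetical.

```julia
using Base.Threads

# Hypothetical request handler: it runs as an interactive task and hands the
# heavy computation to the :default pool instead of doing it itself.
function handle_request(n)
    worker = Threads.@spawn :default sum(abs2, 1:n)
    return fetch(worker)  # yields while the worker runs
end

# Schedule the handler on the interactive pool and wait for its result.
frontend = Threads.@spawn :interactive handle_request(1_000)
result = fetch(frontend)

println("result              = ", result)
println("default threads     = ", Threads.nthreads(:default))
println("interactive threads = ", Threads.nthreads(:interactive))
```

Every such handoff is an inter-pool enqueue, which is the case the preceding comment flags for the multiqueue sizing.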