Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functions to count object in use and objects in pool #29

Merged
merged 8 commits into from
Nov 1, 2023
Merged
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ConcurrentUtilities"
uuid = "f0e56b4a-5159-44fe-b623-3e5288b988bb"
authors = ["Jacob Quinn <[email protected]>"]
version = "2.2.1"
version = "2.3.0"

[deps]
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Expand Down
29 changes: 29 additions & 0 deletions src/pools.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,35 @@ safesizehint!(x, n) = sizehint!(x, min(4096, n))
# determines whether we'll look up object caches in .keyedvalues or .values
iskeyed(::Pool{K}) where {K} = K !== Nothing

Base.keytype(::Type{<:Pool{K}}) where {K} = K
Base.keytype(p::Pool) = keytype(typeof(p))

Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T
Base.valtype(p::Pool) = valtype(typeof(p))
nickrobinson251 marked this conversation as resolved.
Show resolved Hide resolved

"""
Pools.max(pool::Pool) -> Int

Return the maximum number of objects permitted to be in use at the same time.
See `Pools.permits(pool)` for the number of objects currently in use.
"""
max(pool::Pool) = Base.@lock pool.lock pool.max
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think capacity might be a good name for this? Or max_capacity?

Or maybe max_allowed_in_use? I actually think i like this best.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i didn't go for "in use" ("usage", "reusable", etc) because although that's a good description for the HTTP use-case, this Pool functionality is pretty generic (and we already have punny names like drain! and the docs talk about "permits")

But also i didn't like capacity because that suggested to me that there's a limit on number of "in the pool" rather than "in use".

happy to change to "use"-type names if you think that's better


"""
Pools.permits(pool::Pool) -> Int

Return the number of objects currently in use. Less than or equal to `Pools.max(pool)`.
"""
permits(pool::Pool) = Base.@lock pool.lock pool.cur
nickrobinson251 marked this conversation as resolved.
Show resolved Hide resolved

"""
Pools.depth(pool::Pool) -> Int

Return the number of objects in the pool available for reuse.
"""
depth(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0)
depth(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values)
nickrobinson251 marked this conversation as resolved.
Show resolved Hide resolved

"""
drain!(pool)

Expand Down
99 changes: 94 additions & 5 deletions test/pools.jl
Original file line number Diff line number Diff line change
@@ -1,24 +1,49 @@
using ConcurrentUtilities, Test
using ConcurrentUtilities.Pools, Test

@testset "Pools" begin
pool_size = length∘Pools.values
@testset "nonkeyed and pool basics" begin
pool = Pool{Int}(3)
@test keytype(pool) === Nothing
@test valtype(pool) === Int

@test Pools.max(pool) == 3
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 0

# acquire an object from the pool
x1 = acquire(() -> 1, pool)
# no existing objects in the pool, so our function was called to create a new one
@test x1 == 1
@test Pools.max(pool) == 3
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 0

# release back to the pool for reuse
release(pool, x1)
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 1

# acquire another object from the pool
x1 = acquire(() -> 2, pool)
# this time, the pool had an existing object, so our function was not called
@test x1 == 1
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 0

# but now there are no objects to reuse again, so the next acquire will call our function
x2 = acquire(() -> 2, pool)
@test x2 == 2
@test Pools.permits(pool) == 2
@test Pools.depth(pool) == 0

x3 = acquire(() -> 3, pool)
@test x3 == 3
# the pool is now at capacity, so the next acquire will block until an object is released
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 0

# the pool is now at `Pools.max`, so the next acquire will block until an object is released
@test Pools.permits(pool) == Pools.max(pool)
tsk = @async acquire(() -> 4, pool; forcenew=true)
yield()
@test !istaskdone(tsk)
Expand All @@ -28,60 +53,110 @@ using ConcurrentUtilities, Test
x1 = fetch(tsk)
# even though we released 1 for reuse, we passed forcenew, so our function was called to create new
@test x1 == 4
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 1

# error to try and provide a key to a non-keyed pool
@test_throws ArgumentError acquire(() -> 1, pool, 1)

# release objects back to the pool
release(pool, x1)
release(pool, x2)
release(pool, x3)
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 4

# acquire an object, but checking isvalid
x1 = acquire(() -> 5, pool; isvalid=x -> x == 1)
@test x1 == 1
@test Pools.permits(pool) == 1

# no valid objects, so our function was called to create a new one
x2 = acquire(() -> 6, pool; isvalid=x -> x == 1)
@test x2 == 6
# we have one slot left in the pool, we now throw while creating new
@test Pools.permits(pool) == 2

# we have one permit left, we now throw while creating a new object
# and we want to test that the permit isn't permanently lost for the pool
@test_throws ErrorException acquire(() -> error("oops"), pool; forcenew=true)
@test Pools.permits(pool) == 2

# we can still acquire a new object
x3 = acquire(() -> 7, pool; forcenew=true)
@test x3 == 7
@test Pools.permits(pool) == 3

# release objects back to the pool
drain!(pool)
release(pool, x1)
release(pool, x2)
release(pool, x3)
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 3

# try to do an invalid release
@test_throws ArgumentError release(pool, 10)

# test that the invalid release didn't push the object to our pool for reuse
x1 = acquire(() -> 8, pool)
@test x1 == 7
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 2
# calling drain! removes all objects for reuse
drain!(pool)
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 0

x2 = acquire(() -> 9, pool)
@test x2 == 9
@test Pools.permits(pool) == 2
@test Pools.depth(pool) == 0
end

@testset "keyed pool" begin
# now test a keyed pool
pool = Pool{String, Int}(3)
@test keytype(pool) === String
@test valtype(pool) === Int

@test Pools.max(pool) == 3
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 0

# acquire an object from the pool
x1 = acquire(() -> 1, pool, "a")
# no existing objects in the pool, so our function was called to create a new one
@test x1 == 1
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 0

# release back to the pool for reuse
release(pool, "a", x1)
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 1

# test for a different key
x2 = acquire(() -> 2, pool, "b")
# there's an existing object, but for a different key, so we don't reuse
@test x2 == 2
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 1

# acquire another object from the pool
x1 = acquire(() -> 2, pool, "a")
# this time, the pool had an existing object, so our function was not called
@test x1 == 1
@test Pools.permits(pool) == 2
@test Pools.depth(pool) == 0

x3 = acquire(() -> 3, pool, "a")
@test x3 == 3
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 0

# the pool is now at capacity, so the next acquire will block until an object is released
# even though we've acquired using different keys, the capacity is shared across the pool
@test Pools.permits(pool) == Pools.max(pool)
tsk = @async acquire(() -> 4, pool, "c"; forcenew=true)
yield()
@test !istaskdone(tsk)
Expand All @@ -91,13 +166,27 @@ using ConcurrentUtilities, Test
x1 = fetch(tsk)
# even though we released 1 for reuse, we passed forcenew, so our function was called to create new
@test x1 == 4
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 1

# error to try and provide an invalid key to a keyed pool
@test_throws ArgumentError acquire(() -> 1, pool, 1)
# error to release an invalid key back to the pool
@test_throws KeyError release(pool, "z", 1)
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 1

# error to *not* provide a key to a keyed pool
@test_throws ArgumentError acquire(() -> 1, pool)
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 1

# error to *not* provide a key when releasing to a keyed pool
@test_throws ArgumentError release(pool)
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 1

# error to release an invalid key back to the pool
@test_throws KeyError release(pool, "z", 1)
@test_broken Pools.permits(pool) == 3
@test Pools.depth(pool) == 1
end
end