diff --git a/.CHANGELOG.md b/.CHANGELOG.md index 6d0966d..5436d92 100644 --- a/.CHANGELOG.md +++ b/.CHANGELOG.md @@ -1,5 +1,7 @@ # 开发中 - [syncx: 支持分key加锁](https://github.com/ecodeclub/ekit/pull/224) +- [syncx: 添加具有最大申请次数限制的LimitPool](https://github.com/ecodeclub/ekit/pull/233) + # v0.0.8 - [atomicx: 泛型封装 atomic.Value](https://github.com/gotomicro/ekit/pull/101) - [queue: API 定义](https://github.com/gotomicro/ekit/pull/109) diff --git a/syncx/limit_pool.go b/syncx/limit_pool.go new file mode 100644 index 0000000..b7427f7 --- /dev/null +++ b/syncx/limit_pool.go @@ -0,0 +1,53 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncx + +import ( + "sync/atomic" +) + +// LimitPool 是对 Pool 的简单封装允许用户通过控制一段时间内对Pool的令牌申请次数来间接控制Pool中对象的内存总占用量 +type LimitPool[T any] struct { + pool *Pool[T] + tokens *atomic.Int32 +} + +// NewLimitPool 创建一个 LimitPool 实例 +// maxTokens 表示一段时间内的允许发放的最大令牌数 +// factory 必须返回 T 类型的值,并且不能返回 nil +func NewLimitPool[T any](maxTokens int, factory func() T) *LimitPool[T] { + var tokens atomic.Int32 + tokens.Add(int32(maxTokens)) + return &LimitPool[T]{ + pool: NewPool[T](factory), + tokens: &tokens, + } +} + +// Get 取出一个元素 +func (l *LimitPool[T]) Get() T { + if l.tokens.Add(-1) < 0 { + l.tokens.Add(1) + var zero T + return zero + } + return l.pool.Get() +} + +// Put 放回去一个元素 +func (l *LimitPool[T]) Put(t T) { + l.pool.Put(t) + l.tokens.Add(1) +} diff --git a/syncx/limit_pool_test.go b/syncx/limit_pool_test.go new file mode 100644 index 0000000..aad405c --- /dev/null +++ b/syncx/limit_pool_test.go @@ -0,0 +1,68 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncx + +import ( + "bytes" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLimitPool(t *testing.T) { + + expectedMaxAttempts := 3 + expectedVal := []byte("A") + + pool := NewLimitPool(expectedMaxAttempts, func() []byte { + var buffer bytes.Buffer + buffer.Write(expectedVal) + return buffer.Bytes() + }) + + var wg sync.WaitGroup + bufChan := make(chan []byte, expectedMaxAttempts) + + // 从Pool中并发获取缓冲区 + for i := 0; i < expectedMaxAttempts; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + buf := pool.Get() + + assert.NotZero(t, buf) + assert.Equal(t, string(expectedVal), string(buf)) + + bufChan <- buf + }() + } + + wg.Wait() + close(bufChan) + + // 超过最大申请次数返回零值 + assert.Zero(t, pool.Get()) + + // 归还一个 + pool.Put(<-bufChan) + + // 再次申请仍可以拿到非零值缓冲区 + assert.NotZero(t, string(expectedVal), string(pool.Get())) + + // 超过最大申请次数返回零值 + assert.Zero(t, pool.Get()) +}