diff --git a/go.mod b/go.mod index d95ddc5..a6fc0a0 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,9 @@ module github.com/lxzan/concurrency go 1.20 require ( + github.com/lxzan/dao v1.1.7 github.com/pkg/errors v0.9.1 - github.com/stretchr/testify v1.8.1 + github.com/stretchr/testify v1.8.4 ) require ( diff --git a/go.sum b/go.sum index 5803f2d..f7a38ef 100644 --- a/go.sum +++ b/go.sum @@ -1,19 +1,14 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/lxzan/dao v1.1.7 h1:I049e67buJIpr4QJ/vJbHSjKMLN4ZJlSMeK3Rq+CJl8= +github.com/lxzan/dao v1.1.7/go.mod h1:5ChTIo7RSZ4upqRo16eicJ3XdJWhGwgMIsyuGLMUofM= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/groups/group.go b/groups/group.go index 53bf698..217b551 100644 --- a/groups/group.go +++ b/groups/group.go @@ -20,18 +20,18 @@ type ( Caller func(args any, f func(any) error) error Group[T any] struct { - options *options // 配置 - mu sync.Mutex // 锁 - ctx context.Context // 上下文 - cancelFunc context.CancelFunc // 取消函数 - canceled atomic.Uint32 // 是否已取消 - errs []error // 错误 - done chan bool // 完成信号 - q []T // 任务队列 - taskDone int64 // 已完成任务数量 - taskTotal int64 // 总任务数量 - OnMessage func(args T) error // 任务处理 - OnError func(err error) // 错误处理 + options *options // 配置 + mu sync.Mutex // 锁 + ctx context.Context // 上下文 + cancelFunc context.CancelFunc // 取消函数 + canceled atomic.Uint32 // 是否已取消 + errs []error // 错误 + done chan bool // 完成信号 + q []T // 任务队列 + taskDone int64 // 已完成任务数量 + taskTotal int64 // 总任务数量 + OnMessage func(args T) error // 任务处理 + OnError func(args T, err error) // 错误处理 } ) @@ -53,7 +53,7 @@ func New[T any](opts ...Option) *Group[T] { c.OnMessage = func(args T) error { return nil } - c.OnError = func(err error) {} + c.OnError = func(args T, err error) {} return c } @@ -104,7 +104,7 @@ func (c *Group[T]) do(args T) { c.mu.Lock() c.errs = append(c.errs, err) c.mu.Unlock() - c.OnError(err) + c.OnError(args, err) } if c.incrAndIsDone() { diff --git a/groups/group_test.go b/groups/group_test.go index b890b59..fddad74 100644 --- a/groups/group_test.go +++ b/groups/group_test.go @@ -96,7 +96,7 @@ func TestNewTaskGroup(t *testing.T) { return nil } } - ctl.OnError = func(err error) { + ctl.OnError = func(args int, err error) { ctl.Cancel() } err := ctl.Start() diff --git a/internal/queue.go b/internal/queue.go deleted file mode 100644 index 0685a49..0000000 --- a/internal/queue.go +++ /dev/null @@ -1,126 +0,0 @@ -package internal - -type ( - pointer uint32 - - element[T any] struct { - prev, addr, next pointer - Value T - } - - Queue[T any] struct { - head, tail pointer // 头尾指针 - length int // 长度 - stack stack // 回收站 - elements []element[T] // 元素列表 - template element[T] // 空值模板 - } -) - -func NewQueue[T any](capacity uint32) *Queue[T] { - return &Queue[T]{elements: make([]element[T], 1, 1+capacity)} -} - -func (c *Queue[T]) get(addr pointer) *element[T] { - if addr > 0 { - return &(c.elements[addr]) - } - return nil -} - -func (c *Queue[T]) getElement() *element[T] { - if c.stack.Len() > 0 { - addr := c.stack.Pop() - v := c.get(addr) - v.addr = addr - return v - } - - addr := pointer(len(c.elements)) - c.elements = append(c.elements, c.template) - v := c.get(addr) - v.addr = addr - return v -} - -func (c *Queue[T]) Reset() { - c.head, c.tail, c.length = 0, 0, 0 - c.stack = c.stack[:0] - c.elements = c.elements[:1] -} - -func (c *Queue[T]) Len() int { - return c.length -} - -func (c *Queue[T]) Front() (value T) { - if c.head > 0 { - return c.get(c.head).Value - } - return value -} - -func (c *Queue[T]) Push(value T) { - ele := c.getElement() - ele.Value = value - c.length++ - - if c.tail != 0 { - tail := c.get(c.tail) - tail.next = ele.addr - ele.prev = tail.addr - c.tail = ele.addr - return - } - - c.head = ele.addr - c.tail = ele.addr -} - -func (c *Queue[T]) Pop() (value T) { - ele := c.get(c.head) - if ele == nil { - return value - } - - c.head = ele.next - if head := c.get(c.head); head != nil { - head.prev = 0 - } - - c.length-- - value = ele.Value - c.stack.Push(ele.addr) - *ele = c.template - if c.length == 0 { - c.tail = 0 - c.Reset() - } - - return value -} - -func (c *Queue[T]) Range(f func(v T) bool) { - for i := c.get(c.head); i != nil; i = c.get(i.next) { - if !f(i.Value) { - break - } - } -} - -type stack []pointer - -func (c *stack) Len() int { - return len(*c) -} - -func (c *stack) Push(v pointer) { - *c = append(*c, v) -} - -func (c *stack) Pop() pointer { - var n = c.Len() - var v = (*c)[n-1] - *c = (*c)[:n-1] - return v -} diff --git a/internal/queue_test.go b/internal/queue_test.go deleted file mode 100644 index dcbc29c..0000000 --- a/internal/queue_test.go +++ /dev/null @@ -1,138 +0,0 @@ -package internal - -import ( - "container/list" - "github.com/stretchr/testify/assert" - "math/rand" - "testing" -) - -func TestQueue_Range(t *testing.T) { - const count = 1000 - var q = NewQueue[int](0) - var a []int - for i := 0; i < count; i++ { - v := rand.Intn(count) - q.Push(v) - a = append(a, v) - } - - assert.Equal(t, q.Len(), count) - - var b []int - q.Range(func(v int) bool { - b = append(b, v) - return len(b) < 100 - }) - assert.Equal(t, len(b), 100) - - var i = 0 - for q.Len() > 0 { - v := q.Pop() - assert.Equal(t, a[i], v) - i++ - } -} - -func TestQueue_Addr(t *testing.T) { - const count = 1000 - var q = NewQueue[int](0) - for i := 0; i < count; i++ { - v := rand.Intn(count) - if v&7 == 0 { - q.Pop() - } else { - q.Push(v) - } - } - - var sum = 0 - for i := q.get(q.head); i != nil; i = q.get(i.next) { - sum++ - - prev := q.get(i.prev) - next := q.get(i.next) - if prev != nil { - assert.Equal(t, prev.next, i.addr) - } - if next != nil { - assert.Equal(t, i.addr, next.prev) - } - } - - assert.Equal(t, q.Len(), sum) - if head := q.get(q.head); head != nil { - assert.Zero(t, head.prev) - } - if tail := q.get(q.tail); tail != nil { - assert.Zero(t, tail.next) - } -} - -func TestQueue_Pop(t *testing.T) { - var q = NewQueue[int](0) - assert.Zero(t, q.Front()) - assert.Zero(t, q.Pop()) - - q.Push(1) - q.Push(2) - q.Push(3) - q.Pop() - q.Push(4) - q.Push(5) - q.Pop() - - var arr []int - q.Range(func(v int) bool { - arr = append(arr, v) - return true - }) - assert.Equal(t, q.Front(), 3) - assert.True(t, IsSameSlice(arr, []int{3, 4, 5})) - assert.Equal(t, len(q.elements), 5) - assert.Equal(t, q.stack.Len(), 1) -} - -func TestQueue_Random(t *testing.T) { - var count = 10000 - var q = NewQueue[int](0) - var linkedlist = list.New() - for i := 0; i < count; i++ { - var flag = rand.Intn(4) - var val = rand.Int() - switch flag { - case 0, 1, 2: - q.Push(val) - linkedlist.PushBack(val) - default: - if q.Len() > 0 { - q.Pop() - linkedlist.Remove(linkedlist.Front()) - } - } - } - - for i := linkedlist.Front(); i != nil; i = i.Next() { - var val = q.Pop() - assert.Equal(t, i.Value, val) - } -} - -func BenchmarkQueue_PushAndPop(b *testing.B) { - const count = 1000 - var q = NewQueue[int](count) - for i := 0; i < b.N; i++ { - for j := 0; j < count/4; j++ { - q.Push(j) - } - for j := 0; j < count/4; j++ { - q.Pop() - } - for j := 0; j < count/4; j++ { - q.Push(j) - } - for j := 0; j < count/4; j++ { - q.Pop() - } - } -} diff --git a/queues/single_queue.go b/queues/single_queue.go index 4f28e82..0c3fd2f 100644 --- a/queues/single_queue.go +++ b/queues/single_queue.go @@ -2,7 +2,7 @@ package queues import ( "context" - "github.com/lxzan/concurrency/internal" + "github.com/lxzan/dao/deque" "sync" "time" ) @@ -12,17 +12,17 @@ func newSingleQueue(o *options) *singleQueue { return &singleQueue{ conf: o, maxConcurrency: int32(o.concurrency), - q: internal.NewQueue[Job](8), + q: deque.New[Job](8), } } type singleQueue struct { mu sync.Mutex // 锁 conf *options - q *internal.Queue[Job] // 任务队列 - maxConcurrency int32 // 最大并发 - curConcurrency int32 // 当前并发 - stopped bool // 是否关闭 + q *deque.Deque[Job] // 任务队列 + maxConcurrency int32 // 最大并发 + curConcurrency int32 // 当前并发 + stopped bool // 是否关闭 } func (c *singleQueue) Stop(ctx context.Context) error { @@ -58,13 +58,13 @@ func (c *singleQueue) getJob(newJob Job, delta int32) Job { defer c.mu.Unlock() if !c.stopped && newJob != nil { - c.q.Push(newJob) + c.q.PushBack(newJob) } c.curConcurrency += delta if c.curConcurrency >= c.maxConcurrency { return nil } - if job := c.q.Pop(); job != nil { + if job := c.q.PopFront(); job != nil { c.curConcurrency++ return job }