Skip to content

Commit

Permalink
Revert "when que overflowed: wait a bit and stop accepting, return er…
Browse files Browse the repository at this point in the history
…rors on feed"

This reverts commit 364b3d7.
  • Loading branch information
Termina1 committed Dec 17, 2024
1 parent b180b17 commit faf7a12
Showing 1 changed file with 8 additions and 12 deletions.
20 changes: 8 additions & 12 deletions utils/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
)

type FDQueue[S ~[]E, E any] struct {
ctx context.Context
close context.CancelFunc
timelimit time.Duration
ch chan E
active sync.WaitGroup
batchSize int
overflowed atomic.Bool
ctx context.Context
close context.CancelFunc
timelimit time.Duration
ch chan E
active sync.WaitGroup
batchSize int
}

var ErrClosed = errors.New("[chotki] feed/drain queue is closed")
Expand All @@ -40,7 +38,7 @@ func (q *FDQueue[S, E]) Close() error {
}

func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error {
if q.ctx.Err() != nil || q.overflowed.Load() {
if q.ctx.Err() != nil {
return ErrClosed
}
q.active.Add(1)
Expand All @@ -51,8 +49,6 @@ func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error {
break
case <-q.ctx.Done():
break
case <-time.After(q.timelimit):
q.overflowed.Store(true)
case q.ch <- pkg:
}

Expand All @@ -61,7 +57,7 @@ func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error {
}

func (q *FDQueue[S, E]) Feed(ctx context.Context) (recs S, err error) {
if q.ctx.Err() != nil || q.overflowed.Load() {
if q.ctx.Err() != nil {
return nil, ErrClosed
}
q.active.Add(1)
Expand Down

0 comments on commit faf7a12

Please sign in to comment.