pkg/accesslogs: rework sequentialUploader's shutdown

The "fix" for the uploader's shutdown introduced in b29f6d4 sadly only
moved the problem somewhere else. This change reworks the shutdown of
sequentialUploader while maintaining the following invariants:

- after close is called, the uploader notices that shutdown has begun
  and keeps working until the queue is empty, then shuts down
- close blocks until the uploader has drained the current upload queue

Change-Id: I5cd7aee93d9703eef4c5b576b3ee2f3fac220171
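
For illustration, here is a minimal, self-contained sketch of the close/drain
handshake described by the invariants above. Plain channels stand in for the
package's sync2.Event, and the names (drainingUploader, item) are invented, so
this is an approximation of the idea, not the production code:

// Sketch only: plain channels replace sync2.Event; names are invented.
package main

import (
	"context"
	"fmt"
	"time"
)

type item struct{ key string }

type drainingUploader struct {
	queue   chan item
	closing chan struct{} // closed exactly once by close()
	drained chan struct{} // closed by run() once the queue is empty
}

func newDrainingUploader(size int) *drainingUploader {
	return &drainingUploader{
		queue:   make(chan item, size),
		closing: make(chan struct{}),
		drained: make(chan struct{}),
	}
}

// run uploads items until it both (a) has been told to close and (b) finds
// the queue empty; only then does it signal drained and return.
func (u *drainingUploader) run() {
	closing, isClosing := u.closing, false
	for {
		select {
		case it := <-u.queue:
			fmt.Println("uploaded", it.key) // stand-in for store.Put
			if isClosing && len(u.queue) == 0 {
				close(u.drained)
				return
			}
		case <-closing:
			if len(u.queue) == 0 {
				close(u.drained)
				return
			}
			isClosing, closing = true, nil // keep draining; never re-fire this case
		}
	}
}

// close signals shutdown and blocks until run drains the queue or ctx expires.
func (u *drainingUploader) close(ctx context.Context) error {
	close(u.closing)
	select {
	case <-u.drained:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	u := newDrainingUploader(4)
	go u.run()
	u.queue <- item{key: "a"}
	u.queue <- item{key: "b"}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("close:", u.close(ctx))
}

In the actual change, the closing event carries the shutdown signal,
queueDrained is signaled by run once the queue is empty, and queueClosed makes
further queueUpload calls return ErrClosed.
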
amwolff committed Jul 2, 2024
1 parent 360f499 commit ab52919
Showing 2 changed files with 56 additions and 32 deletions.
pkg/accesslogs/processor_test.go (4 changes: 2 additions & 2 deletions)
@@ -176,7 +176,7 @@ func BenchmarkParallelQueueEntry(b *testing.B) {
log := zaptest.NewLogger(b)
defer ctx.Check(log.Sync)

s := newInMemoryStorage()
s := noopStorage{}
p := NewProcessor(log, Options{})
defer ctx.Check(p.Close)

@@ -207,7 +207,7 @@ func BenchmarkQueueEntry(b *testing.B) {
log := zaptest.NewLogger(b)
defer ctx.Check(log.Sync)

s := newInMemoryStorage()
s := noopStorage{}
p := NewProcessor(log, Options{})
defer ctx.Check(p.Close)

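
Both benchmarks now feed entries to a storage backend that does nothing, so
they measure queueing overhead rather than writes to the in-memory store. A
rough sketch of what such a no-op Storage could look like follows; the Put
signature is inferred from the up.store.Put call in uploader.go below, and the
package's real noopStorage may well differ:

package accesslogs // assumed location; the real file layout may differ

import "context"

// noopStorage is an assumed shape for a Storage implementation that does
// nothing; Put mirrors the only Storage call visible in uploader.go.
type noopStorage struct{}

func (noopStorage) Put(ctx context.Context, bucket, key string, body []byte) error {
	return nil // drop the payload; the benchmarks then exercise only the queue
}
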
pkg/accesslogs/uploader.go (84 changes: 54 additions & 30 deletions)
@@ -109,11 +109,13 @@ type sequentialUploader struct {
retryLimit int
shutdownTimeout time.Duration

mu sync.Mutex
queue chan upload
queueLen int
mu sync.Mutex
queue chan upload
queueLen int
queueClosed bool

closing sync2.Event
queueDrained sync2.Event
closed bool
}

type sequentialUploaderOptions struct {
@@ -138,7 +140,7 @@ var monQueueLength = mon.IntVal("queue_length")

func (u *sequentialUploader) queueUpload(store Storage, bucket, key string, body []byte) error {
u.mu.Lock()
if u.closed {
if u.queueClosed {
u.mu.Unlock()
return ErrClosed
}
@@ -168,7 +170,7 @@ func (u *sequentialUploader) queueUpload(store Storage, bucket, key string, body []byte) error {

func (u *sequentialUploader) queueUploadWithoutQueueLimit(store Storage, bucket, key string, body []byte) error {
u.mu.Lock()
if u.closed {
if u.queueClosed {
u.mu.Unlock()
return ErrClosed
}
@@ -193,13 +195,15 @@ func (u *sequentialUploader) queueUploadWithoutQueueLimit(store Storage, bucket,

func (u *sequentialUploader) close() error {
u.mu.Lock()
if u.closed {
if u.queueClosed {
u.mu.Unlock()
return nil
}
u.closed = true
u.queueClosed = true
u.mu.Unlock()

u.closing.Signal()

ctx, cancel := context.WithTimeout(context.Background(), u.shutdownTimeout)
defer cancel()

@@ -213,37 +217,57 @@
}

func (u *sequentialUploader) run() error {
for up := range u.queue {
// TODO(artur): we need to figure out what context we want to
// pass here. Most likely a context with configurable timeout.
if err := up.store.Put(context.TODO(), up.bucket, up.key, up.body); err != nil {
if up.retries == u.retryLimit {
u.decrementQueueLen()
mon.Event("upload_dropped")
u.log.Error("retry limit reached",
zap.String("bucket", up.bucket),
zap.String("prefix", up.key),
zap.Error(err),
)
continue // NOTE(artur): here we could spill to disk or something
var closing bool
for {
select {
case up := <-u.queue:
// TODO(artur): we need to figure out what context we want
// to pass here. WithTimeout(Background, …)?
if err := up.store.Put(context.TODO(), up.bucket, up.key, up.body); err != nil {
if up.retries == u.retryLimit {
mon.Event("upload_dropped")
u.log.Error("retry limit reached",
zap.String("bucket", up.bucket),
zap.String("prefix", up.key),
zap.Error(err),
)
if done := u.decrementQueueLen(closing); done {
return nil
}
continue // NOTE(artur): here we could spill to disk or something
}
up.retries++
u.queue <- up // failure; don't decrement u.queueLen
mon.Event("upload_failed")
continue
}
mon.Event("upload_successful")
if done := u.decrementQueueLen(closing); done {
return nil
}
case <-u.closing.Signaled():
u.mu.Lock()
if u.queueLen == 0 {
u.mu.Unlock()
u.queueDrained.Signal()
return nil
} else {
u.mu.Unlock()
closing = true
}
up.retries++
u.queue <- up // failure; don't decrement u.queueLen
mon.Event("upload_failed")
continue
}
u.decrementQueueLen()
mon.Event("upload_successful")
}
return nil
}

func (u *sequentialUploader) decrementQueueLen() {
func (u *sequentialUploader) decrementQueueLen(closing bool) bool {
u.mu.Lock()
u.queueLen--
monQueueLength.Observe(int64(u.queueLen))
if u.queueLen == 0 && u.closed {
if u.queueLen == 0 && closing {
u.mu.Unlock()
u.queueDrained.Signal()
return true
}
u.mu.Unlock()
return false
}
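
The queue-length accounting above is what lets close know when it is safe to
return: queueLen is decremented only when an upload finally succeeds or is
dropped after retryLimit attempts, never when a failed upload is requeued for
retry, so the drained signal cannot fire while a retry is still pending. A
condensed sketch of that accounting, with invented names and a plain channel
in place of sync2.Event:

// Condensed sketch of the bookkeeping; names are invented, and a plain
// channel replaces sync2.Event.
package main

import (
	"fmt"
	"sync"
)

type accounting struct {
	mu      sync.Mutex
	length  int           // uploads queued but not yet finally resolved
	drained chan struct{} // closed once length hits zero during shutdown
}

// enqueue records a newly queued upload.
func (a *accounting) enqueue() {
	a.mu.Lock()
	a.length++
	a.mu.Unlock()
}

// resolve records an upload that either succeeded or was dropped after the
// retry limit, and reports whether the worker should stop because shutdown
// has begun and nothing is left to upload. Requeued retries never call this,
// which keeps length positive until every retry is resolved.
func (a *accounting) resolve(closing bool) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.length--
	if a.length == 0 && closing {
		close(a.drained) // unblocks whoever waits for the drain in close
		return true
	}
	return false
}

func main() {
	a := &accounting{drained: make(chan struct{})}
	a.enqueue()
	a.enqueue()
	fmt.Println(a.resolve(true)) // false: one upload is still pending
	fmt.Println(a.resolve(true)) // true: the queue drained during shutdown
}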
