Skip to content

Commit

Permalink
Improved uploader shutdown logic
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla committed Feb 22, 2024
1 parent d4705df commit 7aee8a5
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 17 deletions.
2 changes: 0 additions & 2 deletions exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func (a *App) Run() {
if err != nil && !errors.Is(err, context.Canceled) {
logger.Errorf("Error executing Uploader: %v", err)
cancel()
return
}
}()

Expand All @@ -78,7 +77,6 @@ func (a *App) Run() {
if err != nil && !errors.Is(err, context.Canceled) {
logger.Errorf("Error executing ExportManager: %v", err)
cancel()
return
}
}()

Expand Down
34 changes: 19 additions & 15 deletions exp/services/ledgerexporter/internal/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package exporter

import (
"context"
"time"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -39,30 +40,33 @@ func (u *uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) e

// Run starts the uploader, continuously listening for LedgerMetaArchive objects to upload.
func (u *uploader) Run(ctx context.Context) error {
uploadCtx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
logger.Info("Context done, waiting for remaining uploads to complete...")
// allow up to 10 seconds to upload remaining objects from metaArchiveCh
<-time.After(10 * time.Second)

Check failure on line 48 in exp/services/ledgerexporter/internal/uploader.go

View workflow job for this annotation

GitHub Actions / golangci

mnd: Magic number: 10, in <argument> detected (gomnd)
logger.Info("Timeout reached, canceling remaining uploads...")
cancel()
}()

for {
select {
case <-ctx.Done():
// Drain the channel and upload pending objects before exiting.
logger.Info("Stopping uploader, draining remaining objects from channel...")
for obj := range u.metaArchiveCh {
err := u.Upload(ctx, obj)
if err != nil {
logger.WithError(err).Errorf("Error uploading %s during shutdown", obj.objectKey)
}
}
logger.WithError(ctx.Err()).Info("Uploader stopped")
return ctx.Err()
case <-uploadCtx.Done():
return uploadCtx.Err()

case metaObject, ok := <-u.metaArchiveCh:
if !ok {
logger.Info("Export object channel closed, stopping uploader")
return nil
logger.Info("Meta archive channel closed, stopping uploader")
return errors.New("Meta archive channel closed")
}
//Upload the received LedgerMetaArchive.
err := u.Upload(ctx, metaObject)
err := u.Upload(uploadCtx, metaObject)
if err != nil {
return errors.Wrapf(err, "error uploading %s", metaObject.objectKey)
return err
}
logger.Infof("Uploaded %s successfully", metaObject.objectKey)
}
}
return ctx.Err()

Check failure on line 71 in exp/services/ledgerexporter/internal/uploader.go

View workflow job for this annotation

GitHub Actions / golangci

unreachable: unreachable code (govet)
}
54 changes: 54 additions & 0 deletions exp/services/ledgerexporter/internal/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"testing"
"time"

"github.com/stellar/go/support/errors"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -72,3 +73,56 @@ func (s *UploaderSuite) TestUploadPutError() {
err := dataUploader.Upload(context.Background(), archive)
assert.Equal(s.T(), fmt.Sprintf("error uploading %s: error in PutFileIfNotExists", key), err.Error())
}

func (s *UploaderSuite) TestRunChannelClose() {
s.mockDataStore.On("PutFileIfNotExists", mock.Anything, mock.Anything).Return(nil)

objectCh := make(chan *LedgerMetaArchive, 1)
go func() {
key, start, end := "test", uint32(1), uint32(100)
for i := start; i <= end; i++ {
objectCh <- NewLedgerMetaArchive(key, i, i)
}
<-time.After(time.Second * 2)
close(objectCh)
}()

dataUploader := uploader{destination: &s.mockDataStore, metaArchiveCh: objectCh}
err := dataUploader.Run(context.Background())

assert.EqualError(s.T(), err, "Meta archive channel closed")
}

func (s *UploaderSuite) TestRunContextCancel() {
objectCh := make(chan *LedgerMetaArchive, 1)
s.mockDataStore.On("PutFileIfNotExists", mock.Anything, mock.Anything).Return(nil)
ctx, cancel := context.WithCancel(context.Background())

go func() {
for {
objectCh <- NewLedgerMetaArchive("test", 1, 1)
}
}()

go func() {
<-time.After(time.Second * 2)
cancel()
}()

dataUploader := uploader{destination: &s.mockDataStore, metaArchiveCh: objectCh}
err := dataUploader.Run(ctx)

assert.EqualError(s.T(), err, "context canceled")
}

func (s *UploaderSuite) TestRunUploadError() {
objectCh := make(chan *LedgerMetaArchive, 10)
objectCh <- NewLedgerMetaArchive("test", 1, 1)

s.mockDataStore.On("PutFileIfNotExists", "test",
mock.Anything).Return(errors.New("Put error"))

dataUploader := uploader{destination: &s.mockDataStore, metaArchiveCh: objectCh}
err := dataUploader.Run(context.Background())
assert.Equal(s.T(), "error uploading test: Put error", err.Error())
}

0 comments on commit 7aee8a5

Please sign in to comment.