From 7aee8a54449c1de67e79d546f43c1c39cde4c8ea Mon Sep 17 00:00:00 2001 From: Urvi Date: Wed, 21 Feb 2024 23:51:38 -0800 Subject: [PATCH] Improved uploader shutdown logic --- exp/services/ledgerexporter/internal/app.go | 2 - .../ledgerexporter/internal/uploader.go | 34 ++++++------ .../ledgerexporter/internal/uploader_test.go | 54 +++++++++++++++++++ 3 files changed, 73 insertions(+), 17 deletions(-) diff --git a/exp/services/ledgerexporter/internal/app.go b/exp/services/ledgerexporter/internal/app.go index 3b3f22174c..3c57405578 100644 --- a/exp/services/ledgerexporter/internal/app.go +++ b/exp/services/ledgerexporter/internal/app.go @@ -67,7 +67,6 @@ func (a *App) Run() { if err != nil && !errors.Is(err, context.Canceled) { logger.Errorf("Error executing Uploader: %v", err) cancel() - return } }() @@ -78,7 +77,6 @@ func (a *App) Run() { if err != nil && !errors.Is(err, context.Canceled) { logger.Errorf("Error executing ExportManager: %v", err) cancel() - return } }() diff --git a/exp/services/ledgerexporter/internal/uploader.go b/exp/services/ledgerexporter/internal/uploader.go index 3bd8faea89..ae6a96d649 100644 --- a/exp/services/ledgerexporter/internal/uploader.go +++ b/exp/services/ledgerexporter/internal/uploader.go @@ -2,6 +2,7 @@ package exporter import ( "context" + "time" "github.com/pkg/errors" ) @@ -39,30 +40,33 @@ func (u *uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) e // Run starts the uploader, continuously listening for LedgerMetaArchive objects to upload. func (u *uploader) Run(ctx context.Context) error { + uploadCtx, cancel := context.WithCancel(context.Background()) + go func() { + <-ctx.Done() + logger.Info("Context done, waiting for remaining uploads to complete...") + // allow up to 10 seconds to upload remaining objects from metaArchiveCh + <-time.After(10 * time.Second) + logger.Info("Timeout reached, canceling remaining uploads...") + cancel() + }() for { select { - case <-ctx.Done(): - // Drain the channel and upload pending objects before exiting. - logger.Info("Stopping uploader, draining remaining objects from channel...") - for obj := range u.metaArchiveCh { - err := u.Upload(ctx, obj) - if err != nil { - logger.WithError(err).Errorf("Error uploading %s during shutdown", obj.objectKey) - } - } - logger.WithError(ctx.Err()).Info("Uploader stopped") - return ctx.Err() + case <-uploadCtx.Done(): + return uploadCtx.Err() + case metaObject, ok := <-u.metaArchiveCh: if !ok { - logger.Info("Export object channel closed, stopping uploader") - return nil + logger.Info("Meta archive channel closed, stopping uploader") + return errors.New("Meta archive channel closed") } //Upload the received LedgerMetaArchive. - err := u.Upload(ctx, metaObject) + err := u.Upload(uploadCtx, metaObject) if err != nil { - return errors.Wrapf(err, "error uploading %s", metaObject.objectKey) + return err } + logger.Infof("Uploaded %s successfully", metaObject.objectKey) } } + return ctx.Err() } diff --git a/exp/services/ledgerexporter/internal/uploader_test.go b/exp/services/ledgerexporter/internal/uploader_test.go index 7339fb5557..a73d6bf535 100644 --- a/exp/services/ledgerexporter/internal/uploader_test.go +++ b/exp/services/ledgerexporter/internal/uploader_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "testing" + "time" "github.com/stellar/go/support/errors" "github.com/stretchr/testify/assert" @@ -72,3 +73,56 @@ func (s *UploaderSuite) TestUploadPutError() { err := dataUploader.Upload(context.Background(), archive) assert.Equal(s.T(), fmt.Sprintf("error uploading %s: error in PutFileIfNotExists", key), err.Error()) } + +func (s *UploaderSuite) TestRunChannelClose() { + s.mockDataStore.On("PutFileIfNotExists", mock.Anything, mock.Anything).Return(nil) + + objectCh := make(chan *LedgerMetaArchive, 1) + go func() { + key, start, end := "test", uint32(1), uint32(100) + for i := start; i <= end; i++ { + objectCh <- NewLedgerMetaArchive(key, i, i) + } + <-time.After(time.Second * 2) + close(objectCh) + }() + + dataUploader := uploader{destination: &s.mockDataStore, metaArchiveCh: objectCh} + err := dataUploader.Run(context.Background()) + + assert.EqualError(s.T(), err, "Meta archive channel closed") +} + +func (s *UploaderSuite) TestRunContextCancel() { + objectCh := make(chan *LedgerMetaArchive, 1) + s.mockDataStore.On("PutFileIfNotExists", mock.Anything, mock.Anything).Return(nil) + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + for { + objectCh <- NewLedgerMetaArchive("test", 1, 1) + } + }() + + go func() { + <-time.After(time.Second * 2) + cancel() + }() + + dataUploader := uploader{destination: &s.mockDataStore, metaArchiveCh: objectCh} + err := dataUploader.Run(ctx) + + assert.EqualError(s.T(), err, "context canceled") +} + +func (s *UploaderSuite) TestRunUploadError() { + objectCh := make(chan *LedgerMetaArchive, 10) + objectCh <- NewLedgerMetaArchive("test", 1, 1) + + s.mockDataStore.On("PutFileIfNotExists", "test", + mock.Anything).Return(errors.New("Put error")) + + dataUploader := uploader{destination: &s.mockDataStore, metaArchiveCh: objectCh} + err := dataUploader.Run(context.Background()) + assert.Equal(s.T(), "error uploading test: Put error", err.Error()) +}