From 8d91ef39c8b88936d7c5cb93a2560181a721132d Mon Sep 17 00:00:00 2001
From: Urvi <urvi.savla@stellar.org>
Date: Wed, 21 Feb 2024 23:51:38 -0800
Subject: [PATCH] Improved uploader shutdown logic

---
 exp/services/ledgerexporter/internal/app.go   |  2 -
 .../ledgerexporter/internal/uploader.go       | 34 ++++++------
 .../ledgerexporter/internal/uploader_test.go  | 54 +++++++++++++++++++
 3 files changed, 73 insertions(+), 17 deletions(-)

diff --git a/exp/services/ledgerexporter/internal/app.go b/exp/services/ledgerexporter/internal/app.go
index 3b3f22174c..3c57405578 100644
--- a/exp/services/ledgerexporter/internal/app.go
+++ b/exp/services/ledgerexporter/internal/app.go
@@ -67,7 +67,6 @@ func (a *App) Run() {
 		if err != nil && !errors.Is(err, context.Canceled) {
 			logger.Errorf("Error executing Uploader: %v", err)
 			cancel()
-			return
 		}
 	}()
 
@@ -78,7 +77,6 @@ func (a *App) Run() {
 		if err != nil && !errors.Is(err, context.Canceled) {
 			logger.Errorf("Error executing ExportManager: %v", err)
 			cancel()
-			return
 		}
 	}()
 
diff --git a/exp/services/ledgerexporter/internal/uploader.go b/exp/services/ledgerexporter/internal/uploader.go
index 3bd8faea89..ae6a96d649 100644
--- a/exp/services/ledgerexporter/internal/uploader.go
+++ b/exp/services/ledgerexporter/internal/uploader.go
@@ -2,6 +2,7 @@ package exporter
 
 import (
 	"context"
+	"time"
 
 	"github.com/pkg/errors"
 )
@@ -39,30 +40,33 @@ func (u *uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) e
 
 // Run starts the uploader, continuously listening for LedgerMetaArchive objects to upload.
 func (u *uploader) Run(ctx context.Context) error {
+	uploadCtx, cancel := context.WithCancel(context.Background())
+	go func() {
+		<-ctx.Done()
+		logger.Info("Context done, waiting for remaining uploads to complete...")
+		// allow up to 10 seconds to upload remaining objects from metaArchiveCh
+		<-time.After(10 * time.Second)
+		logger.Info("Timeout reached, canceling remaining uploads...")
+		cancel()
+	}()
 
 	for {
 		select {
-		case <-ctx.Done():
-			// Drain the channel and upload pending objects before exiting.
-			logger.Info("Stopping uploader, draining remaining objects from channel...")
-			for obj := range u.metaArchiveCh {
-				err := u.Upload(ctx, obj)
-				if err != nil {
-					logger.WithError(err).Errorf("Error uploading %s during shutdown", obj.objectKey)
-				}
-			}
-			logger.WithError(ctx.Err()).Info("Uploader stopped")
-			return ctx.Err()
+		case <-uploadCtx.Done():
+			return uploadCtx.Err()
+
 		case metaObject, ok := <-u.metaArchiveCh:
 			if !ok {
-				logger.Info("Export object channel closed, stopping uploader")
-				return nil
+				logger.Info("Meta archive channel closed, stopping uploader")
+				return errors.New("Meta archive channel closed")
 			}
 			//Upload the received LedgerMetaArchive.
-			err := u.Upload(ctx, metaObject)
+			err := u.Upload(uploadCtx, metaObject)
 			if err != nil {
-				return errors.Wrapf(err, "error uploading %s", metaObject.objectKey)
+				return err
 			}
+			logger.Infof("Uploaded %s successfully", metaObject.objectKey)
 		}
 	}
+	return ctx.Err()
 }
diff --git a/exp/services/ledgerexporter/internal/uploader_test.go b/exp/services/ledgerexporter/internal/uploader_test.go
index 7339fb5557..a73d6bf535 100644
--- a/exp/services/ledgerexporter/internal/uploader_test.go
+++ b/exp/services/ledgerexporter/internal/uploader_test.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"testing"
+	"time"
 
 	"github.com/stellar/go/support/errors"
 	"github.com/stretchr/testify/assert"
@@ -72,3 +73,56 @@ func (s *UploaderSuite) TestUploadPutError() {
 	err := dataUploader.Upload(context.Background(), archive)
 	assert.Equal(s.T(), fmt.Sprintf("error uploading %s: error in PutFileIfNotExists", key), err.Error())
 }
+
+func (s *UploaderSuite) TestRunChannelClose() {
+	s.mockDataStore.On("PutFileIfNotExists", mock.Anything, mock.Anything).Return(nil)
+
+	objectCh := make(chan *LedgerMetaArchive, 1)
+	go func() {
+		key, start, end := "test", uint32(1), uint32(100)
+		for i := start; i <= end; i++ {
+			objectCh <- NewLedgerMetaArchive(key, i, i)
+		}
+		<-time.After(time.Second * 2)
+		close(objectCh)
+	}()
+
+	dataUploader := uploader{destination: &s.mockDataStore, metaArchiveCh: objectCh}
+	err := dataUploader.Run(context.Background())
+
+	assert.EqualError(s.T(), err, "Meta archive channel closed")
+}
+
+func (s *UploaderSuite) TestRunContextCancel() {
+	objectCh := make(chan *LedgerMetaArchive, 1)
+	s.mockDataStore.On("PutFileIfNotExists", mock.Anything, mock.Anything).Return(nil)
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go func() {
+		for {
+			objectCh <- NewLedgerMetaArchive("test", 1, 1)
+		}
+	}()
+
+	go func() {
+		<-time.After(time.Second * 2)
+		cancel()
+	}()
+
+	dataUploader := uploader{destination: &s.mockDataStore, metaArchiveCh: objectCh}
+	err := dataUploader.Run(ctx)
+
+	assert.EqualError(s.T(), err, "context canceled")
+}
+
+func (s *UploaderSuite) TestRunUploadError() {
+	objectCh := make(chan *LedgerMetaArchive, 10)
+	objectCh <- NewLedgerMetaArchive("test", 1, 1)
+
+	s.mockDataStore.On("PutFileIfNotExists", "test",
+		mock.Anything).Return(errors.New("Put error"))
+
+	dataUploader := uploader{destination: &s.mockDataStore, metaArchiveCh: objectCh}
+	err := dataUploader.Run(context.Background())
+	assert.Equal(s.T(), "error uploading test: Put error", err.Error())
+}