From 9a3a6b724a66e715595c55764b168c3c0122298c Mon Sep 17 00:00:00 2001 From: Urvi Date: Thu, 27 Jun 2024 14:50:20 -0700 Subject: [PATCH] support/storage: Add GetLatestLedgerSequence method to Archive interface --- .../ledgerexporter/internal/config.go | 6 +-- .../ledgerexporter/internal/config_test.go | 28 ++++++---- historyarchive/archive.go | 11 ++++ historyarchive/archive_pool.go | 10 ++++ historyarchive/mocks.go | 5 ++ support/datastore/history_archive.go | 53 ------------------- support/datastore/resumablemanager_test.go | 8 ++- support/datastore/resumeablemanager.go | 4 +- 8 files changed, 56 insertions(+), 69 deletions(-) delete mode 100644 support/datastore/history_archive.go diff --git a/exp/services/ledgerexporter/internal/config.go b/exp/services/ledgerexporter/internal/config.go index d5aad53256..013a3ef8d7 100644 --- a/exp/services/ledgerexporter/internal/config.go +++ b/exp/services/ledgerexporter/internal/config.go @@ -124,8 +124,8 @@ func (config *Config) ValidateAndSetLedgerRange(ctx context.Context, archive his return errors.New("invalid end value, must be greater than start") } - latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(archive) - latestNetworkLedger = latestNetworkLedger + (datastore.GetHistoryArchivesCheckPointFrequency() * 2) + latestNetworkLedger, err := archive.GetLatestLedgerSequence() + latestNetworkLedger = latestNetworkLedger + (archive.GetCheckpointManager().GetCheckpointFrequency() * 2) if err != nil { return errors.Wrap(err, "Failed to retrieve the latest ledger sequence from history archives.") @@ -189,7 +189,7 @@ func (config *Config) GenerateCaptiveCoreConfig(coreBinFromPath string) (ledgerb BinaryPath: config.StellarCoreConfig.StellarCoreBinaryPath, NetworkPassphrase: params.NetworkPassphrase, HistoryArchiveURLs: params.HistoryArchiveURLs, - CheckpointFrequency: datastore.GetHistoryArchivesCheckPointFrequency(), + CheckpointFrequency: historyarchive.DefaultCheckpointFrequency, Log: logger.WithField("subservice", "stellar-core"), Toml: captiveCoreToml, UserAgent: "ledger-exporter", diff --git a/exp/services/ledgerexporter/internal/config_test.go b/exp/services/ledgerexporter/internal/config_test.go index f782de5ea4..d1c24cb198 100644 --- a/exp/services/ledgerexporter/internal/config_test.go +++ b/exp/services/ledgerexporter/internal/config_test.go @@ -5,19 +5,20 @@ import ( "fmt" "testing" + "github.com/stellar/go/historyarchive" "github.com/stellar/go/network" - "github.com/stellar/go/support/datastore" "github.com/stretchr/testify/require" - - "github.com/stellar/go/historyarchive" ) func TestNewConfig(t *testing.T) { ctx := context.Background() mockArchive := &historyarchive.MockArchive{} - mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 5}, nil).Once() + mockArchive.On("GetLatestLedgerSequence").Return(uint32(5), nil).Once() + mockArchive.On("GetCheckpointManager"). + Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)).Once() config, err := NewConfig( RuntimeSettings{StartLedger: 2, EndLedger: 3, ConfigFilePath: "test/test.toml", Mode: Append}, nil) @@ -198,7 +199,7 @@ func TestInvalidCaptiveCoreTomlPath(t *testing.T) { func TestValidateStartAndEndLedger(t *testing.T) { latestNetworkLedger := uint32(20000) - latestNetworkLedgerPadding := datastore.GetHistoryArchivesCheckPointFrequency() * 2 + latestNetworkLedgerPadding := historyarchive.DefaultCheckpointFrequency * 2 tests := []struct { name string @@ -282,7 +283,10 @@ func TestValidateStartAndEndLedger(t *testing.T) { ctx := context.Background() mockArchive := &historyarchive.MockArchive{} - mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: latestNetworkLedger}, nil) + mockArchive.On("GetLatestLedgerSequence").Return(latestNetworkLedger, nil) + mockArchive.On("GetCheckpointManager"). + Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)) mockedHasCtr := 0 for _, tt := range tests { @@ -302,7 +306,7 @@ func TestValidateStartAndEndLedger(t *testing.T) { } }) } - mockArchive.AssertNumberOfCalls(t, "GetRootHAS", mockedHasCtr) + mockArchive.AssertExpectations(t) } func TestAdjustedLedgerRangeBoundedMode(t *testing.T) { @@ -358,7 +362,10 @@ func TestAdjustedLedgerRangeBoundedMode(t *testing.T) { ctx := context.Background() mockArchive := &historyarchive.MockArchive{} - mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 500}, nil).Times(len(tests)) + mockArchive.On("GetLatestLedgerSequence").Return(uint32(500), nil) + mockArchive.On("GetCheckpointManager"). + Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -421,7 +428,10 @@ func TestAdjustedLedgerRangeUnBoundedMode(t *testing.T) { ctx := context.Background() mockArchive := &historyarchive.MockArchive{} - mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 500}, nil).Times(len(tests)) + mockArchive.On("GetLatestLedgerSequence").Return(uint32(500), nil) + mockArchive.On("GetCheckpointManager"). + Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/historyarchive/archive.go b/historyarchive/archive.go index 4f9e14380f..d97471b42f 100644 --- a/historyarchive/archive.go +++ b/historyarchive/archive.go @@ -71,6 +71,7 @@ type ArchiveInterface interface { GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) GetRootHAS() (HistoryArchiveState, error) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) + GetLatestLedgerSequence() (uint32, error) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error @@ -176,6 +177,16 @@ func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *Command return a.backend.PutFile(path, io.NopCloser(bytes.NewReader(buf))) } +func (a *Archive) GetLatestLedgerSequence() (uint32, error) { + has, err := a.GetRootHAS() + if err != nil { + log.Error("Error getting root HAS from archive", err) + return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from history archive") + } + + return has.CurrentLedger, nil +} + func (a *Archive) BucketExists(bucket Hash) (bool, error) { return a.cachedExists(BucketPath(bucket)) } diff --git a/historyarchive/archive_pool.go b/historyarchive/archive_pool.go index 48178ade26..28967d8aa6 100644 --- a/historyarchive/archive_pool.go +++ b/historyarchive/archive_pool.go @@ -204,6 +204,16 @@ func (pa *ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) }) } +func (pa *ArchivePool) GetLatestLedgerSequence() (uint32, error) { + has, err := pa.GetRootHAS() + if err != nil { + log.Error("Error getting root HAS from archive", err) + return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from history archive") + } + + return has.CurrentLedger, nil +} + func (pa *ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error { return pa.runRoundRobin(func(ai ArchiveInterface) error { return ai.PutCheckpointHAS(chk, has, opts) diff --git a/historyarchive/mocks.go b/historyarchive/mocks.go index fa5716e5de..efe333cd33 100644 --- a/historyarchive/mocks.go +++ b/historyarchive/mocks.go @@ -10,6 +10,11 @@ type MockArchive struct { mock.Mock } +func (m *MockArchive) GetLatestLedgerSequence() (uint32, error) { + a := m.Called() + return a.Get(0).(uint32), a.Error(1) +} + func (m *MockArchive) GetCheckpointManager() CheckpointManager { a := m.Called() return a.Get(0).(CheckpointManager) diff --git a/support/datastore/history_archive.go b/support/datastore/history_archive.go deleted file mode 100644 index 9fd291bac7..0000000000 --- a/support/datastore/history_archive.go +++ /dev/null @@ -1,53 +0,0 @@ -package datastore - -import ( - "context" - - log "github.com/sirupsen/logrus" - - "github.com/stellar/go/historyarchive" - "github.com/stellar/go/network" - "github.com/stellar/go/support/errors" - supportlog "github.com/stellar/go/support/log" - "github.com/stellar/go/support/storage" -) - -const ( - Pubnet = "pubnet" - Testnet = "testnet" -) - -func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string, userAgent string, logger *supportlog.Entry) (historyarchive.ArchiveInterface, error) { - var historyArchiveUrls []string - switch networkName { - case Pubnet: - historyArchiveUrls = network.PublicNetworkhistoryArchiveURLs - case Testnet: - historyArchiveUrls = network.TestNetworkhistoryArchiveURLs - default: - return nil, errors.Errorf("Invalid network name %s", networkName) - } - - return historyarchive.NewArchivePool(historyArchiveUrls, historyarchive.ArchiveOptions{ - Logger: logger, - ConnectOptions: storage.ConnectOptions{ - UserAgent: userAgent, - Context: ctx, - }, - }) -} - -func GetLatestLedgerSequenceFromHistoryArchives(archive historyarchive.ArchiveInterface) (uint32, error) { - has, err := archive.GetRootHAS() - if err != nil { - log.Error("Error getting root HAS from archives", err) - return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from any history archive") - } - - return has.CurrentLedger, nil -} - -func GetHistoryArchivesCheckPointFrequency() uint32 { - // this could evolve to use other sources for checkpoint freq - return historyarchive.DefaultCheckpointFrequency -} diff --git a/support/datastore/resumablemanager_test.go b/support/datastore/resumablemanager_test.go index 4616f9e4ae..4fc8738b08 100644 --- a/support/datastore/resumablemanager_test.go +++ b/support/datastore/resumablemanager_test.go @@ -282,8 +282,12 @@ func TestResumability(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mockArchive := &historyarchive.MockArchive{} - mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: tt.latestLedger}, tt.archiveError).Once() - + mockArchive.On("GetLatestLedgerSequence").Return(tt.latestLedger, tt.archiveError).Once() + if tt.archiveError == nil { + mockArchive.On("GetCheckpointManager"). + Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)).Once() + } mockDataStore := &MockDataStore{} tt.registerMockCalls(mockDataStore) diff --git a/support/datastore/resumeablemanager.go b/support/datastore/resumeablemanager.go index 35031d73f6..7e6b03df99 100644 --- a/support/datastore/resumeablemanager.go +++ b/support/datastore/resumeablemanager.go @@ -62,12 +62,12 @@ func (rm resumableManagerService) FindStart(ctx context.Context, start, end uint networkLatest := uint32(0) if end < 1 { var latestErr error - networkLatest, latestErr = GetLatestLedgerSequenceFromHistoryArchives(rm.archive) + networkLatest, latestErr = rm.archive.GetLatestLedgerSequence() if latestErr != nil { err := errors.Wrap(latestErr, "Resumability of requested export ledger range, was not able to get latest ledger from network") return 0, false, err } - networkLatest = networkLatest + (GetHistoryArchivesCheckPointFrequency() * 2) + networkLatest = networkLatest + (rm.archive.GetCheckpointManager().GetCheckpointFrequency() * 2) log.Infof("Resumability computed effective latest network ledger including padding of checkpoint frequency to be %d", networkLatest) if start > networkLatest {