diff --git a/exp/services/ledgerexporter/internal/exportmanager.go b/exp/services/ledgerexporter/internal/exportmanager.go index 4e2c2eda59..5d01c943d5 100644 --- a/exp/services/ledgerexporter/internal/exportmanager.go +++ b/exp/services/ledgerexporter/internal/exportmanager.go @@ -12,20 +12,41 @@ import ( // LedgerCloseMetaObject represents a file with metadata and binary data. type LedgerCloseMetaObject struct { // file name - objectKey string - startSequence uint32 - endSequence uint32 + objectKey string // Actual binary data data xdr.LedgerCloseMetaBatch } +func NewLedgerCloseMetaObject(key string, startSeq uint32, endSeq uint32) *LedgerCloseMetaObject { + return &LedgerCloseMetaObject{ + objectKey: key, + data: xdr.LedgerCloseMetaBatch{ + StartSequence: xdr.Uint32(startSeq), + EndSequence: xdr.Uint32(endSeq), + }, + } +} + +func (f *LedgerCloseMetaObject) GetLastLedgerCloseMetaSequence() (uint32, error) { + if len(f.data.LedgerCloseMetas) == 0 { + return 0, errors.New("LedgerCloseMetas is empty") + } + + return f.data.LedgerCloseMetas[len(f.data.LedgerCloseMetas)-1].LedgerSequence(), nil +} + // AddLedgerCloseMeta adds a ledger -func (f *LedgerCloseMetaObject) AddLedgerCloseMeta(ledgerCloseMeta xdr.LedgerCloseMeta) { - if f.startSequence == 0 { - f.data.StartSequence = xdr.Uint32(ledgerCloseMeta.LedgerSequence()) +func (f *LedgerCloseMetaObject) AddLedgerCloseMeta(ledgerCloseMeta xdr.LedgerCloseMeta) error { + lastSequence, err := f.GetLastLedgerCloseMetaSequence() + if err == nil { + if ledgerCloseMeta.LedgerSequence() != lastSequence+1 { + return fmt.Errorf("ledgers must be added sequentially. Sequence number: %d, "+ + "expected sequence number: %d", ledgerCloseMeta.LedgerSequence(), lastSequence+1) + } } + f.data.LedgerCloseMetas = append(f.data.LedgerCloseMetas, ledgerCloseMeta) - f.data.EndSequence = xdr.Uint32(ledgerCloseMeta.LedgerSequence()) + return nil } // LedgerCount returns the number of ledgers added so far @@ -106,29 +127,28 @@ func (e *exportManager) AddLedgerCloseMeta(ledgerCloseMeta xdr.LedgerCloseMeta) if !exists { // Create a new LedgerCloseMetaObject and add it to the map. - ledgerCloseMetaObject = &LedgerCloseMetaObject{ - objectKey: objectKey, - startSequence: ledgerSeq, - endSequence: ledgerSeq + e.config.LedgersPerFile - 1, - } + ledgerCloseMetaObject = NewLedgerCloseMetaObject(objectKey, ledgerSeq, + ledgerSeq+e.config.LedgersPerFile-1) // Special case: Adjust the end ledger sequence for the first batch. // Since the start ledger is 2 instead of 0, we want to ensure that the end ledger sequence // does not exceed LedgersPerFile. // For example, if LedgersPerFile is 64, the file name for the first batch should be 0-63, not 2-66. if ledgerSeq < e.config.LedgersPerFile { - ledgerCloseMetaObject.endSequence = e.config.LedgersPerFile - 1 + ledgerCloseMetaObject.data.EndSequence = xdr.Uint32(e.config.LedgersPerFile - 1) } e.objectMap[objectKey] = ledgerCloseMetaObject } // Add ledger to the LedgerCloseMetaObject - ledgerCloseMetaObject.AddLedgerCloseMeta(ledgerCloseMeta) + if err := ledgerCloseMetaObject.AddLedgerCloseMeta(ledgerCloseMeta); err != nil { + return errors.Wrapf(err, "failed to add ledger to LedgerCloseMetaObject") + } //logger.Logf("ledger Seq: %d object: %s ledgercount: %d ledgersperfile: %d", ledgerSeq, - if ledgerSeq >= ledgerCloseMetaObject.endSequence { + if ledgerSeq >= uint32(ledgerCloseMetaObject.data.EndSequence) { // Current export object is full, send it for upload // This is a blocking call! e.exportObjectCh <- ledgerCloseMetaObject @@ -160,7 +180,7 @@ func (e *exportManager) Run(ctx context.Context, startLedger, endLedger uint32) //time.Sleep(time.Duration(1) * time.Second) err = e.AddLedgerCloseMeta(ledgerCloseMeta) if err != nil { - return errors.Wrapf(err, "failed to fetch ledger %d", nextLedger) + return errors.Wrapf(err, "failed to add ledger %d", nextLedger) } nextLedger++ } diff --git a/exp/services/ledgerexporter/internal/exportmanager_test.go b/exp/services/ledgerexporter/internal/exportmanager_test.go index e520d5cbad..b13c35c9b8 100644 --- a/exp/services/ledgerexporter/internal/exportmanager_test.go +++ b/exp/services/ledgerexporter/internal/exportmanager_test.go @@ -2,6 +2,7 @@ package exporter import ( "context" + "fmt" "sync" "testing" @@ -44,9 +45,7 @@ func (s *ExportManagerSuite) TestRun() { V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(i), - LedgerVersion: xdr.Uint32(20), - BucketListHash: xdr.Hash{1, 2, 3}, + LedgerSeq: xdr.Uint32(i), }, }, }, @@ -96,9 +95,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(i), - LedgerVersion: xdr.Uint32(20), - BucketListHash: xdr.Hash{1, 2, 3}, + LedgerSeq: xdr.Uint32(i), }, }, }, @@ -111,3 +108,38 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { wg.Wait() assert.Equal(s.T(), expectedObjectkeys, actualObjectKeys) } + +func (s *ExportManagerSuite) TestAddLedgerCloseMetaSequential() { + config := ExporterConfig{LedgersPerFile: 10, FilesPerPartition: 1} + exporter := NewExportManager(config, &s.mockBackend) + + // Add ledgers sequentially + for i := 1; i <= 5; i++ { + assert.NoError(s.T(), + exporter.AddLedgerCloseMeta(xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, + }, + }, + })) + } + + // Add a ledger out of sequence + err := exporter.AddLedgerCloseMeta(xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(7), + }, + }, + }, + }) + + expectedErrMsg := fmt.Sprintf("failed to add ledger to LedgerCloseMetaObject: ledgers must be added sequentially."+ + " Sequence number: %d, expected sequence number: %d", 7, 6) + assert.EqualError(s.T(), err, expectedErrMsg) + +}