Skip to content

Commit

Permalink
Removing redundant data from the LedgerCloseMetaObject. Adding checks…
Browse files Browse the repository at this point in the history
… for sequential ledger numbers in AddLedgerCloseMeta.
  • Loading branch information
urvisavla committed Jan 19, 2024
1 parent 0718877 commit 79627fe
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 22 deletions.
52 changes: 36 additions & 16 deletions exp/services/ledgerexporter/internal/exportmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,41 @@ import (
// LedgerCloseMetaObject represents a file with metadata and binary data.
type LedgerCloseMetaObject struct {
// file name
objectKey string
startSequence uint32
endSequence uint32
objectKey string
// Actual binary data
data xdr.LedgerCloseMetaBatch
}

func NewLedgerCloseMetaObject(key string, startSeq uint32, endSeq uint32) *LedgerCloseMetaObject {
return &LedgerCloseMetaObject{
objectKey: key,
data: xdr.LedgerCloseMetaBatch{
StartSequence: xdr.Uint32(startSeq),
EndSequence: xdr.Uint32(endSeq),
},
}
}

func (f *LedgerCloseMetaObject) GetLastLedgerCloseMetaSequence() (uint32, error) {
if len(f.data.LedgerCloseMetas) == 0 {
return 0, errors.New("LedgerCloseMetas is empty")
}

return f.data.LedgerCloseMetas[len(f.data.LedgerCloseMetas)-1].LedgerSequence(), nil
}

// AddLedgerCloseMeta adds a ledger
func (f *LedgerCloseMetaObject) AddLedgerCloseMeta(ledgerCloseMeta xdr.LedgerCloseMeta) {
if f.startSequence == 0 {
f.data.StartSequence = xdr.Uint32(ledgerCloseMeta.LedgerSequence())
func (f *LedgerCloseMetaObject) AddLedgerCloseMeta(ledgerCloseMeta xdr.LedgerCloseMeta) error {
lastSequence, err := f.GetLastLedgerCloseMetaSequence()
if err == nil {
if ledgerCloseMeta.LedgerSequence() != lastSequence+1 {
return fmt.Errorf("ledgers must be added sequentially. Sequence number: %d, "+
"expected sequence number: %d", ledgerCloseMeta.LedgerSequence(), lastSequence+1)
}
}

f.data.LedgerCloseMetas = append(f.data.LedgerCloseMetas, ledgerCloseMeta)
f.data.EndSequence = xdr.Uint32(ledgerCloseMeta.LedgerSequence())
return nil
}

// LedgerCount returns the number of ledgers added so far
Expand Down Expand Up @@ -106,29 +127,28 @@ func (e *exportManager) AddLedgerCloseMeta(ledgerCloseMeta xdr.LedgerCloseMeta)

if !exists {
// Create a new LedgerCloseMetaObject and add it to the map.
ledgerCloseMetaObject = &LedgerCloseMetaObject{
objectKey: objectKey,
startSequence: ledgerSeq,
endSequence: ledgerSeq + e.config.LedgersPerFile - 1,
}
ledgerCloseMetaObject = NewLedgerCloseMetaObject(objectKey, ledgerSeq,
ledgerSeq+e.config.LedgersPerFile-1)

// Special case: Adjust the end ledger sequence for the first batch.
// Since the start ledger is 2 instead of 0, we want to ensure that the end ledger sequence
// does not exceed LedgersPerFile.
// For example, if LedgersPerFile is 64, the file name for the first batch should be 0-63, not 2-66.
if ledgerSeq < e.config.LedgersPerFile {
ledgerCloseMetaObject.endSequence = e.config.LedgersPerFile - 1
ledgerCloseMetaObject.data.EndSequence = xdr.Uint32(e.config.LedgersPerFile - 1)
}

e.objectMap[objectKey] = ledgerCloseMetaObject
}

// Add ledger to the LedgerCloseMetaObject
ledgerCloseMetaObject.AddLedgerCloseMeta(ledgerCloseMeta)
if err := ledgerCloseMetaObject.AddLedgerCloseMeta(ledgerCloseMeta); err != nil {
return errors.Wrapf(err, "failed to add ledger to LedgerCloseMetaObject")
}

//logger.Logf("ledger Seq: %d object: %s ledgercount: %d ledgersperfile: %d", ledgerSeq,

if ledgerSeq >= ledgerCloseMetaObject.endSequence {
if ledgerSeq >= uint32(ledgerCloseMetaObject.data.EndSequence) {
// Current export object is full, send it for upload
// This is a blocking call!
e.exportObjectCh <- ledgerCloseMetaObject
Expand Down Expand Up @@ -160,7 +180,7 @@ func (e *exportManager) Run(ctx context.Context, startLedger, endLedger uint32)
//time.Sleep(time.Duration(1) * time.Second)
err = e.AddLedgerCloseMeta(ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to fetch ledger %d", nextLedger)
return errors.Wrapf(err, "failed to add ledger %d", nextLedger)
}
nextLedger++
}
Expand Down
44 changes: 38 additions & 6 deletions exp/services/ledgerexporter/internal/exportmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package exporter

import (
"context"
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -44,9 +45,7 @@ func (s *ExportManagerSuite) TestRun() {
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(i),
LedgerVersion: xdr.Uint32(20),
BucketListHash: xdr.Hash{1, 2, 3},
LedgerSeq: xdr.Uint32(i),
},
},
},
Expand Down Expand Up @@ -96,9 +95,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(i),
LedgerVersion: xdr.Uint32(20),
BucketListHash: xdr.Hash{1, 2, 3},
LedgerSeq: xdr.Uint32(i),
},
},
},
Expand All @@ -111,3 +108,38 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
wg.Wait()
assert.Equal(s.T(), expectedObjectkeys, actualObjectKeys)
}

func (s *ExportManagerSuite) TestAddLedgerCloseMetaSequential() {
config := ExporterConfig{LedgersPerFile: 10, FilesPerPartition: 1}
exporter := NewExportManager(config, &s.mockBackend)

// Add ledgers sequentially
for i := 1; i <= 5; i++ {
assert.NoError(s.T(),
exporter.AddLedgerCloseMeta(xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(i),
},
},
},
}))
}

// Add a ledger out of sequence
err := exporter.AddLedgerCloseMeta(xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(7),
},
},
},
})

expectedErrMsg := fmt.Sprintf("failed to add ledger to LedgerCloseMetaObject: ledgers must be added sequentially."+
" Sequence number: %d, expected sequence number: %d", 7, 6)
assert.EqualError(s.T(), err, expectedErrMsg)

}

0 comments on commit 79627fe

Please sign in to comment.