Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledgerexporter: Add lexicographical ordering token to filenames #5305

Merged
merged 4 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 3 additions & 33 deletions exp/services/ledgerexporter/internal/exportmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -138,43 +137,14 @@
require.EqualError(s.T(), err, "context canceled")
}

func (s *ExportManagerSuite) TestGetObjectKeyFromSequenceNumber() {
chowbao marked this conversation as resolved.
Show resolved Hide resolved
testCases := []struct {
filesPerPartition uint32
ledgerSeq uint32
ledgersPerFile uint32
fileSuffix string
expectedKey string
}{
{0, 5, 1, ".xdr.gz", "5.xdr.gz"},
{0, 5, 10, ".xdr.gz", "0-9.xdr.gz"},
{2, 10, 100, ".xdr.gz", "0-199/0-99.xdr.gz"},
{2, 150, 50, ".xdr.gz", "100-199/150-199.xdr.gz"},
{2, 300, 200, ".xdr.gz", "0-399/200-399.xdr.gz"},
{2, 1, 1, ".xdr.gz", "0-1/1.xdr.gz"},
{4, 10, 100, ".xdr.gz", "0-399/0-99.xdr.gz"},
{4, 250, 50, ".xdr.gz", "200-399/250-299.xdr.gz"},
{1, 300, 200, ".xdr.gz", "200-399.xdr.gz"},
{1, 1, 1, ".xdr.gz", "1.xdr.gz"},
}

for _, tc := range testCases {
s.T().Run(fmt.Sprintf("LedgerSeq-%d-LedgersPerFile-%d", tc.ledgerSeq, tc.ledgersPerFile), func(t *testing.T) {
config := datastore.LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile, FileSuffix: tc.fileSuffix}
key := config.GetObjectKeyFromSequenceNumber(tc.ledgerSeq)
require.Equal(t, tc.expectedKey, key)
})
}
}

func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)

expectedkeys := set.NewSet[string](10)
expectedKeys := set.NewSet[string](10)
actualKeys := set.NewSet[string](10)

wg := sync.WaitGroup{}
Expand All @@ -197,12 +167,12 @@
require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), datastore.CreateLedgerCloseMeta(i)))

key := config.GetObjectKeyFromSequenceNumber(i)
expectedkeys.Add(key)
expectedKeys.Add(key)
}

queue.Close()
wg.Wait()
require.Equal(s.T(), expectedkeys, actualKeys)
require.Equal(s.T(), expectedKeys, actualKeys)

Check failure on line 175 in exp/services/ledgerexporter/internal/exportmanager_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.T undefined (type *ExportManagerSuite has no field or method T) (typecheck)
}

func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() {
Expand Down
13 changes: 7 additions & 6 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -66,10 +67,10 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
if count > 1 {
endFileSeq := i + count - 1
readCloser = createLCMBatchReader(i, endFileSeq, count)
objectName = fmt.Sprintf("0-%d/%d-%d.xdr.gz", partition, i, endFileSeq)
objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d-%d.xdr.gz", partition, math.MaxUint32-i, i, endFileSeq)
} else {
readCloser = createLCMBatchReader(i, i, count)
objectName = fmt.Sprintf("0-%d/%d.xdr.gz", partition, i)
objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-i, i)
}
mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil)
}
Expand Down Expand Up @@ -440,7 +441,7 @@ func TestLedgerBufferClose(t *testing.T) {
mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3)
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
afterPrepareRange := make(chan struct{})
mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), context.Canceled).Run(func(args mock.Arguments) {
<-afterPrepareRange
Expand Down Expand Up @@ -472,7 +473,7 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) {
mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3)
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once()
t.Cleanup(func() {
mockDataStore.AssertExpectations(t)
Expand All @@ -498,7 +499,7 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) {
mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3)
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
iteration := &atomic.Int32{}
cancelAfter := int32(bsb.config.RetryLimit) + 2
mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -530,7 +531,7 @@ func TestLedgerBufferRetryLimit(t *testing.T) {
mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3)
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
mockDataStore.On("GetFile", mock.Anything, objectName).
Return(io.NopCloser(&bytes.Buffer{}), fmt.Errorf("transient error")).
Times(int(bsb.config.RetryLimit) + 1)
Expand Down
6 changes: 4 additions & 2 deletions support/datastore/ledgerbatch_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package datastore

import (
"fmt"
"math"
)

type LedgerBatchConfig struct {
Expand Down Expand Up @@ -29,12 +30,13 @@ func (ec LedgerBatchConfig) GetObjectKeyFromSequenceNumber(ledgerSeq uint32) str
partitionSize := ec.LedgersPerFile * ec.FilesPerPartition
partitionStart := (ledgerSeq / partitionSize) * partitionSize
partitionEnd := partitionStart + partitionSize - 1
objectKey = fmt.Sprintf("%d-%d/", partitionStart, partitionEnd)

objectKey = fmt.Sprintf("%08X--%d-%d/", math.MaxUint32-partitionStart, partitionStart, partitionEnd)
}

fileStart := ec.GetSequenceNumberStartBoundary(ledgerSeq)
fileEnd := ec.GetSequenceNumberEndBoundary(ledgerSeq)
objectKey += fmt.Sprintf("%d", fileStart)
objectKey += fmt.Sprintf("%08X--%d", math.MaxUint32-fileStart, fileStart)

// Multiple ledgers per file
if fileStart != fileEnd {
Expand Down
20 changes: 10 additions & 10 deletions support/datastore/ledgerbatch_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ func TestGetObjectKeyFromSequenceNumber(t *testing.T) {
fileSuffix string
tamirms marked this conversation as resolved.
Show resolved Hide resolved
expectedKey string
}{
{0, 5, 1, ".xdr.gz", "5.xdr.gz"},
{0, 5, 10, ".xdr.gz", "0-9.xdr.gz"},
{2, 10, 100, ".xdr.gz", "0-199/0-99.xdr.gz"},
{2, 150, 50, ".xdr.gz", "100-199/150-199.xdr.gz"},
{2, 300, 200, ".xdr.gz", "0-399/200-399.xdr.gz"},
{2, 1, 1, ".xdr.gz", "0-1/1.xdr.gz"},
{4, 10, 100, ".xdr.gz", "0-399/0-99.xdr.gz"},
{4, 250, 50, ".xdr.gz", "200-399/250-299.xdr.gz"},
{1, 300, 200, ".xdr.gz", "200-399.xdr.gz"},
{1, 1, 1, ".xdr.gz", "1.xdr.gz"},
{0, 5, 1, ".xdr.gz", "FFFFFFFA--5.xdr.gz"},
{0, 5, 10, ".xdr.gz", "FFFFFFFF--0-9.xdr.gz"},
{2, 10, 100, ".xdr.gz", "FFFFFFFF--0-199/FFFFFFFF--0-99.xdr.gz"},
{2, 150, 50, ".xdr.gz", "FFFFFF9B--100-199/FFFFFF69--150-199.xdr.gz"},
{2, 300, 200, ".xdr.gz", "FFFFFFFF--0-399/FFFFFF37--200-399.xdr.gz"},
{2, 1, 1, ".xdr.gz", "FFFFFFFF--0-1/FFFFFFFE--1.xdr.gz"},
{4, 10, 100, ".xdr.gz", "FFFFFFFF--0-399/FFFFFFFF--0-99.xdr.gz"},
{4, 250, 50, ".xdr.gz", "FFFFFF37--200-399/FFFFFF05--250-299.xdr.gz"},
{1, 300, 200, ".xdr.gz", "FFFFFF37--200-399.xdr.gz"},
{1, 1, 1, ".xdr.gz", "FFFFFFFE--1.xdr.gz"},
}

for _, tc := range testCases {
Expand Down
68 changes: 34 additions & 34 deletions support/datastore/resumablemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,58 +208,58 @@
mockDataStore := &MockDataStore{}

//"End ledger same as start, data store has it"
mockDataStore.On("Exists", ctx, "0-9.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFFF--0-9.xdr.gz").Return(true, nil).Once()

Check failure on line 211 in support/datastore/resumablemanager_test.go

View workflow job for this annotation

GitHub Actions / golangci

mockDataStore.On undefined (type *MockDataStore has no field or method On) (typecheck)

//"End ledger same as start, data store does not have it"
mockDataStore.On("Exists", ctx, "10-19.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFF5--10-19.xdr.gz").Return(false, nil).Once()

Check failure on line 214 in support/datastore/resumablemanager_test.go

View workflow job for this annotation

GitHub Actions / golangci

mockDataStore.On undefined (type *MockDataStore has no field or method On) (typecheck)

//"binary search encounters an error during datastore retrieval",
mockDataStore.On("Exists", ctx, "20-29.xdr.gz").Return(false, errors.New("datastore error happened")).Once()
mockDataStore.On("Exists", ctx, "FFFFFFEB--20-29.xdr.gz").Return(false, errors.New("datastore error happened")).Once()

Check failure on line 217 in support/datastore/resumablemanager_test.go

View workflow job for this annotation

GitHub Actions / golangci

mockDataStore.On undefined (type *MockDataStore has no field or method On) (typecheck)

//"Data store is beyond boundary aligned start ledger"
mockDataStore.On("Exists", ctx, "30-39.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "40-49.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFE1--30-39.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFD7--40-49.xdr.gz").Return(false, nil).Once()

//"Data store is beyond non boundary aligned start ledger"
mockDataStore.On("Exists", ctx, "70-79.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "80-89.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFB9--70-79.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFAF--80-89.xdr.gz").Return(false, nil).Once()

//"Data store is beyond start and end ledger"
mockDataStore.On("Exists", ctx, "260-269.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "270-279.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFEFB--260-269.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFEF1--270-279.xdr.gz").Return(true, nil).Once()

//"Data store is not beyond start ledger"
mockDataStore.On("Exists", ctx, "110-119.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "100-109.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "90-99.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFF91--110-119.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFF9B--100-109.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFA5--90-99.xdr.gz").Return(false, nil).Once()

//"No end ledger provided, data store not beyond start" uses latest from network="test2"
mockDataStore.On("Exists", ctx, "1630-1639.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "1390-1399.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "1260-1269.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "1200-1209.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "1160-1169.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "1170-1179.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "1150-1159.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "1140-1149.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF9A1--1630-1639.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFA91--1390-1399.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB13--1260-1269.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB4F--1200-1209.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB77--1160-1169.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB6D--1170-1179.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB81--1150-1159.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB8B--1140-1149.xdr.gz").Return(false, nil).Once()

//"No end ledger provided, data store is beyond start" uses latest from network="test3"
mockDataStore.On("Exists", ctx, "2630-2639.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "2390-2399.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "2260-2269.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "2250-2259.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "2240-2249.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "2230-2239.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "2200-2209.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF5B9--2630-2639.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF6A9--2390-2399.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF72B--2260-2269.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF735--2250-2259.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF73F--2240-2249.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF749--2230-2239.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF767--2200-2209.xdr.gz").Return(true, nil).Once()

//"No end ledger provided, data store is beyond start and archive network latest, and partially into checkpoint frequency padding" uses latest from network="test4"
mockDataStore.On("Exists", ctx, "3630-3639.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "3880-3889.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "4000-4009.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "4060-4069.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "4090-4099.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "4080-4089.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "4070-4079.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF1D1--3630-3639.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF0D7--3880-3889.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF05F--4000-4009.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF023--4060-4069.xdr.gz").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF005--4090-4099.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF00F--4080-4089.xdr.gz").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF019--4070-4079.xdr.gz").Return(false, nil).Once()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading