Skip to content

Commit

Permalink
support/datastore: Make resumability robust to unexpected overlaps in…
Browse files Browse the repository at this point in the history
… adjacent ranges (stellar#5326)
  • Loading branch information
tamirms authored May 30, 2024
1 parent e86ca9f commit 083b7bb
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 70 deletions.
191 changes: 121 additions & 70 deletions support/datastore/resumablemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"testing"

"github.com/pkg/errors"
"github.com/stellar/go/historyarchive"
"github.com/stretchr/testify/require"

"github.com/stellar/go/historyarchive"
)

func TestResumability(t *testing.T) {

ctx := context.Background()
tests := []struct {
name string
startLedger uint32
Expand All @@ -22,6 +23,7 @@ func TestResumability(t *testing.T) {
latestLedger uint32
errorSnippet string
archiveError error
registerMockCalls func(*MockDataStore)
}{
{
name: "archive error when resolving network latest",
Expand All @@ -33,9 +35,10 @@ func TestResumability(t *testing.T) {
FilesPerPartition: uint32(1),
LedgersPerFile: uint32(10),
},
networkName: "test",
errorSnippet: "archive error",
archiveError: errors.New("archive error"),
networkName: "test",
errorSnippet: "archive error",
archiveError: errors.New("archive error"),
registerMockCalls: func(store *MockDataStore) {},
},
{
name: "End ledger same as start, data store has it",
Expand All @@ -48,6 +51,9 @@ func TestResumability(t *testing.T) {
LedgersPerFile: uint32(10),
},
networkName: "test",
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFFFFF--0-9.xdr.zstd").Return(true, nil).Once()
},
},
{
name: "End ledger same as start, data store does not have it",
Expand All @@ -60,6 +66,55 @@ func TestResumability(t *testing.T) {
LedgersPerFile: uint32(10),
},
networkName: "test",
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFFFF5--10-19.xdr.zstd").Return(false, nil).Twice()
},
},
{
name: "start and end ledger are in same file, data store does not have it",
startLedger: 64,
endLedger: 68,
absentLedger: 64,
findStartOk: true,
ledgerBatchConfig: LedgerBatchConfig{
FilesPerPartition: uint32(100),
LedgersPerFile: uint32(64),
},
networkName: "test",
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFFFFF--0-6399/FFFFFFBF--64-127.xdr.zstd").Return(false, nil).Twice()
},
},
{
name: "start and end ledger are in same file, data store has it",
startLedger: 128,
endLedger: 130,
absentLedger: 0,
findStartOk: false,
ledgerBatchConfig: LedgerBatchConfig{
FilesPerPartition: uint32(100),
LedgersPerFile: uint32(64),
},
networkName: "test",
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFFFFF--0-6399/FFFFFF7F--128-191.xdr.zstd").Return(true, nil).Once()
},
},
{
name: "ledger range overlaps with a range which is already exported",
startLedger: 2,
endLedger: 127,
absentLedger: 2,
findStartOk: true,
ledgerBatchConfig: LedgerBatchConfig{
FilesPerPartition: uint32(100),
LedgersPerFile: uint32(64),
},
networkName: "test",
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFFFFF--0-6399/FFFFFFBF--64-127.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFFF--0-6399/FFFFFFFF--0-63.xdr.zstd").Return(false, nil).Once()
},
},
{
name: "binary search encounters an error during datastore retrieval",
Expand All @@ -73,6 +128,9 @@ func TestResumability(t *testing.T) {
},
networkName: "test",
errorSnippet: "datastore error happened",
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFFFEB--20-29.xdr.zstd").Return(false, errors.New("datastore error happened")).Once()
},
},
{
name: "Data store is beyond boundary aligned start ledger",
Expand All @@ -85,6 +143,11 @@ func TestResumability(t *testing.T) {
LedgersPerFile: uint32(10),
},
networkName: "test",
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFFFCD--50-59.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFE1--30-39.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFD7--40-49.xdr.zstd").Return(false, nil).Once()
},
},
{
name: "Data store is beyond non boundary aligned start ledger",
Expand All @@ -97,6 +160,10 @@ func TestResumability(t *testing.T) {
LedgersPerFile: uint32(10),
},
networkName: "test",
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFFFB9--70-79.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFAF--80-89.xdr.zstd").Return(false, nil).Twice()
},
},
{
name: "Data store is beyond start and end ledger",
Expand All @@ -109,6 +176,10 @@ func TestResumability(t *testing.T) {
LedgersPerFile: uint32(10),
},
networkName: "test",
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFFEFB--260-269.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFEF1--270-279.xdr.zstd").Return(true, nil).Once()
},
},
{
name: "Data store is not beyond start ledger",
Expand All @@ -121,6 +192,12 @@ func TestResumability(t *testing.T) {
LedgersPerFile: uint32(10),
},
networkName: "test",
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFFF87--120-129.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFF91--110-119.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFF9B--100-109.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFA5--90-99.xdr.zstd").Return(false, nil).Once()
},
},
{
name: "No start ledger provided",
Expand All @@ -132,8 +209,9 @@ func TestResumability(t *testing.T) {
FilesPerPartition: uint32(1),
LedgersPerFile: uint32(10),
},
networkName: "test",
errorSnippet: "Invalid start value",
networkName: "test",
errorSnippet: "Invalid start value",
registerMockCalls: func(store *MockDataStore) {},
},
{
name: "No end ledger provided, data store not beyond start",
Expand All @@ -147,6 +225,16 @@ func TestResumability(t *testing.T) {
},
networkName: "test2",
latestLedger: uint32(2000),
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFF9A1--1630-1639.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFA91--1390-1399.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB13--1260-1269.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB4F--1200-1209.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB77--1160-1169.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB6D--1170-1179.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB81--1150-1159.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB8B--1140-1149.xdr.zstd").Return(false, nil).Once()
},
},
{
name: "No end ledger provided, data store is beyond start",
Expand All @@ -160,6 +248,15 @@ func TestResumability(t *testing.T) {
},
networkName: "test3",
latestLedger: uint32(3000),
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFF5B9--2630-2639.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF6A9--2390-2399.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF72B--2260-2269.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF735--2250-2259.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF73F--2240-2249.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF749--2230-2239.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF767--2200-2209.xdr.zstd").Return(true, nil).Once()
},
},
{
name: "No end ledger provided, data store is beyond start and archive network latest, and partially into checkpoint frequency padding",
Expand All @@ -173,6 +270,15 @@ func TestResumability(t *testing.T) {
},
networkName: "test4",
latestLedger: uint32(4000),
registerMockCalls: func(mockDataStore *MockDataStore) {
mockDataStore.On("Exists", ctx, "FFFFF1D1--3630-3639.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF0D7--3880-3889.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF05F--4000-4009.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF023--4060-4069.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF005--4090-4099.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF00F--4080-4089.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF019--4070-4079.xdr.zstd").Return(false, nil).Once()
},
},
{
name: "No end ledger provided, start is beyond archive network latest and checkpoint frequency padding",
Expand All @@ -184,75 +290,20 @@ func TestResumability(t *testing.T) {
FilesPerPartition: uint32(1),
LedgersPerFile: uint32(10),
},
networkName: "test5",
latestLedger: uint32(5000),
errorSnippet: "Invalid start value of 5129, it is greater than network's latest ledger of 5128",
networkName: "test5",
latestLedger: uint32(5000),
errorSnippet: "Invalid start value of 5129, it is greater than network's latest ledger of 5128",
registerMockCalls: func(store *MockDataStore) {},
},
}

ctx := context.Background()

mockDataStore := &MockDataStore{}

//"End ledger same as start, data store has it"
mockDataStore.On("Exists", ctx, "FFFFFFFF--0-9.xdr.zstd").Return(true, nil).Once()

//"End ledger same as start, data store does not have it"
mockDataStore.On("Exists", ctx, "FFFFFFF5--10-19.xdr.zstd").Return(false, nil).Once()

//"binary search encounters an error during datastore retrieval",
mockDataStore.On("Exists", ctx, "FFFFFFEB--20-29.xdr.zstd").Return(false, errors.New("datastore error happened")).Once()

//"Data store is beyond boundary aligned start ledger"
mockDataStore.On("Exists", ctx, "FFFFFFE1--30-39.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFD7--40-49.xdr.zstd").Return(false, nil).Once()

//"Data store is beyond non boundary aligned start ledger"
mockDataStore.On("Exists", ctx, "FFFFFFB9--70-79.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFAF--80-89.xdr.zstd").Return(false, nil).Once()

//"Data store is beyond start and end ledger"
mockDataStore.On("Exists", ctx, "FFFFFEFB--260-269.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFEF1--270-279.xdr.zstd").Return(true, nil).Once()

//"Data store is not beyond start ledger"
mockDataStore.On("Exists", ctx, "FFFFFF91--110-119.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFF9B--100-109.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFFA5--90-99.xdr.zstd").Return(false, nil).Once()

//"No end ledger provided, data store not beyond start" uses latest from network="test2"
mockDataStore.On("Exists", ctx, "FFFFF9A1--1630-1639.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFA91--1390-1399.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB13--1260-1269.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB4F--1200-1209.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB77--1160-1169.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB6D--1170-1179.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB81--1150-1159.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFFB8B--1140-1149.xdr.zstd").Return(false, nil).Once()

//"No end ledger provided, data store is beyond start" uses latest from network="test3"
mockDataStore.On("Exists", ctx, "FFFFF5B9--2630-2639.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF6A9--2390-2399.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF72B--2260-2269.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF735--2250-2259.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF73F--2240-2249.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF749--2230-2239.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF767--2200-2209.xdr.zstd").Return(true, nil).Once()

//"No end ledger provided, data store is beyond start and archive network latest, and partially into checkpoint frequency padding" uses latest from network="test4"
mockDataStore.On("Exists", ctx, "FFFFF1D1--3630-3639.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF0D7--3880-3889.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF05F--4000-4009.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF023--4060-4069.xdr.zstd").Return(true, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF005--4090-4099.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF00F--4080-4089.xdr.zstd").Return(false, nil).Once()
mockDataStore.On("Exists", ctx, "FFFFF019--4070-4079.xdr.zstd").Return(false, nil).Once()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockArchive := &historyarchive.MockArchive{}
mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: tt.latestLedger}, tt.archiveError).Once()

mockDataStore := &MockDataStore{}
tt.registerMockCalls(mockDataStore)

resumableManager := NewResumableManager(mockDataStore, tt.networkName, tt.ledgerBatchConfig, mockArchive)
absentLedger, ok, err := resumableManager.FindStart(ctx, tt.startLedger, tt.endLedger)
if tt.errorSnippet != "" {
Expand All @@ -266,8 +317,8 @@ func TestResumability(t *testing.T) {
// archives are only expected to be called when end = 0
mockArchive.AssertExpectations(t)
}
mockDataStore.AssertExpectations(t)
})
}

mockDataStore.AssertExpectations(t)
}
24 changes: 24 additions & 0 deletions support/datastore/resumeablemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"

"github.com/pkg/errors"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/support/log"
)
Expand Down Expand Up @@ -77,6 +78,29 @@ func (rm resumableManagerService) FindStart(ctx context.Context, start, end uint
return 0, false, errors.Errorf("Invalid start value of %v, it is greater than network's latest ledger of %v", start, networkLatest)
}
end = networkLatest
} else if end >= rm.ledgerBatchConfig.LedgersPerFile {
// Adjacent ranges may end up overlapping due to the clamping behavior in adjustLedgerRange()
// https://github.com/stellar/go/blob/fff01229a5af77dee170a37bf0c71b2ce8bb8474/exp/services/ledgerexporter/internal/config.go#L173-L192
// For example, assuming 64 ledgers per file, [2, 100] and [101, 150] get adjusted to [2, 127] and [64, 191]
// If we export [64, 191] and then try to resume on [2, 127], the binary search logic will determine that
// [2, 127] is fully exported because the midpoint of [2, 127] is present.
// To fix this issue we query the end ledger and if it is present, we only do the binary search on the
// preceding sub range. This will allow resumability to work on adjacent ranges that end up overlapping
// due to adjustLedgerRange().
// Note that if there is an overlap the size of the overlap will never be larger than the number of files
// per partition and that is why it is sufficient to only check if the end ledger is present.
exists, err := rm.dataStore.Exists(ctx, rm.ledgerBatchConfig.GetObjectKeyFromSequenceNumber(end))
if err != nil {
return 0, false, err
}
if exists {
end -= rm.ledgerBatchConfig.LedgersPerFile
if start > end {
// data store had all ledgers for requested range, no resumability needed.
log.Infof("Resumability found no absent object keys in requested ledger range")
return 0, false, nil
}
}
}

rangeSize := max(int(end-start), 1)
Expand Down

0 comments on commit 083b7bb

Please sign in to comment.