From cd7bb34d6a03a876cfc5c8ae164f80fc43b808d5 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Tue, 24 Sep 2024 22:05:46 -0700 Subject: [PATCH] #5412: moved the producer fn into new cdp package under ingest --- .../ledgerbackend/buffered_storage_backend.go | 140 --------- .../buffered_storage_backend_test.go | 281 ------------------ ingest/ledgerbackend/range.go | 12 + 3 files changed, 12 insertions(+), 421 deletions(-) diff --git a/ingest/ledgerbackend/buffered_storage_backend.go b/ingest/ledgerbackend/buffered_storage_backend.go index bc54bc8a6f..4bbf05a3c0 100644 --- a/ingest/ledgerbackend/buffered_storage_backend.go +++ b/ingest/ledgerbackend/buffered_storage_backend.go @@ -5,8 +5,6 @@ package ledgerbackend import ( "context" - "fmt" - "math" "sync" "time" @@ -15,16 +13,12 @@ import ( "github.com/stellar/go/support/datastore" "github.com/stellar/go/support/log" - "github.com/stellar/go/support/ordered" "github.com/stellar/go/xdr" ) // Ensure BufferedStorageBackend implements LedgerBackend var _ LedgerBackend = (*BufferedStorageBackend)(nil) -// provide testing hooks to inject mocks of these -var datastoreFactory = datastore.NewDataStore - type BufferedStorageBackendConfig struct { BufferSize uint32 `toml:"buffer_size"` NumWorkers uint32 `toml:"num_workers"` @@ -32,39 +26,6 @@ type BufferedStorageBackendConfig struct { RetryWait time.Duration `toml:"retry_wait"` } -// Generate a default buffered storage config with values -// set to optimize buffered performance to some degree based -// on number of ledgers per file expected in the underlying -// datastore used by an instance of BufferedStorageBackend. -// -// these numbers were derived empirically from benchmarking analysis: -// https://github.com/stellar/go/issues/5390 -// -// ledgersPerFile - number of ledgers per file from remote datastore schema. -// return - preconfigured instance of BufferedStorageBackendConfig -func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) BufferedStorageBackendConfig { - - config := BufferedStorageBackendConfig{ - RetryLimit: 5, - RetryWait: 30 * time.Second, - } - - switch { - case ledgersPerFile < 2: - config.BufferSize = 500 - config.NumWorkers = 5 - return config - case ledgersPerFile < 101: - config.BufferSize = 10 - config.NumWorkers = 5 - return config - default: - config.BufferSize = 10 - config.NumWorkers = 2 - return config - } -} - // BufferedStorageBackend is a ledger backend that reads from a storage service. // The storage service contains files generated from the ledgerExporter. type BufferedStorageBackend struct { @@ -119,107 +80,6 @@ type PublisherConfig struct { Log *log.Entry } -// PublishFromBufferedStorageBackend is asynchronous. -// Proceeds to create an internal instance of BufferedStorageBackend -// using provided configs and emit ledgers asynchronously to the provided -// callback fn for all ledgers in the requested range. -// -// ledgerRange - the requested range. If bounded range, will close resultCh -// after last ledger is emitted. -// -// publisherConfig - PublisherConfig. Provide configuration settings for DataStore -// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create -// optimized BufferedStorageBackendConfig. -// -// ctx - the context. Caller uses this to cancel the asynchronousledger processing. -// If caller does cancel, can sync on resultCh to receive an error to confirm -// all asynchronous processing stopped. -// -// callback - function. Invoked for every LedgerCloseMeta. If callback invocation -// returns an error, the publishing will shut down and indicate with error on resultCh. -// -// return - channel, used to signal to caller when publishing has stopped. -// If stoppage was due to an error, the error will be sent on -// channel and then closed. If no errors and ledgerRange is bounded, -// the channel will be closed when range is completed. If ledgerRange -// is unbounded, then the channel is never closed until an error -// or caller cancels. -func PublishFromBufferedStorageBackend(ledgerRange Range, - publisherConfig PublisherConfig, - ctx context.Context, - callback func(xdr.LedgerCloseMeta) error) chan error { - - logger := publisherConfig.Log - if logger == nil { - logger = log.DefaultLogger - } - resultCh := make(chan error, 1) - - go func() { - dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) - if err != nil { - resultCh <- fmt.Errorf("failed to create datastore: %w", err) - return - } - - var ledgerBackend LedgerBackend - ledgerBackend, err = NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore) - if err != nil { - resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err) - return - } - - if publisherConfig.Registry != nil { - ledgerBackend = WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace) - } - - if ledgerRange.bounded && ledgerRange.to <= ledgerRange.from { - resultCh <- errors.New("invalid end value for bounded range, must be greater than start") - return - } - - if !ledgerRange.bounded && ledgerRange.to > 0 { - resultCh <- errors.New("invalid end value for unbounded ranged, must be zero") - return - } - - from := ordered.Max(2, ledgerRange.from) - to := ledgerRange.to - if !ledgerRange.bounded { - to = math.MaxUint32 - } - - ledgerBackend.PrepareRange(ctx, ledgerRange) - - for ledgerSeq := from; ledgerSeq <= to; ledgerSeq++ { - var ledgerCloseMeta xdr.LedgerCloseMeta - - logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") - startTime := time.Now() - ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) - - if err != nil { - resultCh <- errors.Wrap(err, "error getting ledger") - return - } - - log.WithFields(log.F{ - "sequence": ledgerSeq, - "duration": time.Since(startTime).Seconds(), - }).Info("Ledger returned from the backend") - - err = callback(ledgerCloseMeta) - if err != nil { - resultCh <- errors.Wrap(err, "received an error from callback invocation") - return - } - } - close(resultCh) - }() - - return resultCh -} - // GetLatestLedgerSequence returns the most recent ledger sequence number available in the buffer. func (bsb *BufferedStorageBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { bsb.bsBackendLock.RLock() diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index 08b77149b5..0d461cff07 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -11,7 +11,6 @@ import ( "testing" "time" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -303,286 +302,6 @@ func TestBSBPrepareRange(t *testing.T) { assert.NotNil(t, bsb.prepared) } -func TestBSBProducerFn(t *testing.T) { - startLedger := uint32(2) - endLedger := uint32(3) - ctx := context.Background() - ledgerRange := BoundedRange(startLedger, endLedger) - mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) - dsConfig := datastore.DataStoreConfig{} - pubConfig := PublisherConfig{ - DataStoreConfig: dsConfig, - BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), - } - - // inject the mock datastore using the package private testing factory override - datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { - assert.Equal(t, datastoreConfig, dsConfig) - return mockDataStore, nil - } - - expectedLcmSeqWasPublished := []bool{false, false} - - appCallback := func(lcm xdr.LedgerCloseMeta) error { - if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 2 { - if expectedLcmSeqWasPublished[0] { - assert.Fail(t, "producer fn had multiple callback invocations for same lcm") - } - expectedLcmSeqWasPublished[0] = true - } - if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 3 { - if expectedLcmSeqWasPublished[1] { - assert.Fail(t, "producer fn had multiple callback invocations for same lcm") - } - expectedLcmSeqWasPublished[1] = true - } - return nil - } - - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.Failf(t, "", "producer fn should not have stopped with error %v", chErr) - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) - - assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm") - -} - -func TestBSBProducerFnDataStoreError(t *testing.T) { - ctx := context.Background() - ledgerRange := BoundedRange(uint32(2), uint32(3)) - pubConfig := PublisherConfig{ - DataStoreConfig: datastore.DataStoreConfig{}, - BufferedStorageConfig: BufferedStorageBackendConfig{}, - } - - datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { - return &datastore.MockDataStore{}, errors.New("uhoh") - } - - appCallback := func(lcm xdr.LedgerCloseMeta) error { - return nil - } - - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "failed to create datastore:") - } else { - assert.Fail(t, "", "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) -} - -func TestBSBProducerFnConfigError(t *testing.T) { - ctx := context.Background() - ledgerRange := BoundedRange(uint32(2), uint32(3)) - pubConfig := PublisherConfig{ - DataStoreConfig: datastore.DataStoreConfig{}, - BufferedStorageConfig: BufferedStorageBackendConfig{}, - } - mockDataStore := new(datastore.MockDataStore) - appCallback := func(lcm xdr.LedgerCloseMeta) error { - return nil - } - - datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { - return mockDataStore, nil - } - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "failed to create buffered storage backend") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) -} - -func TestBSBProducerFnInvalidRange(t *testing.T) { - ctx := context.Background() - pubConfig := PublisherConfig{ - DataStoreConfig: datastore.DataStoreConfig{}, - BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), - } - mockDataStore := new(datastore.MockDataStore) - mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ - LedgersPerFile: 1, - FilesPerPartition: 1, - }) - - appCallback := func(lcm xdr.LedgerCloseMeta) error { - return nil - } - - datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { - return mockDataStore, nil - } - resultCh := PublishFromBufferedStorageBackend(BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "invalid end value for bounded range, must be greater than start") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) - - resultCh = PublishFromBufferedStorageBackend(Range{from: uint32(2), to: uint32(3), bounded: false}, pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "invalid end value for unbounded ranged, must be zero") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) -} - -func TestBSBProducerFnGetLedgerError(t *testing.T) { - ctx := context.Background() - pubConfig := PublisherConfig{ - DataStoreConfig: datastore.DataStoreConfig{}, - BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), - } - // we don't want to wait for retries, forece the first error to propagate - pubConfig.BufferedStorageConfig.RetryLimit = 0 - mockDataStore := new(datastore.MockDataStore) - mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ - LedgersPerFile: 1, - FilesPerPartition: 1, - }) - mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(nil, os.ErrNotExist).Once() - mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(createLCMBatchReader(3, 3, 1), nil).Once() - - appCallback := func(lcm xdr.LedgerCloseMeta) error { - return nil - } - - datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { - return mockDataStore, nil - } - resultCh := PublishFromBufferedStorageBackend(BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "error getting ledger") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3000, - time.Millisecond*50) -} - -func TestBSBProducerFnCallbackError(t *testing.T) { - ctx := context.Background() - pubConfig := PublisherConfig{ - DataStoreConfig: datastore.DataStoreConfig{}, - BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), - } - mockDataStore := createMockdataStore(t, 2, 3, partitionSize, ledgerPerFileCount) - - appCallback := func(lcm xdr.LedgerCloseMeta) error { - return errors.New("uhoh") - } - - datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { - return mockDataStore, nil - } - resultCh := PublishFromBufferedStorageBackend(BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "received an error from callback invocation") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) -} - -func TestDefaultBSBConfigs(t *testing.T) { - smallConfig := BufferedStorageBackendConfig{ - RetryLimit: 5, - RetryWait: 30 * time.Second, - BufferSize: 500, - NumWorkers: 5, - } - - mediumConfig := BufferedStorageBackendConfig{ - RetryLimit: 5, - RetryWait: 30 * time.Second, - BufferSize: 10, - NumWorkers: 5, - } - - largeConfig := BufferedStorageBackendConfig{ - RetryLimit: 5, - RetryWait: 30 * time.Second, - BufferSize: 10, - NumWorkers: 2, - } - - assert.Equal(t, DefaultBufferedStorageBackendConfig(1), smallConfig) - assert.Equal(t, DefaultBufferedStorageBackendConfig(2), mediumConfig) - assert.Equal(t, DefaultBufferedStorageBackendConfig(100), mediumConfig) - assert.Equal(t, DefaultBufferedStorageBackendConfig(101), largeConfig) - assert.Equal(t, DefaultBufferedStorageBackendConfig(1000), largeConfig) -} - func TestBSBIsPrepared_Bounded(t *testing.T) { startLedger := uint32(3) endLedger := uint32(5) diff --git a/ingest/ledgerbackend/range.go b/ingest/ledgerbackend/range.go index f0c80695a1..99b4dfc800 100644 --- a/ingest/ledgerbackend/range.go +++ b/ingest/ledgerbackend/range.go @@ -46,6 +46,18 @@ func (r Range) String() string { return fmt.Sprintf("[%d,latest)", r.from) } +func (r Range) Bounded() bool { + return r.bounded +} + +func (r Range) To() uint32 { + return r.to +} + +func (r Range) From() uint32 { + return r.from +} + func (r Range) Contains(other Range) bool { if r.bounded && !other.bounded { return false