diff --git a/ingest/cdp/producer.go b/ingest/cdp/producer.go new file mode 100644 index 0000000000..05178b1164 --- /dev/null +++ b/ingest/cdp/producer.go @@ -0,0 +1,152 @@ +package cdp + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/ordered" + "github.com/stellar/go/xdr" +) + +// provide testing hooks to inject mocks of these +var datastoreFactory = datastore.NewDataStore + +// Generate a default buffered storage config with values +// set to optimize buffered performance to some degree based +// on number of ledgers per file expected in the underlying +// datastore used by an instance of BufferedStorageBackend. +// +// these numbers were derived empirically from benchmarking analysis: +// https://github.com/stellar/go/issues/5390 +// +// ledgersPerFile - number of ledgers per file from remote datastore schema. +// return - preconfigured instance of BufferedStorageBackendConfig +func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.BufferedStorageBackendConfig { + + config := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + } + + switch { + case ledgersPerFile < 2: + config.BufferSize = 500 + config.NumWorkers = 5 + return config + case ledgersPerFile < 101: + config.BufferSize = 10 + config.NumWorkers = 5 + return config + default: + config.BufferSize = 10 + config.NumWorkers = 2 + return config + } +} + +// PublishFromBufferedStorageBackend is asynchronous. +// Proceeds to create an internal instance of BufferedStorageBackend +// using provided configs and emit ledgers asynchronously to the provided +// callback fn for all ledgers in the requested range. +// +// ledgerRange - the requested range. If bounded range, will close resultCh +// after last ledger is emitted. +// +// publisherConfig - PublisherConfig. Provide configuration settings for DataStore +// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create +// optimized BufferedStorageBackendConfig. +// +// ctx - the context. Caller uses this to cancel the asynchronousledger processing. +// If caller does cancel, can sync on resultCh to receive an error to confirm +// all asynchronous processing stopped. +// +// callback - function. Invoked for every LedgerCloseMeta. If callback invocation +// returns an error, the publishing will shut down and indicate with error on resultCh. +// +// return - channel, used to signal to caller when publishing has stopped. +// If stoppage was due to an error, the error will be sent on +// channel and then closed. If no errors and ledgerRange is bounded, +// the channel will be closed when range is completed. If ledgerRange +// is unbounded, then the channel is never closed until an error +// or caller cancels. +func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, + publisherConfig ledgerbackend.PublisherConfig, + ctx context.Context, + callback func(xdr.LedgerCloseMeta) error) chan error { + + logger := publisherConfig.Log + if logger == nil { + logger = log.DefaultLogger + } + resultCh := make(chan error, 1) + + go func() { + dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) + if err != nil { + resultCh <- fmt.Errorf("failed to create datastore: %w", err) + return + } + + var ledgerBackend ledgerbackend.LedgerBackend + ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore) + if err != nil { + resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err) + return + } + + if publisherConfig.Registry != nil { + ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace) + } + + if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() { + resultCh <- errors.New("invalid end value for bounded range, must be greater than start") + return + } + + if !ledgerRange.Bounded() && ledgerRange.To() > 0 { + resultCh <- errors.New("invalid end value for unbounded ranged, must be zero") + return + } + + from := ordered.Max(2, ledgerRange.From()) + to := ledgerRange.To() + if !ledgerRange.Bounded() { + to = math.MaxUint32 + } + + ledgerBackend.PrepareRange(ctx, ledgerRange) + + for ledgerSeq := from; ledgerSeq <= to; ledgerSeq++ { + var ledgerCloseMeta xdr.LedgerCloseMeta + + logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") + startTime := time.Now() + ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) + + if err != nil { + resultCh <- errors.Wrap(err, "error getting ledger") + return + } + + log.WithFields(log.F{ + "sequence": ledgerSeq, + "duration": time.Since(startTime).Seconds(), + }).Info("Ledger returned from the backend") + + err = callback(ledgerCloseMeta) + if err != nil { + resultCh <- errors.Wrap(err, "received an error from callback invocation") + return + } + } + close(resultCh) + }() + + return resultCh +} diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go new file mode 100644 index 0000000000..d783d1209a --- /dev/null +++ b/ingest/cdp/producer_test.go @@ -0,0 +1,337 @@ +package cdp + +import ( + "bytes" + "context" + "fmt" + "io" + "math" + "os" + "testing" + "time" + + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/compressxdr" + "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestDefaultBSBConfigs(t *testing.T) { + smallConfig := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 500, + NumWorkers: 5, + } + + mediumConfig := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 10, + NumWorkers: 5, + } + + largeConfig := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 10, + NumWorkers: 2, + } + + assert.Equal(t, DefaultBufferedStorageBackendConfig(1), smallConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(2), mediumConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(100), mediumConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(101), largeConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(1000), largeConfig) +} + +func TestBSBProducerFn(t *testing.T) { + startLedger := uint32(2) + endLedger := uint32(3) + ctx := context.Background() + ledgerRange := ledgerbackend.BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, 64000) + dsConfig := datastore.DataStoreConfig{} + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: dsConfig, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + + // inject the mock datastore using the package private testing factory override + datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { + assert.Equal(t, datastoreConfig, dsConfig) + return mockDataStore, nil + } + + expectedLcmSeqWasPublished := []bool{false, false} + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 2 { + if expectedLcmSeqWasPublished[0] { + assert.Fail(t, "producer fn had multiple callback invocations for same lcm") + } + expectedLcmSeqWasPublished[0] = true + } + if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 3 { + if expectedLcmSeqWasPublished[1] { + assert.Fail(t, "producer fn had multiple callback invocations for same lcm") + } + expectedLcmSeqWasPublished[1] = true + } + return nil + } + + resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) + + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.Failf(t, "", "producer fn should not have stopped with error %v", chErr) + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) + + assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm") + +} + +func TestBSBProducerFnDataStoreError(t *testing.T) { + ctx := context.Background() + ledgerRange := ledgerbackend.BoundedRange(uint32(2), uint32(3)) + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: ledgerbackend.BufferedStorageBackendConfig{}, + } + + datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { + return &datastore.MockDataStore{}, errors.New("uhoh") + } + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "failed to create datastore:") + } else { + assert.Fail(t, "", "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func TestBSBProducerFnConfigError(t *testing.T) { + ctx := context.Background() + ledgerRange := ledgerbackend.BoundedRange(uint32(2), uint32(3)) + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: ledgerbackend.BufferedStorageBackendConfig{}, + } + mockDataStore := new(datastore.MockDataStore) + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "failed to create buffered storage backend") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func TestBSBProducerFnInvalidRange(t *testing.T) { + ctx := context.Background() + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: 1, + }) + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "invalid end value for bounded range, must be greater than start") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func TestBSBProducerFnGetLedgerError(t *testing.T) { + ctx := context.Background() + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + // we don't want to wait for retries, forece the first error to propagate + pubConfig.BufferedStorageConfig.RetryLimit = 0 + mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: 1, + }) + + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(nil, os.ErrNotExist).Once() + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(makeSingleLCMBatch(3), nil).Once() + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "error getting ledger") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3000, + time.Millisecond*50) +} + +func TestBSBProducerFnCallbackError(t *testing.T) { + ctx := context.Background() + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + mockDataStore := createMockdataStore(t, 2, 3, 64000) + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return errors.New("uhoh") + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "received an error from callback invocation") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func createMockdataStore(t *testing.T, start, end, partitionSize uint32) *datastore.MockDataStore { + mockDataStore := new(datastore.MockDataStore) + partition := partitionSize - 1 + for i := start; i <= end; i++ { + objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-i, i) + mockDataStore.On("GetFile", mock.Anything, objectName).Return(makeSingleLCMBatch(i), nil).Times(1) + } + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: partitionSize, + }) + + t.Cleanup(func() { + mockDataStore.AssertExpectations(t) + }) + + return mockDataStore +} + +func makeSingleLCMBatch(seq uint32) io.ReadCloser { + lcm := xdr.LedgerCloseMetaBatch{ + StartSequence: xdr.Uint32(seq), + EndSequence: xdr.Uint32(seq), + LedgerCloseMetas: []xdr.LedgerCloseMeta{ + createLedgerCloseMeta(seq), + }, + } + encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, lcm) + var buf bytes.Buffer + encoder.WriteTo(&buf) + capturedBuf := buf.Bytes() + reader := bytes.NewReader(capturedBuf) + return io.NopCloser(reader) +} + +func createLedgerCloseMeta(ledgerSeq uint32) xdr.LedgerCloseMeta { + return xdr.LedgerCloseMeta{ + V: int32(0), + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(ledgerSeq), + }, + }, + TxSet: xdr.TransactionSet{}, + TxProcessing: nil, + UpgradesProcessing: nil, + ScpInfo: nil, + }, + V1: nil, + } +}