Skip to content

Commit

Permalink
stellar#5412: forgot to include new files on last commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Sep 25, 2024
1 parent cd7bb34 commit 3414d24
Show file tree
Hide file tree
Showing 2 changed files with 489 additions and 0 deletions.
152 changes: 152 additions & 0 deletions ingest/cdp/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package cdp

import (
"context"
"fmt"
"math"
"time"

"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

// provide testing hooks to inject mocks of these
var datastoreFactory = datastore.NewDataStore

// Generate a default buffered storage config with values
// set to optimize buffered performance to some degree based
// on number of ledgers per file expected in the underlying
// datastore used by an instance of BufferedStorageBackend.
//
// these numbers were derived empirically from benchmarking analysis:
// https://github.com/stellar/go/issues/5390
//
// ledgersPerFile - number of ledgers per file from remote datastore schema.
// return - preconfigured instance of BufferedStorageBackendConfig
func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.BufferedStorageBackendConfig {

config := ledgerbackend.BufferedStorageBackendConfig{
RetryLimit: 5,
RetryWait: 30 * time.Second,
}

switch {
case ledgersPerFile < 2:
config.BufferSize = 500
config.NumWorkers = 5
return config
case ledgersPerFile < 101:
config.BufferSize = 10
config.NumWorkers = 5
return config
default:
config.BufferSize = 10
config.NumWorkers = 2
return config
}
}

// PublishFromBufferedStorageBackend is asynchronous.
// Proceeds to create an internal instance of BufferedStorageBackend
// using provided configs and emit ledgers asynchronously to the provided
// callback fn for all ledgers in the requested range.
//
// ledgerRange - the requested range. If bounded range, will close resultCh
// after last ledger is emitted.
//
// publisherConfig - PublisherConfig. Provide configuration settings for DataStore
// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create
// optimized BufferedStorageBackendConfig.
//
// ctx - the context. Caller uses this to cancel the asynchronousledger processing.
// If caller does cancel, can sync on resultCh to receive an error to confirm
// all asynchronous processing stopped.
//
// callback - function. Invoked for every LedgerCloseMeta. If callback invocation
// returns an error, the publishing will shut down and indicate with error on resultCh.
//
// return - channel, used to signal to caller when publishing has stopped.
// If stoppage was due to an error, the error will be sent on
// channel and then closed. If no errors and ledgerRange is bounded,
// the channel will be closed when range is completed. If ledgerRange
// is unbounded, then the channel is never closed until an error
// or caller cancels.
func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range,
publisherConfig ledgerbackend.PublisherConfig,
ctx context.Context,
callback func(xdr.LedgerCloseMeta) error) chan error {

logger := publisherConfig.Log
if logger == nil {
logger = log.DefaultLogger
}
resultCh := make(chan error, 1)

go func() {
dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig)
if err != nil {
resultCh <- fmt.Errorf("failed to create datastore: %w", err)
return
}

var ledgerBackend ledgerbackend.LedgerBackend
ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore)
if err != nil {
resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err)
return
}

if publisherConfig.Registry != nil {
ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace)
}

if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() {
resultCh <- errors.New("invalid end value for bounded range, must be greater than start")
return
}

if !ledgerRange.Bounded() && ledgerRange.To() > 0 {
resultCh <- errors.New("invalid end value for unbounded ranged, must be zero")
return
}

from := ordered.Max(2, ledgerRange.From())
to := ledgerRange.To()
if !ledgerRange.Bounded() {
to = math.MaxUint32
}

ledgerBackend.PrepareRange(ctx, ledgerRange)

for ledgerSeq := from; ledgerSeq <= to; ledgerSeq++ {
var ledgerCloseMeta xdr.LedgerCloseMeta

logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...")
startTime := time.Now()
ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq)

if err != nil {
resultCh <- errors.Wrap(err, "error getting ledger")
return
}

log.WithFields(log.F{
"sequence": ledgerSeq,
"duration": time.Since(startTime).Seconds(),
}).Info("Ledger returned from the backend")

err = callback(ledgerCloseMeta)
if err != nil {
resultCh <- errors.Wrap(err, "received an error from callback invocation")
return
}
}
close(resultCh)
}()

return resultCh
}
Loading

0 comments on commit 3414d24

Please sign in to comment.