Skip to content

Commit

Permalink
#5412: moved the producer fn into new cdp package under ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Sep 25, 2024
1 parent a26ff3d commit cd7bb34
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 421 deletions.
140 changes: 0 additions & 140 deletions ingest/ledgerbackend/buffered_storage_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package ledgerbackend

import (
"context"
"fmt"
"math"
"sync"
"time"

Expand All @@ -15,56 +13,19 @@ import (

"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

// Ensure BufferedStorageBackend implements LedgerBackend
var _ LedgerBackend = (*BufferedStorageBackend)(nil)

// provide testing hooks to inject mocks of these
var datastoreFactory = datastore.NewDataStore

type BufferedStorageBackendConfig struct {
BufferSize uint32 `toml:"buffer_size"`
NumWorkers uint32 `toml:"num_workers"`
RetryLimit uint32 `toml:"retry_limit"`
RetryWait time.Duration `toml:"retry_wait"`
}

// Generate a default buffered storage config with values
// set to optimize buffered performance to some degree based
// on number of ledgers per file expected in the underlying
// datastore used by an instance of BufferedStorageBackend.
//
// these numbers were derived empirically from benchmarking analysis:
// https://github.com/stellar/go/issues/5390
//
// ledgersPerFile - number of ledgers per file from remote datastore schema.
// return - preconfigured instance of BufferedStorageBackendConfig
func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) BufferedStorageBackendConfig {

config := BufferedStorageBackendConfig{
RetryLimit: 5,
RetryWait: 30 * time.Second,
}

switch {
case ledgersPerFile < 2:
config.BufferSize = 500
config.NumWorkers = 5
return config
case ledgersPerFile < 101:
config.BufferSize = 10
config.NumWorkers = 5
return config
default:
config.BufferSize = 10
config.NumWorkers = 2
return config
}
}

// BufferedStorageBackend is a ledger backend that reads from a storage service.
// The storage service contains files generated from the ledgerExporter.
type BufferedStorageBackend struct {
Expand Down Expand Up @@ -119,107 +80,6 @@ type PublisherConfig struct {
Log *log.Entry
}

// PublishFromBufferedStorageBackend is asynchronous.
// Proceeds to create an internal instance of BufferedStorageBackend
// using provided configs and emit ledgers asynchronously to the provided
// callback fn for all ledgers in the requested range.
//
// ledgerRange - the requested range. If bounded range, will close resultCh
// after last ledger is emitted.
//
// publisherConfig - PublisherConfig. Provide configuration settings for DataStore
// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create
// optimized BufferedStorageBackendConfig.
//
// ctx - the context. Caller uses this to cancel the asynchronousledger processing.
// If caller does cancel, can sync on resultCh to receive an error to confirm
// all asynchronous processing stopped.
//
// callback - function. Invoked for every LedgerCloseMeta. If callback invocation
// returns an error, the publishing will shut down and indicate with error on resultCh.
//
// return - channel, used to signal to caller when publishing has stopped.
// If stoppage was due to an error, the error will be sent on
// channel and then closed. If no errors and ledgerRange is bounded,
// the channel will be closed when range is completed. If ledgerRange
// is unbounded, then the channel is never closed until an error
// or caller cancels.
func PublishFromBufferedStorageBackend(ledgerRange Range,
publisherConfig PublisherConfig,
ctx context.Context,
callback func(xdr.LedgerCloseMeta) error) chan error {

logger := publisherConfig.Log
if logger == nil {
logger = log.DefaultLogger
}
resultCh := make(chan error, 1)

go func() {
dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig)
if err != nil {
resultCh <- fmt.Errorf("failed to create datastore: %w", err)
return
}

var ledgerBackend LedgerBackend
ledgerBackend, err = NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore)
if err != nil {
resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err)
return
}

if publisherConfig.Registry != nil {
ledgerBackend = WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace)
}

if ledgerRange.bounded && ledgerRange.to <= ledgerRange.from {
resultCh <- errors.New("invalid end value for bounded range, must be greater than start")
return
}

if !ledgerRange.bounded && ledgerRange.to > 0 {
resultCh <- errors.New("invalid end value for unbounded ranged, must be zero")
return
}

from := ordered.Max(2, ledgerRange.from)
to := ledgerRange.to
if !ledgerRange.bounded {
to = math.MaxUint32
}

ledgerBackend.PrepareRange(ctx, ledgerRange)

for ledgerSeq := from; ledgerSeq <= to; ledgerSeq++ {
var ledgerCloseMeta xdr.LedgerCloseMeta

logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...")
startTime := time.Now()
ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq)

if err != nil {
resultCh <- errors.Wrap(err, "error getting ledger")
return
}

log.WithFields(log.F{
"sequence": ledgerSeq,
"duration": time.Since(startTime).Seconds(),
}).Info("Ledger returned from the backend")

err = callback(ledgerCloseMeta)
if err != nil {
resultCh <- errors.Wrap(err, "received an error from callback invocation")
return
}
}
close(resultCh)
}()

return resultCh
}

// GetLatestLedgerSequence returns the most recent ledger sequence number available in the buffer.
func (bsb *BufferedStorageBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
bsb.bsBackendLock.RLock()
Expand Down
Loading

0 comments on commit cd7bb34

Please sign in to comment.