Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Create functional producer for BufferedStorageBackend #5462

Merged
merged 19 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1b3f07e
#5412: functional producer for bufferedstorage backend
sreuland Sep 17, 2024
58402cb
Merge remote-tracking branch 'upstream/master' into func_buffered
sreuland Sep 17, 2024
a26ff3d
#5412: updated changelog to reflect new function feature
sreuland Sep 20, 2024
cd7bb34
#5412: moved the producer fn into new cdp package under ingest
sreuland Sep 25, 2024
3414d24
#5412: forgot to include new files on last commit
sreuland Sep 25, 2024
68f7f43
#5412: moved PublisherConfig to cdp package
sreuland Sep 25, 2024
adeaaf0
Merge remote-tracking branch 'upstream/master' into func_buffered
sreuland Sep 25, 2024
4876f2e
#5412: review feedback
sreuland Sep 26, 2024
94285c0
#5412: review feedback on loop logic
sreuland Sep 26, 2024
52bb0bd
Merge remote-tracking branch 'upstream/master' into func_buffered
sreuland Sep 27, 2024
8c3f694
#5412: converted producer function to sync signature, per review feed…
sreuland Sep 27, 2024
f1e9d27
#5412: add unit test to assert caller ctx cancellation outcome
sreuland Sep 27, 2024
952d4d6
#5412: fixed unit test for producer caller cancel ctx
sreuland Sep 30, 2024
4447f68
#5412: fixed unit test for producer get ledger error case
sreuland Sep 30, 2024
03cd4b6
#5412: fixed unit test for producer get ledger error case, mock asserts
sreuland Sep 30, 2024
d25a506
#5412: included changelog on new ingest/cdp package
sreuland Sep 30, 2024
33ba777
Merge remote-tracking branch 'upstream/master' into func_buffered
sreuland Oct 2, 2024
1b1244b
#5412: renamed the producer fn to ApplyLedgerMetadata
sreuland Oct 3, 2024
6eba05b
Merge remote-tracking branch 'upstream/master' into func_buffered
sreuland Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions ingest/ledgerbackend/buffered_storage_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,66 @@ package ledgerbackend

import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

// Ensure BufferedStorageBackend implements LedgerBackend
var _ LedgerBackend = (*BufferedStorageBackend)(nil)

// provide testing hooks to inject mocks of these
var datastoreFactory = datastore.NewDataStore

type BufferedStorageBackendConfig struct {
BufferSize uint32 `toml:"buffer_size"`
NumWorkers uint32 `toml:"num_workers"`
RetryLimit uint32 `toml:"retry_limit"`
RetryWait time.Duration `toml:"retry_wait"`
}

// Generate a default buffered storage config with values
// set to optimize buffered performance to some degree based
// on number of ledgers per file expected in the underlying
// datastore used by an instance of BufferedStorageBackend.
//
// these numbers were derived empirically from benchmarking analysis:
// https://github.com/stellar/go/issues/5390
//
// ledgersPerFile - number of ledgers per file from remote datastore schema.
// return - preconfigured instance of BufferedStorageBackendConfig
func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) BufferedStorageBackendConfig {

config := BufferedStorageBackendConfig{
RetryLimit: 5,
RetryWait: 30 * time.Second,
}

switch {
case ledgersPerFile < 2:
config.BufferSize = 500
config.NumWorkers = 5
return config
case ledgersPerFile < 101:
config.BufferSize = 10
config.NumWorkers = 5
return config
default:
config.BufferSize = 10
config.NumWorkers = 2
return config
}
}

// BufferedStorageBackend is a ledger backend that reads from a storage service.
// The storage service contains files generated from the ledgerExporter.
type BufferedStorageBackend struct {
Expand Down Expand Up @@ -64,6 +105,121 @@ func NewBufferedStorageBackend(config BufferedStorageBackendConfig, dataStore da
return bsBackend, nil
}

type PublisherConfig struct {
// Registry, optional, include to capture buffered storage backend metrics
Registry *prometheus.Registry
// RegistryNamespace, optional, include to emit buffered storage backend
// under this namespace
RegistryNamespace string
// BufferedStorageConfig, required
BufferedStorageConfig BufferedStorageBackendConfig
//DataStoreConfig, required
DataStoreConfig datastore.DataStoreConfig
// Log, optional, if nil uses go default logger
Log *log.Entry
}

// PublishFromBufferedStorageBackend is asynchronous.
// Proceeds to create an internal instance of BufferedStorageBackend
// using provided configs and emit ledgers asynchronously to the provided
// callback fn for all ledgers in the requested range.
//
// ledgerRange - the requested range. If bounded range, will close resultCh
// after last ledger is emitted.
//
// publisherConfig - PublisherConfig. Provide configuration settings for DataStore
// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create
// optimized BufferedStorageBackendConfig.
//
// ctx - the context. Caller uses this to cancel the asynchronousledger processing.
// If caller does cancel, can sync on resultCh to receive an error to confirm
// all asynchronous processing stopped.
//
// callback - function. Invoked for every LedgerCloseMeta. If callback invocation
// returns an error, the publishing will shut down and indicate with error on resultCh.
//
// return - channel, used to signal to caller when publishing has stopped.
// If stoppage was due to an error, the error will be sent on
// channel and then closed. If no errors and ledgerRange is bounded,
// the channel will be closed when range is completed. If ledgerRange
// is unbounded, then the channel is never closed until an error
// or caller cancels.
func PublishFromBufferedStorageBackend(ledgerRange Range,
publisherConfig PublisherConfig,
ctx context.Context,
callback func(xdr.LedgerCloseMeta) error) chan error {

logger := publisherConfig.Log
if logger == nil {
logger = log.DefaultLogger
}
resultCh := make(chan error, 1)

go func() {
sreuland marked this conversation as resolved.
Show resolved Hide resolved
sreuland marked this conversation as resolved.
Show resolved Hide resolved
dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig)
if err != nil {
resultCh <- fmt.Errorf("failed to create datastore: %w", err)
return
}

var ledgerBackend LedgerBackend
ledgerBackend, err = NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore)
if err != nil {
resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err)
return
}

if publisherConfig.Registry != nil {
ledgerBackend = WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace)
}

if ledgerRange.bounded && ledgerRange.to <= ledgerRange.from {
resultCh <- errors.New("invalid end value for bounded range, must be greater than start")
return
}

if !ledgerRange.bounded && ledgerRange.to > 0 {
resultCh <- errors.New("invalid end value for unbounded ranged, must be zero")
sreuland marked this conversation as resolved.
Show resolved Hide resolved
return
}

from := ordered.Max(2, ledgerRange.from)
to := ledgerRange.to
if !ledgerRange.bounded {
to = math.MaxUint32
}
sreuland marked this conversation as resolved.
Show resolved Hide resolved

ledgerBackend.PrepareRange(ctx, ledgerRange)

for ledgerSeq := from; ledgerSeq <= to; ledgerSeq++ {
var ledgerCloseMeta xdr.LedgerCloseMeta

logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...")
startTime := time.Now()
ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq)

if err != nil {
resultCh <- errors.Wrap(err, "error getting ledger")
return
}

log.WithFields(log.F{
"sequence": ledgerSeq,
"duration": time.Since(startTime).Seconds(),
}).Info("Ledger returned from the backend")

err = callback(ledgerCloseMeta)
if err != nil {
resultCh <- errors.Wrap(err, "received an error from callback invocation")
sreuland marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
close(resultCh)
sreuland marked this conversation as resolved.
Show resolved Hide resolved
}()

return resultCh
}

// GetLatestLedgerSequence returns the most recent ledger sequence number available in the buffer.
func (bsb *BufferedStorageBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
bsb.bsBackendLock.RLock()
Expand Down
Loading
Loading