Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Create functional producer for BufferedStorageBackend #5462

Merged
merged 19 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1b3f07e
#5412: functional producer for bufferedstorage backend
sreuland Sep 17, 2024
58402cb
Merge remote-tracking branch 'upstream/master' into func_buffered
sreuland Sep 17, 2024
a26ff3d
#5412: updated changelog to reflect new function feature
sreuland Sep 20, 2024
cd7bb34
#5412: moved the producer fn into new cdp package under ingest
sreuland Sep 25, 2024
3414d24
#5412: forgot to include new files on last commit
sreuland Sep 25, 2024
68f7f43
#5412: moved PublisherConfig to cdp package
sreuland Sep 25, 2024
adeaaf0
Merge remote-tracking branch 'upstream/master' into func_buffered
sreuland Sep 25, 2024
4876f2e
#5412: review feedback
sreuland Sep 26, 2024
94285c0
#5412: review feedback on loop logic
sreuland Sep 26, 2024
52bb0bd
Merge remote-tracking branch 'upstream/master' into func_buffered
sreuland Sep 27, 2024
8c3f694
#5412: converted producer function to sync signature, per review feed…
sreuland Sep 27, 2024
f1e9d27
#5412: add unit test to assert caller ctx cancellation outcome
sreuland Sep 27, 2024
952d4d6
#5412: fixed unit test for producer caller cancel ctx
sreuland Sep 30, 2024
4447f68
#5412: fixed unit test for producer get ledger error case
sreuland Sep 30, 2024
03cd4b6
#5412: fixed unit test for producer get ledger error case, mock asserts
sreuland Sep 30, 2024
d25a506
#5412: included changelog on new ingest/cdp package
sreuland Sep 30, 2024
33ba777
Merge remote-tracking branch 'upstream/master' into func_buffered
sreuland Oct 2, 2024
1b1244b
#5412: renamed the producer fn to ApplyLedgerMetadata
sreuland Oct 3, 2024
6eba05b
Merge remote-tracking branch 'upstream/master' into func_buffered
sreuland Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ingest/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/).

## Pending

### New Features
* Create new package `ingest/cdp` for new components which will assist towards writing data transformation pipelines as part of [Composable Data Platform](https://stellar.org/blog/developers/composable-data-platform).
* Add new functional producer, `cdp.PublishFromBufferedStorageBackend`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `PublishFromBufferedStorageBackend` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462).

### Stellar Core Protocol 21 Configuration Update:
* BucketlistDB is now the default database for stellar-core, replacing the experimental option. As a result, the `EXPERIMENTAL_BUCKETLIST_DB` configuration parameter has been deprecated.
* A new mandatory parameter, `DEPRECATED_SQL_LEDGER_STATE`, has been added with a default value of false which equivalent to `EXPERIMENTAL_BUCKETLIST_DB` being set to true.
Expand Down
147 changes: 147 additions & 0 deletions ingest/cdp/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package cdp
sreuland marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

// provide testing hooks to inject mocks of these
var datastoreFactory = datastore.NewDataStore

// Generate a default buffered storage config with values
// set to optimize buffered performance to some degree based
// on number of ledgers per file expected in the underlying
// datastore used by an instance of BufferedStorageBackend.
//
// these numbers were derived empirically from benchmarking analysis:
// https://github.com/stellar/go/issues/5390
//
// ledgersPerFile - number of ledgers per file from remote datastore schema.
// return - preconfigured instance of BufferedStorageBackendConfig
func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.BufferedStorageBackendConfig {

config := ledgerbackend.BufferedStorageBackendConfig{
RetryLimit: 5,
RetryWait: 30 * time.Second,
}

switch {
case ledgersPerFile < 2:
config.BufferSize = 500
config.NumWorkers = 5
return config
case ledgersPerFile < 101:
config.BufferSize = 10
config.NumWorkers = 5
return config
default:
config.BufferSize = 10
config.NumWorkers = 2
return config
}
}

type PublisherConfig struct {
// Registry, optional, include to capture buffered storage backend metrics
Registry *prometheus.Registry
// RegistryNamespace, optional, include to emit buffered storage backend
// under this namespace
RegistryNamespace string
// BufferedStorageConfig, required
BufferedStorageConfig ledgerbackend.BufferedStorageBackendConfig
//DataStoreConfig, required
DataStoreConfig datastore.DataStoreConfig
// Log, optional, if nil uses go default logger
Log *log.Entry
}

// PublishFromBufferedStorageBackend - create an internal instance
// of BufferedStorageBackend using provided config and emit
// ledger metadata for the requested range by invoking the provided callback
// once per ledger.
//
// The function is blocking, it will only return when a bounded range
// is completed, the ctx is canceled, or an error occurs.
//
// ledgerRange - the requested range, can be bounded or unbounded.
//
// publisherConfig - PublisherConfig. Provide configuration settings for DataStore
// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create
// optimized BufferedStorageBackendConfig.
//
// ctx - the context. Caller uses this to cancel the internal ledger processing,
// when canceled, the function will return asap with that error.
//
// callback - function. Invoked for every LedgerCloseMeta. If callback invocation
// returns an error, the processing will stop and return an error asap.
//
// return - error, function only returns if requested range is bounded or an error occured.
// nil will be returned only if bounded range requested and completed processing with no errors.
// otherwise return will always be an error.
func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range,
sreuland marked this conversation as resolved.
Show resolved Hide resolved
publisherConfig PublisherConfig,
ctx context.Context,
callback func(xdr.LedgerCloseMeta) error) error {

logger := publisherConfig.Log
if logger == nil {
logger = log.DefaultLogger
}

dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig)
if err != nil {
return fmt.Errorf("failed to create datastore: %w", err)
}

var ledgerBackend ledgerbackend.LedgerBackend
ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore)
if err != nil {
return fmt.Errorf("failed to create buffered storage backend: %w", err)
}

if publisherConfig.Registry != nil {
ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace)
}

if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() {
return fmt.Errorf("invalid end value for bounded range, must be greater than start")
}

if !ledgerRange.Bounded() && ledgerRange.To() > 0 {
return fmt.Errorf("invalid end value for unbounded range, must be zero")
}

from := ordered.Max(2, ledgerRange.From())
ledgerBackend.PrepareRange(ctx, ledgerRange)

for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ {
var ledgerCloseMeta xdr.LedgerCloseMeta

logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...")
startTime := time.Now()
ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq)

if err != nil {
return fmt.Errorf("error getting ledger, %w", err)
}

log.WithFields(log.F{
"sequence": ledgerSeq,
"duration": time.Since(startTime).Seconds(),
}).Info("Ledger returned from the backend")

err = callback(ledgerCloseMeta)
if err != nil {
return fmt.Errorf("received an error from callback invocation: %w", err)
}
}
return nil
}
Loading
Loading