Skip to content

Commit

Permalink
Improve startup by eliminating unnecessary migration ranges (stellar#282
Browse files Browse the repository at this point in the history
)

Speed up startup time in "already migrated" cases by eliminating unnecessary
ledger range traversals.

Specifically, this includes the following changes:

 - Fee stats windows cannot exceed the history retention window, as this doesn't
   make sense.

 - Migrations that are already applied are not added to the list of
   "multi-migrations".

 - Fee stats window building conforms to the migration interface to simplify
   code.

 - LedgerSeqRange has been refactored to always use a value reference.

As a result, if all migrations have occurred, the traversal only occurs over the fee window.
  • Loading branch information
Shaptic authored Sep 4, 2024
1 parent b668361 commit e110732
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 62 deletions.
102 changes: 74 additions & 28 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ import (
)

const (
prometheusNamespace = "soroban_rpc"
maxLedgerEntryWriteBatchSize = 150
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second
inMemoryInitializationLedgerLogPeriod = 1_000_000
prometheusNamespace = "soroban_rpc"
maxLedgerEntryWriteBatchSize = 150
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second

// Since our default retention window will be 7 days (7*17,280 ledgers),
// choose a random 5-digit prime to have irregular logging intervals at each
// halfish-day of processing
inMemoryInitializationLedgerLogPeriod = 10_099
)

type Daemon struct {
Expand Down Expand Up @@ -289,6 +293,29 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
//
// There's some complex "ledger window math" here so we should clarify it
// beforehand.
//
// There are two windows in play here:
// - the ledger retention window, which describes the range of txmeta
// to keep relative to the latest "ledger tip" of the network
// - the fee stats window, which describes a *subset* of the prior
// ledger retention window on which to perform fee analysis
//
// If the fee window *exceeds* the retention window, this doesn't make any
// sense since it implies the user wants to store N amount of actual
// historical data and M > N amount of ledgers just for fee processing,
// which is nonsense from a performance standpoint. We prevent this:
maxFeeRetentionWindow := max(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow)
if maxFeeRetentionWindow > cfg.HistoryRetentionWindow {
d.logger.Fatalf(
"Fee stat analysis window (%d) cannot exceed history retention window (%d).",
maxFeeRetentionWindow, cfg.HistoryRetentionWindow)
}

feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
Expand All @@ -299,27 +326,42 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()

var initialSeq, currentSeq uint32
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
// To combine these windows, we launch as follows:
//
// 1. First, identify the ledger range for database migrations based on the
// ledger retention window. Since we don't do "partial" migrations (all or
// nothing), this represents the entire range of ledger metas we store.
//
retentionRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
dataMigrations, err := db.BuildMigrations(
readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, retentionRange)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
d.logger.WithError(err).Fatal("could not build migrations")
}

// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
// 2. Then, incorporate the fee analysis window. If there are migrations to
// do, this has no effect, since migration windows are larger than the fee
// window. In the absence of migrations, though, this means the ingestion
// range is just the fee stat range.
//
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}

// Apply migration for events, transactions and fee stats
// Additionally, by treating the fee window *as if* it's a migration, we can
// make the interface here really clean.
dataMigrations.Append(feeWindows.AsMigration(feeStatsRange))
ledgerSeqRange := dataMigrations.ApplicableRange()

//
// 3. Apply all migrations, including fee stat analysis.
//
var initialSeq, currentSeq uint32
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.First,
Expand All @@ -328,32 +370,36 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithField("seq", currentSeq).
Info("initializing in-memory store")
d.logger.
WithField("first", initialSeq).
WithField("last", ledgerSeqRange.Last).
Info("Initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithField("seq", currentSeq).
Debug("still initializing in-memory store")
}

if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
d.logger.
WithField("seq", currentSeq).
WithField("last", ledgerSeqRange.Last).
Debug("Still initializing in-memory store")
}

if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}

return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
d.logger.WithError(err).Fatal("Could not obtain txmeta cache from the database")
}

if err := dataMigrations.Commit(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not commit data migrations")
d.logger.WithError(err).Fatal("Could not commit data migrations")
}

if currentSeq != 0 {
d.logger.WithField("seq", currentSeq).
Info("finished initializing in-memory store and applying DB data migrations")
d.logger.
WithField("first", retentionRange.First).
WithField("last", retentionRange.Last).
Info("Finished initializing in-memory store and applying DB data migrations")
}

return feeWindows
Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ type eventTableMigration struct {
writer EventWriter
}

func (e *eventTableMigration) ApplicableRange() *LedgerSeqRange {
return &LedgerSeqRange{
func (e *eventTableMigration) ApplicableRange() LedgerSeqRange {
return LedgerSeqRange{
First: e.firstLedger,
Last: e.lastLedger,
}
Expand All @@ -326,7 +326,7 @@ func newEventTableMigration(
_ context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
ledgerSeqRange LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
migration := eventTableMigration{
Expand Down
76 changes: 50 additions & 26 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,40 @@ type LedgerSeqRange struct {
Last uint32
}

func (mlr *LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool {
if mlr == nil {
return false
}
func (mlr LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool {
return ledgerSeq >= mlr.First && ledgerSeq <= mlr.Last
}

func (mlr *LedgerSeqRange) Merge(other *LedgerSeqRange) *LedgerSeqRange {
if mlr == nil {
func (mlr LedgerSeqRange) Merge(other LedgerSeqRange) LedgerSeqRange {
if mlr.Empty() {
return other
}
if other == nil {
if other.Empty() {
return mlr
}

// TODO: using min/max can result in a much larger range than needed,
// as an optimization, we should probably use a sequence of ranges instead.
return &LedgerSeqRange{
return LedgerSeqRange{
First: min(mlr.First, other.First),
Last: max(mlr.Last, other.Last),
}
}

func (mlr LedgerSeqRange) Empty() bool {
return mlr.First == 0 && mlr.Last == 0
}

type MigrationApplier interface {
// ApplicableRange returns the closed ledger sequence interval,
// where Apply() should be called. A null result indicates the empty range
ApplicableRange() *LedgerSeqRange
// where Apply() should be called.
ApplicableRange() LedgerSeqRange
// Apply applies the migration on a ledger. It should never be applied
// in ledgers outside the ApplicableRange()
Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error
}

type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory
type migrationApplierF func(context.Context, *log.Entry, string, LedgerSeqRange) migrationApplierFactory

type migrationApplierFactory interface {
New(db *DB) (MigrationApplier, error)
Expand All @@ -72,8 +74,15 @@ type MultiMigration struct {
db *DB
}

func (mm MultiMigration) ApplicableRange() *LedgerSeqRange {
var result *LedgerSeqRange
func (mm *MultiMigration) Append(m Migration) {
r := m.ApplicableRange()
if !r.Empty() {
mm.migrations = append(mm.migrations, m)
}
}

func (mm MultiMigration) ApplicableRange() LedgerSeqRange {
var result LedgerSeqRange
for _, m := range mm.migrations {
result = m.ApplicableRange().Merge(result)
}
Expand Down Expand Up @@ -117,13 +126,18 @@ type guardedMigration struct {
}

func newGuardedDataMigration(
ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB,
ctx context.Context, uniqueMigrationName string,
logger *log.Entry, factory migrationApplierFactory, db *DB,
) (Migration, error) {
metaKey := "Migration" + uniqueMigrationName + "Done"
previouslyMigrated, err := getMetaBool(ctx, db, metaKey)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, err
}
if previouslyMigrated {
//nolint:nilnil // a sentinel value here would be stupid
return nil, nil
}
applier, err := factory.New(db)
if err != nil {
return nil, err
Expand All @@ -145,15 +159,15 @@ func (g *guardedMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta)
return nil
}
if !g.applyLogged {
g.logger.WithField("ledger", meta.LedgerSequence()).Info("applying migration")
g.logger.WithField("ledger", meta.LedgerSequence()).Info("Applying migration")
g.applyLogged = true
}
return g.migration.Apply(ctx, meta)
}

func (g *guardedMigration) ApplicableRange() *LedgerSeqRange {
func (g *guardedMigration) ApplicableRange() LedgerSeqRange {
if g.alreadyMigrated {
return nil
return LedgerSeqRange{}
}
return g.migration.ApplicableRange()
}
Expand All @@ -165,38 +179,41 @@ func (g *guardedMigration) Commit(ctx context.Context) error {
return setMetaBool(ctx, g.db, g.guardMetaKey, true)
}

func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) {
func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (LedgerSeqRange, error) {
firstLedgerToMigrate := firstLedger
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
return LedgerSeqRange{}, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
return &LedgerSeqRange{
return LedgerSeqRange{
First: firstLedgerToMigrate,
Last: latestLedger,
}, nil
}

func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange *LedgerSeqRange,
func BuildMigrations(
ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange LedgerSeqRange,
) (MultiMigration, error) {
// Start a common db transaction for the entire migration duration
err := db.Begin(ctx)
if err != nil {
return MultiMigration{}, errors.Join(err, db.Rollback())
}

migrationNameToFunc := map[string]migrationApplierF{
//
// Add new DB migrations here:
//
currentMigrations := map[string]migrationApplierF{
transactionsMigrationName: newTransactionTableMigration,
eventsMigrationName: newEventTableMigration,
}

migrations := make([]Migration, 0, len(migrationNameToFunc))

for migrationName, migrationFunc := range migrationNameToFunc {
migrations := make([]Migration, 0, len(currentMigrations))
for migrationName, migrationFunc := range currentMigrations {
migrationLogger := logger.WithField("migration", migrationName)
factory := migrationFunc(
ctx,
Expand All @@ -210,8 +227,15 @@ func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPass
return MultiMigration{}, errors.Join(err, fmt.Errorf(
"could not create guarded migration for %s", migrationName), db.Rollback())
}

if guardedM == nil {
logger.Infof("Skipping completed migration %s", migrationName)
continue
}

migrations = append(migrations, guardedM)
}

return MultiMigration{
migrations: migrations,
db: db,
Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ type transactionTableMigration struct {
writer TransactionWriter
}

func (t *transactionTableMigration) ApplicableRange() *LedgerSeqRange {
return &LedgerSeqRange{
func (t *transactionTableMigration) ApplicableRange() LedgerSeqRange {
return LedgerSeqRange{
First: t.firstLedger,
Last: t.lastLedger,
}
Expand All @@ -270,7 +270,7 @@ func newTransactionTableMigration(
ctx context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
ledgerSeqRange LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
// Truncate the table, since it may contain data, causing insert conflicts later on.
Expand Down
Loading

0 comments on commit e110732

Please sign in to comment.