From f4ba5a3e8abc07f00c16abdee90001af71b73932 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Fri, 20 Jan 2023 14:33:00 +0100 Subject: [PATCH] Run db checkpoint-prefilling in paralallel with captive core preparation --- .../ledgerentry_storage.go | 61 ++++++++++--------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/cmd/soroban-rpc/internal/ledgerentry_storage/ledgerentry_storage.go b/cmd/soroban-rpc/internal/ledgerentry_storage/ledgerentry_storage.go index f678510a9a..f06e045d5c 100644 --- a/cmd/soroban-rpc/internal/ledgerentry_storage/ledgerentry_storage.go +++ b/cmd/soroban-rpc/internal/ledgerentry_storage/ledgerentry_storage.go @@ -65,23 +65,17 @@ func (ls *ledgerEntryStorage) Close() error { return ls.db.Close() } -func (ls *ledgerEntryStorage) fillEntriesFromLatestCheckpoint(ctx context.Context, archive historyarchive.ArchiveInterface) (uint32, error) { - root, err := archive.GetRootHAS() - if err != nil { - return 0, err - } - startCheckpointLedger := root.CurrentLedger - - ls.logger.Infof("Starting processing of checkpoint %d", startCheckpointLedger) +func (ls *ledgerEntryStorage) fillEntriesFromCheckpoint(ctx context.Context, archive historyarchive.ArchiveInterface, checkpointLedger uint32) error { + ls.logger.Infof("Starting processing of checkpoint %d", checkpointLedger) checkpointCtx, cancelCheckpointCtx := context.WithTimeout(ctx, ls.timeout) defer cancelCheckpointCtx() - reader, err := ingest.NewCheckpointChangeReader(checkpointCtx, archive, startCheckpointLedger) + reader, err := ingest.NewCheckpointChangeReader(checkpointCtx, archive, checkpointLedger) if err != nil { - return 0, err + return err } - tx, err := ls.db.NewLedgerEntryUpdaterTx(startCheckpointLedger, maxBatchSize) + tx, err := ls.db.NewLedgerEntryUpdaterTx(checkpointLedger, maxBatchSize) if err != nil { - return 0, err + return err } // Make sure we finish the updating transaction entryCount := 0 @@ -89,7 +83,7 @@ func (ls *ledgerEntryStorage) fillEntriesFromLatestCheckpoint(ctx context.Contex for { select { case <-ctx.Done(): - return 0, context.Canceled + return context.Canceled default: } change, err := reader.Read() @@ -97,7 +91,7 @@ func (ls *ledgerEntryStorage) fillEntriesFromLatestCheckpoint(ctx context.Contex break } if err != nil { - return 0, err + return err } entry := change.Post @@ -106,7 +100,7 @@ func (ls *ledgerEntryStorage) fillEntriesFromLatestCheckpoint(ctx context.Contex continue } if err := tx.UpsertLedgerEntry(key, *entry); err != nil { - return 0, err + return err } entryCount++ @@ -115,45 +109,54 @@ func (ls *ledgerEntryStorage) fillEntriesFromLatestCheckpoint(ctx context.Contex } } - ls.logger.Info("Committing ledger entries") + ls.logger.Info("Committing checkpoint ledger entries") if err = tx.Done(); err != nil { - return 0, err + return err } ls.logger.Info("Finished checkpoint processing") - return startCheckpointLedger, nil + return nil } func (ls *ledgerEntryStorage) run(ctx context.Context, archive historyarchive.ArchiveInterface, ledgerBackend backends.LedgerBackend) { defer ls.wg.Done() + var checkPointPrefillWg sync.WaitGroup // First, make sure the DB has a complete ledger entry baseline - - startCheckpointLedger, err := ls.db.GetLatestLedgerSequence() + startLedger, err := ls.db.GetLatestLedgerSequence() if err == ErrEmptyDB { - // DB is empty, let's fill it from a checkpoint + // DB is empty, let's fill it from the History Archive, using the latest available checkpoint ls.logger.Infof("Found an empty database, filling it in from the most recent checkpoint (this can take up to 30 minutes, depending on the network)") - startCheckpointLedger, err = ls.fillEntriesFromLatestCheckpoint(ctx, archive) + root, err := archive.GetRootHAS() if err != nil { panic(err) } - } - if err != nil { + startLedger = root.CurrentLedger + // Do it in parallel with the upcoming captive core preparation to save time + checkPointPrefillWg.Add(1) + go func() { + defer checkPointPrefillWg.Done() + if err = ls.fillEntriesFromCheckpoint(ctx, archive, startLedger); err != nil { + panic(err) + } + }() + } else if err != nil { panic(err) } // Secondly, continuously process txmeta deltas - - // TODO: we can probably do the preparation in parallel with the checkpoint processing above prepareRangeCtx, cancelPrepareRange := context.WithTimeout(ctx, ls.timeout) - if err := ledgerBackend.PrepareRange(prepareRangeCtx, backends.UnboundedRange(startCheckpointLedger)); err != nil { + if err := ledgerBackend.PrepareRange(prepareRangeCtx, backends.UnboundedRange(startLedger)); err != nil { panic(err) } cancelPrepareRange() - nextLedger := startCheckpointLedger + 1 + // Make sure that the checkpoint prefill (if any), happened before starting to apply deltas + checkPointPrefillWg.Wait() + + nextLedger := startLedger + 1 for { - ls.logger.Info("Processing txmeta of ledger", nextLedger) + ls.logger.Infof("Applying txmeta ledger entries changes for ledger %d", nextLedger) reader, err := ingest.NewLedgerChangeReader(ctx, ledgerBackend, ls.networkPassPhrase, nextLedger) if err != nil { panic(err)