Skip to content

Commit

Permalink
Run db checkpoint-prefilling in paralallel with captive core preparation
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Jan 20, 2023
1 parent d7ffa06 commit f4ba5a3
Showing 1 changed file with 32 additions and 29 deletions.
61 changes: 32 additions & 29 deletions cmd/soroban-rpc/internal/ledgerentry_storage/ledgerentry_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,39 +65,33 @@ func (ls *ledgerEntryStorage) Close() error {
return ls.db.Close()
}

func (ls *ledgerEntryStorage) fillEntriesFromLatestCheckpoint(ctx context.Context, archive historyarchive.ArchiveInterface) (uint32, error) {
root, err := archive.GetRootHAS()
if err != nil {
return 0, err
}
startCheckpointLedger := root.CurrentLedger

ls.logger.Infof("Starting processing of checkpoint %d", startCheckpointLedger)
func (ls *ledgerEntryStorage) fillEntriesFromCheckpoint(ctx context.Context, archive historyarchive.ArchiveInterface, checkpointLedger uint32) error {
ls.logger.Infof("Starting processing of checkpoint %d", checkpointLedger)
checkpointCtx, cancelCheckpointCtx := context.WithTimeout(ctx, ls.timeout)
defer cancelCheckpointCtx()
reader, err := ingest.NewCheckpointChangeReader(checkpointCtx, archive, startCheckpointLedger)
reader, err := ingest.NewCheckpointChangeReader(checkpointCtx, archive, checkpointLedger)
if err != nil {
return 0, err
return err
}
tx, err := ls.db.NewLedgerEntryUpdaterTx(startCheckpointLedger, maxBatchSize)
tx, err := ls.db.NewLedgerEntryUpdaterTx(checkpointLedger, maxBatchSize)
if err != nil {
return 0, err
return err
}
// Make sure we finish the updating transaction
entryCount := 0

for {
select {
case <-ctx.Done():
return 0, context.Canceled
return context.Canceled
default:
}
change, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return 0, err
return err
}

entry := change.Post
Expand All @@ -106,7 +100,7 @@ func (ls *ledgerEntryStorage) fillEntriesFromLatestCheckpoint(ctx context.Contex
continue
}
if err := tx.UpsertLedgerEntry(key, *entry); err != nil {
return 0, err
return err
}
entryCount++

Expand All @@ -115,45 +109,54 @@ func (ls *ledgerEntryStorage) fillEntriesFromLatestCheckpoint(ctx context.Contex
}
}

ls.logger.Info("Committing ledger entries")
ls.logger.Info("Committing checkpoint ledger entries")
if err = tx.Done(); err != nil {
return 0, err
return err
}

ls.logger.Info("Finished checkpoint processing")
return startCheckpointLedger, nil
return nil
}

func (ls *ledgerEntryStorage) run(ctx context.Context, archive historyarchive.ArchiveInterface, ledgerBackend backends.LedgerBackend) {
defer ls.wg.Done()
var checkPointPrefillWg sync.WaitGroup

// First, make sure the DB has a complete ledger entry baseline

startCheckpointLedger, err := ls.db.GetLatestLedgerSequence()
startLedger, err := ls.db.GetLatestLedgerSequence()
if err == ErrEmptyDB {
// DB is empty, let's fill it from a checkpoint
// DB is empty, let's fill it from the History Archive, using the latest available checkpoint
ls.logger.Infof("Found an empty database, filling it in from the most recent checkpoint (this can take up to 30 minutes, depending on the network)")
startCheckpointLedger, err = ls.fillEntriesFromLatestCheckpoint(ctx, archive)
root, err := archive.GetRootHAS()
if err != nil {
panic(err)
}
}
if err != nil {
startLedger = root.CurrentLedger
// Do it in parallel with the upcoming captive core preparation to save time
checkPointPrefillWg.Add(1)
go func() {
defer checkPointPrefillWg.Done()
if err = ls.fillEntriesFromCheckpoint(ctx, archive, startLedger); err != nil {
panic(err)
}
}()
} else if err != nil {
panic(err)
}

// Secondly, continuously process txmeta deltas

// TODO: we can probably do the preparation in parallel with the checkpoint processing above
prepareRangeCtx, cancelPrepareRange := context.WithTimeout(ctx, ls.timeout)
if err := ledgerBackend.PrepareRange(prepareRangeCtx, backends.UnboundedRange(startCheckpointLedger)); err != nil {
if err := ledgerBackend.PrepareRange(prepareRangeCtx, backends.UnboundedRange(startLedger)); err != nil {
panic(err)
}
cancelPrepareRange()

nextLedger := startCheckpointLedger + 1
// Make sure that the checkpoint prefill (if any), happened before starting to apply deltas
checkPointPrefillWg.Wait()

nextLedger := startLedger + 1
for {
ls.logger.Info("Processing txmeta of ledger", nextLedger)
ls.logger.Infof("Applying txmeta ledger entries changes for ledger %d", nextLedger)
reader, err := ingest.NewLedgerChangeReader(ctx, ledgerBackend, ls.networkPassPhrase, nextLedger)
if err != nil {
panic(err)
Expand Down

0 comments on commit f4ba5a3

Please sign in to comment.