-
Notifications
You must be signed in to change notification settings - Fork 502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ingest/ledgerbackend: add trusted hash to captive core catchup #5431
Changes from all commits
61a5f80
1a50ed7
3b706e4
2937c18
116f3b1
8da847f
f91fa55
d049e73
805950a
3b335b7
c1295eb
be4df19
e56e346
2270bba
7315813
d236308
32bb6b9
0ee83dc
659d0e0
6e61306
3d23723
d58b15a
41eeb8f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,16 +28,6 @@ var _ LedgerBackend = (*CaptiveStellarCore)(nil) | |
// ErrCannotStartFromGenesis is returned when attempting to prepare a range from ledger 1 | ||
var ErrCannotStartFromGenesis = errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2") | ||
|
||
func (c *CaptiveStellarCore) roundDownToFirstReplayAfterCheckpointStart(ledger uint32) uint32 { | ||
r := c.checkpointManager.GetCheckpointRange(ledger) | ||
if r.Low <= 1 { | ||
// Stellar-Core doesn't stream ledger 1 | ||
return 2 | ||
} | ||
// All other checkpoints start at the next multiple of 64 | ||
return r.Low | ||
} | ||
|
||
// CaptiveStellarCore is a ledger backend that starts internal Stellar-Core | ||
// subprocess responsible for streaming ledger data. It provides better decoupling | ||
// than DatabaseBackend but requires some extra init time. | ||
|
@@ -163,7 +153,7 @@ type CaptiveCoreConfig struct { | |
} | ||
|
||
// NewCaptive returns a new CaptiveStellarCore instance. | ||
func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { | ||
func NewCaptive(config CaptiveCoreConfig) (LedgerBackend, error) { | ||
// Here we set defaults in the config. Because config is not a pointer this code should | ||
// not mutate the original CaptiveCoreConfig instance which was passed into NewCaptive() | ||
|
||
|
@@ -327,56 +317,18 @@ func (c *CaptiveStellarCore) getLatestCheckpointSequence() (uint32, error) { | |
return has.CurrentLedger, nil | ||
} | ||
|
||
func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error { | ||
latestCheckpointSequence, err := c.getLatestCheckpointSequence() | ||
if err != nil { | ||
return errors.Wrap(err, "error getting latest checkpoint sequence") | ||
} | ||
|
||
if from > latestCheckpointSequence { | ||
return errors.Errorf( | ||
"from sequence: %d is greater than max available in history archives: %d", | ||
from, | ||
latestCheckpointSequence, | ||
) | ||
} | ||
|
||
if to > latestCheckpointSequence { | ||
return errors.Errorf( | ||
"to sequence: %d is greater than max available in history archives: %d", | ||
to, | ||
latestCheckpointSequence, | ||
) | ||
} | ||
|
||
stellarCoreRunner := c.stellarCoreRunnerFactory() | ||
if err = stellarCoreRunner.catchup(from, to); err != nil { | ||
return errors.Wrap(err, "error running stellar-core") | ||
} | ||
c.stellarCoreRunner = stellarCoreRunner | ||
|
||
// The next ledger should be the first ledger of the checkpoint containing | ||
// the requested ledger | ||
ran := BoundedRange(from, to) | ||
c.ledgerSequenceLock.Lock() | ||
defer c.ledgerSequenceLock.Unlock() | ||
|
||
c.prepared = &ran | ||
c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from) | ||
c.lastLedger = &to | ||
c.previousLedgerHash = nil | ||
|
||
return nil | ||
} | ||
|
||
func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, from uint32) error { | ||
runFrom, ledgerHash, err := c.runFromParams(ctx, from) | ||
func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, ledgerRange Range) error { | ||
runFrom, ledgerHash, err := c.runFromParams(ctx, ledgerRange.from) | ||
if err != nil { | ||
return errors.Wrap(err, "error calculating ledger and hash for stellar-core run") | ||
} | ||
|
||
stellarCoreRunner := c.stellarCoreRunnerFactory() | ||
if err = stellarCoreRunner.runFrom(runFrom, ledgerHash); err != nil { | ||
runnerMode := stellarCoreRunnerModeActive | ||
if ledgerRange.bounded { | ||
runnerMode = stellarCoreRunnerModePassive | ||
} | ||
if err = stellarCoreRunner.runFrom(runFrom, ledgerHash, runnerMode); err != nil { | ||
return errors.Wrap(err, "error running stellar-core") | ||
} | ||
c.stellarCoreRunner = stellarCoreRunner | ||
|
@@ -388,9 +340,15 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro | |
defer c.ledgerSequenceLock.Unlock() | ||
|
||
c.nextLedger = 0 | ||
ran := UnboundedRange(from) | ||
ran := ledgerRange | ||
var last *uint32 | ||
if ledgerRange.bounded { | ||
boundedTo := ledgerRange.to | ||
last = &boundedTo | ||
} | ||
|
||
c.lastLedger = last | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is the key to being able to (re)use live |
||
c.prepared = &ran | ||
c.lastLedger = nil | ||
c.previousLedgerHash = nil | ||
|
||
return nil | ||
|
@@ -497,12 +455,8 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang | |
} | ||
} | ||
|
||
var err error | ||
if ledgerRange.bounded { | ||
err = c.openOfflineReplaySubprocess(ledgerRange.from, ledgerRange.to) | ||
} else { | ||
err = c.openOnlineReplaySubprocess(ctx, ledgerRange.from) | ||
} | ||
err := c.openOnlineReplaySubprocess(ctx, ledgerRange) | ||
|
||
if err != nil { | ||
return false, errors.Wrap(err, "opening subprocess") | ||
} | ||
|
@@ -513,13 +467,9 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang | |
// PrepareRange prepares the given range (including from and to) to be loaded. | ||
// Captive stellar-core backend needs to initialize Stellar-Core state to be | ||
// able to stream ledgers. | ||
// Stellar-Core mode depends on the provided ledgerRange: | ||
// - For BoundedRange it will start Stellar-Core in catchup mode. | ||
// - For UnboundedRange it will first catchup to starting ledger and then run | ||
// it normally (including connecting to the Stellar network). | ||
// | ||
// Please note that using a BoundedRange, currently, requires a full-trust on | ||
// history archive. This issue is being fixed in Stellar-Core. | ||
// ctx - caller context | ||
// ledgerRange - specify the range info | ||
func (c *CaptiveStellarCore) PrepareRange(ctx context.Context, ledgerRange Range) error { | ||
if alreadyPrepared, err := c.startPreparingRange(ctx, ledgerRange); err != nil { | ||
return errors.Wrap(err, "error starting prepare range") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using latest checkpoint on testnet for verify-range, tried similar on pubnet, takes 45+ minutes, mostly on change processor ledger entry processing.