Commit

#4538: obtain trusted hash for captive core catchup command
sreuland committed Aug 19, 2024
1 parent 9b925b1 commit 61a5f80
Showing 7 changed files with 60 additions and 34 deletions.
52 changes: 48 additions & 4 deletions ingest/ledgerbackend/captive_core_backend.go
@@ -327,6 +327,49 @@ func (c *CaptiveStellarCore) getLatestCheckpointSequence() (uint32, error) {
return has.CurrentLedger, nil
}

// getTrustedHashForLedger obtains the hash for a ledger sequence from a live
// connection with the network. It will NOT use history archives for hash
// retrieval. It returns the requested ledger sequence, or the closest one to
// it that could be replayed, together with its associated trusted hash.
func (c *CaptiveStellarCore) getTrustedHashForLedger(sequence uint32) (uint32, string, error) {
// Clone the current captive config, but with a unique context and no shared
// ledger hash store. The ad-hoc captive core instance runs an unbounded range
// starting at the requested sequence, which triggers captive core's online
// replay mode: the tx-meta and hash for that sequence are obtained directly
// from the network, which is considered trusted. The instance is closed once
// the hash has been retrieved.
captiveConfigTrustedHash := c.config
captiveConfigTrustedHash.LedgerHashStore = nil
captiveConfigTrustedHash.Context = context.Background()
captiveTrustedHash, err := NewCaptive(captiveConfigTrustedHash)

if err != nil {
return 0, "", errors.Wrapf(err, "error creating captive to get hash from network for Ledger %v", sequence)
}

if sequence <= 2 {
// Core's run 'from' mode will emit ledgers on the pipe starting at
// sequence 3, so clamp the requested sequence to that minimum.
sequence = 3
}

defer func() {
if closeErr := captiveTrustedHash.Close(); closeErr != nil {
captiveTrustedHash.config.Log.Error("error when closing captive core for network hash", closeErr)
}
}()

err = captiveTrustedHash.PrepareRange(captiveTrustedHash.config.Context, UnboundedRange(sequence))
if err != nil {
return 0, "", errors.Wrapf(err, "error preparing to get network hash for Ledger %v", sequence)
}

networkLCM, err := captiveTrustedHash.GetLedger(captiveTrustedHash.config.Context, sequence)
if err != nil {
return 0, "", errors.Wrapf(err, "error getting network hash for Ledger %v", sequence)
}

return sequence, networkLCM.LedgerHash().HexString(), nil
}

func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error {
latestCheckpointSequence, err := c.getLatestCheckpointSequence()
if err != nil {
@@ -349,8 +392,12 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error
)
}

toLedger, toLedgerHash, err := c.getTrustedHashForLedger(to)
if err != nil {
return err
}
stellarCoreRunner := c.stellarCoreRunnerFactory()
if err = stellarCoreRunner.catchup(from, to); err != nil {
if err = stellarCoreRunner.catchup(from, toLedger, toLedgerHash); err != nil {
return errors.Wrap(err, "error running stellar-core")
}
c.stellarCoreRunner = stellarCoreRunner
@@ -517,9 +564,6 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang
// - For BoundedRange it will start Stellar-Core in catchup mode.
// - For UnboundedRange it will first catchup to starting ledger and then run
// it normally (including connecting to the Stellar network).
//
// Please note that using a BoundedRange, currently, requires a full-trust on
// history archive. This issue is being fixed in Stellar-Core.
func (c *CaptiveStellarCore) PrepareRange(ctx context.Context, ledgerRange Range) error {
if alreadyPrepared, err := c.startPreparingRange(ctx, ledgerRange); err != nil {
return errors.Wrap(err, "error starting prepare range")
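
With this change, a bounded-range consumer gets trusted-hash verification transparently: PrepareRange on a BoundedRange now resolves the target ledger's hash from the network before running catchup. Below is a minimal usage sketch against the public ledgerbackend API; the config fields (core binary path, network passphrase, history archive URLs, captive core toml) are elided, so this is illustrative rather than directly runnable as-is.

package main

import (
    "context"
    "fmt"

    "github.com/stellar/go/ingest/ledgerbackend"
)

func main() {
    ctx := context.Background()

    // A real config needs BinaryPath, NetworkPassphrase, HistoryArchiveURLs,
    // and a parsed captive core toml; all elided here for brevity.
    backend, err := ledgerbackend.NewCaptive(ledgerbackend.CaptiveCoreConfig{})
    if err != nil {
        panic(err)
    }
    defer backend.Close()

    // The bounded range takes the offline catchup path, which now obtains a
    // trusted hash for ledger 200 from the network before catchup starts.
    if err := backend.PrepareRange(ctx, ledgerbackend.BoundedRange(100, 200)); err != nil {
        panic(err)
    }

    lcm, err := backend.GetLedger(ctx, 150)
    if err != nil {
        panic(err)
    }
    fmt.Println(lcm.LedgerSequence(), lcm.LedgerHash().HexString())
}
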
4 changes: 2 additions & 2 deletions ingest/ledgerbackend/captive_core_backend_test.go
@@ -32,8 +32,8 @@ func (m *stellarCoreRunnerMock) context() context.Context {
return a.Get(0).(context.Context)
}

func (m *stellarCoreRunnerMock) catchup(from, to uint32) error {
a := m.Called(from, to)
func (m *stellarCoreRunnerMock) catchup(from, to uint32, toLedgerHash string) error {
a := m.Called(from, to, toLedgerHash)
return a.Error(0)
}

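
Tests that set expectations on this mock must now match the third argument too. A small self-contained sketch of the updated expectation style with testify (hypothetical test, mirroring the mock pattern above):

package ledgerbackend

import (
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/mock"
)

// runnerMock mirrors just the catchup method of stellarCoreRunnerMock; the
// real mock also implements the rest of stellarCoreRunnerInterface.
type runnerMock struct {
    mock.Mock
}

func (m *runnerMock) catchup(from, to uint32, toLedgerHash string) error {
    a := m.Called(from, to, toLedgerHash)
    return a.Error(0)
}

func TestCatchupExpectsTrustedHash(t *testing.T) {
    m := &runnerMock{}
    // mock.Anything works when the hash value itself is not under test.
    m.On("catchup", uint32(100), uint32(200), mock.Anything).Return(nil)

    assert.NoError(t, m.catchup(100, 200, "deadbeef"))
    m.AssertExpectations(t)
}
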
6 changes: 5 additions & 1 deletion ingest/ledgerbackend/catchup.go
@@ -11,12 +11,13 @@ type catchupStream struct {
dir workingDir
from uint32
to uint32
toLedgerHash string
coreCmdFactory coreCmdFactory
log *log.Entry
useDB bool
}

func newCatchupStream(r *stellarCoreRunner, from, to uint32) catchupStream {
func newCatchupStream(r *stellarCoreRunner, from, to uint32, toLedgerHash string) catchupStream {
// We want to use ephemeral directories in running the catchup command
// (used for the reingestion use case) because it's possible to run parallel
// reingestion where multiple captive cores are active on the same machine.
@@ -27,6 +28,7 @@ func newCatchupStream(r *stellarCoreRunner, from, to uint32) catchupStream {
dir: dir,
from: from,
to: to,
toLedgerHash: toLedgerHash,
coreCmdFactory: newCoreCmdFactory(r, dir),
log: r.log,
useDB: r.useDB,
@@ -58,6 +60,8 @@ func (s catchupStream) start(ctx context.Context) (cmdI, pipe, error) {
if err = cmd.Run(); err != nil {
return nil, pipe{}, fmt.Errorf("error initializing core db: %w", err)
}

params = append(params, "--trusted-hash", s.toLedgerHash)
} else {
params = append(params, "--in-memory")
}
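
The net effect on the core invocation: when a database-backed captive core is used, the catchup command is pinned to the network-derived hash via --trusted-hash. A rough sketch of the parameter assembly; the real slice is built by coreCmdFactory (not shown in this diff), and the "to/count" range form is an assumption based on stellar-core's catchup CLI:

package ledgerbackend

import "fmt"

func catchupParams(from, to uint32, useDB bool, toLedgerHash string) []string {
    // Assumed "catchup <to>/<count>" range form for stellar-core.
    params := []string{"catchup", fmt.Sprintf("%d/%d", to, to-from+1)}
    if useDB {
        // New in this commit: verify the target ledger against the hash
        // obtained from the network instead of trusting archives alone.
        params = append(params, "--trusted-hash", toLedgerHash)
    } else {
        params = append(params, "--in-memory")
    }
    return params
}
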
6 changes: 3 additions & 3 deletions ingest/ledgerbackend/stellar_core_runner.go
@@ -11,7 +11,7 @@ import (
)

type stellarCoreRunnerInterface interface {
catchup(from, to uint32) error
catchup(from, to uint32, toLedgerHash string) error
runFrom(from uint32, hash string) error
getMetaPipe() (<-chan metaResult, bool)
context() context.Context
@@ -108,8 +108,8 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
}

// catchup executes the catchup command on the captive core subprocess
func (r *stellarCoreRunner) catchup(from, to uint32) error {
return r.startMetaStream(newCatchupStream(r, from, to))
func (r *stellarCoreRunner) catchup(from, to uint32, toLedgerHash string) error {
return r.startMetaStream(newCatchupStream(r, from, to, toLedgerHash))
}

type metaStream interface {
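
Any other implementation of stellarCoreRunnerInterface has to adopt the new signature as well. A minimal stub satisfying just the changed method (hypothetical; the full interface carries more methods than this diff shows):

// stubRunner compiles against the new catchup signature; a real
// implementation would also provide runFrom, getMetaPipe, context,
// and the remaining interface methods.
type stubRunner struct{}

func (stubRunner) catchup(from, to uint32, toLedgerHash string) error {
    // No-op: accept the trusted hash without starting a subprocess.
    _ = toLedgerHash
    return nil
}
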
4 changes: 2 additions & 2 deletions ingest/ledgerbackend/stellar_core_runner_test.go
@@ -51,7 +51,7 @@ func TestCloseOffline(t *testing.T) {
scMock.On("removeAll", mock.Anything).Return(nil).Once()
runner.systemCaller = scMock

assert.NoError(t, runner.catchup(100, 200))
assert.NoError(t, runner.catchup(100, 200, ""))
assert.NoError(t, runner.close())
}

@@ -192,7 +192,7 @@ func TestCloseConcurrency(t *testing.T) {
scMock.On("removeAll", mock.Anything).Return(nil).Once()
runner.systemCaller = scMock

assert.NoError(t, runner.catchup(100, 200))
assert.NoError(t, runner.catchup(100, 200, ""))

var wg sync.WaitGroup
for i := 0; i < 10; i++ {

This file was deleted.

10 changes: 0 additions & 10 deletions services/horizon/internal/integration/db_test.go
@@ -537,11 +537,6 @@ func TestReingestDB(t *testing.T) {
// subprocesses to conflict.
itest.StopHorizon()

horizonConfig.CaptiveCoreConfigPath = filepath.Join(
filepath.Dir(horizonConfig.CaptiveCoreConfigPath),
"captive-core-reingest-range-integration-tests.cfg",
)

var rootCmd = horizoncmd.NewRootCmd()
rootCmd.SetArgs(command(t, horizonConfig, "db",
"reingest",
@@ -925,11 +920,6 @@ func TestFillGaps(t *testing.T) {
_, err = historyQ.DeleteRangeAll(context.Background(), oldestLedger, latestLedger)
tt.NoError(err)

horizonConfig.CaptiveCoreConfigPath = filepath.Join(
filepath.Dir(horizonConfig.CaptiveCoreConfigPath),
"captive-core-reingest-range-integration-tests.cfg",
)

rootCmd := horizoncmd.NewRootCmd()
rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "--parallel-workers=1"))
tt.NoError(rootCmd.Execute())
