Skip to content

Commit

Permalink
ingest: Verify the bucket list hash in CheckpointChangeReader (stella…
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms authored Apr 18, 2024
1 parent c63ad05 commit 12bbc68
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 91 deletions.
25 changes: 25 additions & 0 deletions ingest/checkpoint_change_reader.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package ingest

import (
"bytes"
"context"
"fmt"
"io"
"sync"
"time"
Expand Down Expand Up @@ -97,6 +99,29 @@ func NewCheckpointChangeReader(
}, nil
}

// VerifyBucketList verifies that the bucket list hash computed from the history archive snapshot
// associated with the CheckpointChangeReader matches the expectedHash.
// Assuming expectedHash comes from a trusted source (captive-core running in unbounded mode), this
// check will give you full security that the data returned by the CheckpointChangeReader can be trusted.
// Note that XdrStream will verify all the ledger entries from an individual bucket and
// VerifyBucketList() verifies the entire list of bucket hashes.
func (r *CheckpointChangeReader) VerifyBucketList(expectedHash xdr.Hash) error {
historyBucketListHash, err := r.has.BucketListHash()
if err != nil {
return errors.Wrap(err, "Error getting bucket list hash")
}

if !bytes.Equal(historyBucketListHash[:], expectedHash[:]) {
return fmt.Errorf(
"bucket list hash of history archive does not match expected hash: %#x %#x",
historyBucketListHash,
expectedHash,
)
}

return nil
}

func (r *CheckpointChangeReader) bucketExists(hash historyarchive.Hash) (bool, error) {
return r.archive.BucketExists(hash)
}
Expand Down
7 changes: 7 additions & 0 deletions ingest/mock_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package ingest

import (
"github.com/stretchr/testify/mock"

"github.com/stellar/go/xdr"
)

var _ ChangeReader = (*MockChangeReader)(nil)
Expand All @@ -19,3 +21,8 @@ func (m *MockChangeReader) Close() error {
args := m.Called()
return args.Error(0)
}

func (m *MockChangeReader) VerifyBucketList(expectedHash xdr.Hash) error {
args := m.Called(expectedHash)
return args.Error(0)
}
53 changes: 37 additions & 16 deletions services/horizon/internal/ingest/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,29 @@
package ingest

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"path/filepath"
"testing"

"github.com/stretchr/testify/suite"

"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/suite"
)

type memoryChangeReader xdr.LedgerEntryChanges
type memoryChangeReader struct {
changes xdr.LedgerEntryChanges
bucketListHash xdr.Hash
verified bool
}

func loadChanges(path string) (*memoryChangeReader, error) {
func loadChanges(bucketListHash xdr.Hash, path string) (*memoryChangeReader, error) {
contents, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
Expand All @@ -29,25 +36,38 @@ func loadChanges(path string) (*memoryChangeReader, error) {
return nil, err
}

reader := memoryChangeReader(entryChanges)
return &reader, nil
return &memoryChangeReader{
changes: entryChanges,
bucketListHash: bucketListHash,
}, nil
}

func (r *memoryChangeReader) Read() (ingest.Change, error) {
entryChanges := *r
if len(entryChanges) == 0 {
if len(r.changes) == 0 {
return ingest.Change{}, io.EOF
}

change := entryChanges[0]
*r = entryChanges[1:]
change := r.changes[0]
r.changes = r.changes[1:]
return ingest.Change{
Type: change.State.Data.Type,
Post: change.State,
Pre: nil,
}, nil
}

func (r *memoryChangeReader) VerifyBucketList(expectedHash xdr.Hash) error {
if !bytes.Equal(r.bucketListHash[:], expectedHash[:]) {
return fmt.Errorf(
"bucket list hash of history archive does not match expected hash: %#x %#x",
r.bucketListHash,
expectedHash,
)
}
r.verified = true
return nil
}

func (r *memoryChangeReader) Close() error {
return nil
}
Expand All @@ -61,6 +81,7 @@ type DBTestSuite struct {
ctx context.Context
sampleFile string
sequence uint32
checkpointHash xdr.Hash
ledgerBackend *ledgerbackend.MockDatabaseBackend
historyAdapter *mockHistoryArchiveAdapter
system *system
Expand All @@ -76,7 +97,7 @@ func (s *DBTestSuite) SetupTest() {
// go test -v -timeout 5m --tags=update github.com/stellar/go/services/horizon/internal/ingest -run "^(TestUpdateSampleChanges)$"
// and commit the new file to the git repo.
s.sampleFile = filepath.Join("testdata", "sample-changes.xdr")

s.checkpointHash = xdr.Hash{1, 2, 3}
s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{}
s.historyAdapter = &mockHistoryArchiveAdapter{}
var err error
Expand All @@ -99,18 +120,18 @@ func (s *DBTestSuite) SetupTest() {
}

func (s *DBTestSuite) mockChangeReader() {
changeReader, err := loadChanges(s.sampleFile)
changeReader, err := loadChanges(s.checkpointHash, s.sampleFile)
s.Assert().NoError(err)
s.T().Cleanup(func() {
s.tt.Assert.True(changeReader.verified)
})
s.historyAdapter.On("GetState", s.ctx, s.sequence).
Return(ingest.ChangeReader(changeReader), nil).Once()
}
func (s *DBTestSuite) setupMocksForBuildState() {
checkpointHash := xdr.Hash{1, 2, 3}
s.historyAdapter.On("GetLatestLedgerSequence").
Return(s.sequence, nil).Once()
s.mockChangeReader()
s.historyAdapter.On("BucketListHash", s.sequence).
Return(checkpointHash, nil).Once()

s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(s.sequence)).Return(true, nil).Once()
s.ledgerBackend.On("GetLedger", s.ctx, s.sequence).
Expand All @@ -120,7 +141,7 @@ func (s *DBTestSuite) setupMocksForBuildState() {
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(s.sequence),
BucketListHash: checkpointHash,
BucketListHash: s.checkpointHash,
},
},
},
Expand Down Expand Up @@ -148,7 +169,7 @@ func (s *DBTestSuite) TestBuildState() {
s.Assert().Equal(s.sequence, resume.latestSuccessfullyProcessedLedger)

s.mockChangeReader()
s.Assert().NoError(s.system.verifyState(false))
s.Assert().NoError(s.system.verifyState(false, s.sequence, s.checkpointHash))
}

func (s *DBTestSuite) TestVersionMismatchTriggersRebuild() {
Expand Down
9 changes: 6 additions & 3 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (r resumeState) run(s *system) (transition, error) {

localLog.Info("Processed ledger")

s.maybeVerifyState(ingestLedger)
s.maybeVerifyState(ingestLedger, ledgerCloseMeta.BucketListHash())
s.maybeReapLookupTables(ingestLedger)

return resumeImmediately(ingestLedger), nil
Expand Down Expand Up @@ -745,7 +745,6 @@ func (v verifyRangeState) run(s *system) (transition, error) {
return stop(), err
}

var ledgerCloseMeta xdr.LedgerCloseMeta
ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, sequence)
if err != nil {
return stop(), errors.Wrap(err, "error getting ledger")
Expand Down Expand Up @@ -782,7 +781,11 @@ func (v verifyRangeState) run(s *system) (transition, error) {
}

if v.verifyState {
err = s.verifyState(false)
err = s.verifyState(
false,
ledgerCloseMeta.LedgerSequence(),
ledgerCloseMeta.BucketListHash(),
)
}

return stop(), err
Expand Down
29 changes: 7 additions & 22 deletions services/horizon/internal/ingest/history_archive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ type historyArchiveAdapter struct {
archive historyarchive.ArchiveInterface
}

type verifiableChangeReader interface {
ingest.ChangeReader
VerifyBucketList(expectedHash xdr.Hash) error
}

type historyArchiveAdapterInterface interface {
GetLatestLedgerSequence() (uint32, error)
BucketListHash(sequence uint32) (xdr.Hash, error)
GetState(ctx context.Context, sequence uint32) (ingest.ChangeReader, error)
GetState(ctx context.Context, sequence uint32) (verifiableChangeReader, error)
GetStats() []historyarchive.ArchiveStats
}

Expand All @@ -36,27 +40,8 @@ func (haa *historyArchiveAdapter) GetLatestLedgerSequence() (uint32, error) {
return has.CurrentLedger, nil
}

// BucketListHash returns the bucket list hash to compare with hash in the
// ledger header fetched from Stellar-Core.
func (haa *historyArchiveAdapter) BucketListHash(sequence uint32) (xdr.Hash, error) {
exists, err := haa.archive.CategoryCheckpointExists("history", sequence)
if err != nil {
return xdr.Hash{}, errors.Wrap(err, "error checking if category checkpoint exists")
}
if !exists {
return xdr.Hash{}, errors.Errorf("history checkpoint does not exist for ledger %d", sequence)
}

has, err := haa.archive.GetCheckpointHAS(sequence)
if err != nil {
return xdr.Hash{}, errors.Wrapf(err, "unable to get checkpoint HAS at ledger sequence %d", sequence)
}

return has.BucketListHash()
}

// GetState returns a reader with the state of the ledger at the provided sequence number.
func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32) (ingest.ChangeReader, error) {
func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32) (verifiableChangeReader, error) {
exists, err := haa.archive.CategoryCheckpointExists("history", sequence)
if err != nil {
return nil, errors.Wrap(err, "error checking if category checkpoint exists")
Expand Down
10 changes: 5 additions & 5 deletions services/horizon/internal/ingest/history_archive_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
stdio "io"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type mockHistoryArchiveAdapter struct {
Expand All @@ -28,9 +28,9 @@ func (m *mockHistoryArchiveAdapter) BucketListHash(sequence uint32) (xdr.Hash, e
return args.Get(0).(xdr.Hash), args.Error(1)
}

func (m *mockHistoryArchiveAdapter) GetState(ctx context.Context, sequence uint32) (ingest.ChangeReader, error) {
func (m *mockHistoryArchiveAdapter) GetState(ctx context.Context, sequence uint32) (verifiableChangeReader, error) {
args := m.Called(ctx, sequence)
return args.Get(0).(ingest.ChangeReader), args.Error(1)
return args.Get(0).(verifiableChangeReader), args.Error(1)
}

func (m *mockHistoryArchiveAdapter) GetStats() []historyarchive.ArchiveStats {
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

const (
Expand Down Expand Up @@ -669,7 +670,7 @@ func (s *system) runStateMachine(cur stateMachineNode) error {
}
}

func (s *system) maybeVerifyState(lastIngestedLedger uint32) {
func (s *system) maybeVerifyState(lastIngestedLedger uint32, expectedBucketListHash xdr.Hash) {
stateInvalid, err := s.historyQ.GetExpStateInvalid(s.ctx)
if err != nil {
if !isCancelledError(s.ctx, err) {
Expand All @@ -686,7 +687,7 @@ func (s *system) maybeVerifyState(lastIngestedLedger uint32) {
go func() {
defer s.wg.Done()

err := s.verifyState(true)
err := s.verifyState(true, lastIngestedLedger, expectedBucketListHash)
if err != nil {
if isCancelledError(s.ctx, err) {
return
Expand Down
10 changes: 5 additions & 5 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,19 @@ func TestMaybeVerifyStateGetExpStateInvalidError(t *testing.T) {
defer func() { log = oldLogger }()

historyQ.On("GetExpStateInvalid", system.ctx).Return(false, db.ErrCancelled).Once()
system.maybeVerifyState(63)
system.maybeVerifyState(63, xdr.Hash{})
system.wg.Wait()

historyQ.On("GetExpStateInvalid", system.ctx).Return(false, context.Canceled).Once()
system.maybeVerifyState(63)
system.maybeVerifyState(63, xdr.Hash{})
system.wg.Wait()

logged := done()
assert.Len(t, logged, 0)

// Ensure state verifier does not start also for any other error
historyQ.On("GetExpStateInvalid", system.ctx).Return(false, errors.New("my error")).Once()
system.maybeVerifyState(63)
system.maybeVerifyState(63, xdr.Hash{})
system.wg.Wait()

historyQ.AssertExpectations(t)
Expand All @@ -227,11 +227,11 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) {
historyQ.On("CloneIngestionQ").Return(historyQ).Twice()

historyQ.On("BeginTx", mock.Anything, mock.Anything).Return(db.ErrCancelled).Once()
system.maybeVerifyState(63)
system.maybeVerifyState(63, xdr.Hash{})
system.wg.Wait()

historyQ.On("BeginTx", mock.Anything, mock.Anything).Return(context.Canceled).Once()
system.maybeVerifyState(63)
system.maybeVerifyState(63, xdr.Hash{})
system.wg.Wait()

logged := done()
Expand Down
Loading

0 comments on commit 12bbc68

Please sign in to comment.