From 12bbc681b7dd6d54a47658c5a62c97dfd05a0a9c Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 18 Apr 2024 10:51:27 +0100 Subject: [PATCH] ingest: Verify the bucket list hash in CheckpointChangeReader (#5280) --- ingest/checkpoint_change_reader.go | 25 +++++++++ ingest/mock_change_reader.go | 7 +++ .../internal/ingest/db_integration_test.go | 53 +++++++++++++------ services/horizon/internal/ingest/fsm.go | 9 ++-- .../ingest/history_archive_adapter.go | 29 +++------- .../ingest/history_archive_adapter_test.go | 10 ++-- services/horizon/internal/ingest/main.go | 5 +- services/horizon/internal/ingest/main_test.go | 10 ++-- .../internal/ingest/processor_runner.go | 35 +++--------- .../internal/ingest/processor_runner_test.go | 5 +- services/horizon/internal/ingest/verify.go | 11 +++- .../ingest/verify_range_state_test.go | 12 +++-- .../horizon/internal/ingest/verify_test.go | 43 ++++++++++++++- 13 files changed, 163 insertions(+), 91 deletions(-) diff --git a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go index 1dbbed2a9e..e84e7631cf 100644 --- a/ingest/checkpoint_change_reader.go +++ b/ingest/checkpoint_change_reader.go @@ -1,7 +1,9 @@ package ingest import ( + "bytes" "context" + "fmt" "io" "sync" "time" @@ -97,6 +99,29 @@ func NewCheckpointChangeReader( }, nil } +// VerifyBucketList verifies that the bucket list hash computed from the history archive snapshot +// associated with the CheckpointChangeReader matches the expectedHash. +// Assuming expectedHash comes from a trusted source (captive-core running in unbounded mode), this +// check will give you full security that the data returned by the CheckpointChangeReader can be trusted. +// Note that XdrStream will verify all the ledger entries from an individual bucket and +// VerifyBucketList() verifies the entire list of bucket hashes. +func (r *CheckpointChangeReader) VerifyBucketList(expectedHash xdr.Hash) error { + historyBucketListHash, err := r.has.BucketListHash() + if err != nil { + return errors.Wrap(err, "Error getting bucket list hash") + } + + if !bytes.Equal(historyBucketListHash[:], expectedHash[:]) { + return fmt.Errorf( + "bucket list hash of history archive does not match expected hash: %#x %#x", + historyBucketListHash, + expectedHash, + ) + } + + return nil +} + func (r *CheckpointChangeReader) bucketExists(hash historyarchive.Hash) (bool, error) { return r.archive.BucketExists(hash) } diff --git a/ingest/mock_change_reader.go b/ingest/mock_change_reader.go index c70d78d397..8616b86a04 100644 --- a/ingest/mock_change_reader.go +++ b/ingest/mock_change_reader.go @@ -2,6 +2,8 @@ package ingest import ( "github.com/stretchr/testify/mock" + + "github.com/stellar/go/xdr" ) var _ ChangeReader = (*MockChangeReader)(nil) @@ -19,3 +21,8 @@ func (m *MockChangeReader) Close() error { args := m.Called() return args.Error(0) } + +func (m *MockChangeReader) VerifyBucketList(expectedHash xdr.Hash) error { + args := m.Called(expectedHash) + return args.Error(0) +} diff --git a/services/horizon/internal/ingest/db_integration_test.go b/services/horizon/internal/ingest/db_integration_test.go index 60a45f158e..606cd9fb2b 100644 --- a/services/horizon/internal/ingest/db_integration_test.go +++ b/services/horizon/internal/ingest/db_integration_test.go @@ -3,22 +3,29 @@ package ingest import ( + "bytes" "context" + "fmt" "io" "io/ioutil" "path/filepath" "testing" + "github.com/stretchr/testify/suite" + "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/suite" ) -type memoryChangeReader xdr.LedgerEntryChanges +type memoryChangeReader struct { + changes xdr.LedgerEntryChanges + bucketListHash xdr.Hash + verified bool +} -func loadChanges(path string) (*memoryChangeReader, error) { +func loadChanges(bucketListHash xdr.Hash, path string) (*memoryChangeReader, error) { contents, err := ioutil.ReadFile(path) if err != nil { return nil, err @@ -29,18 +36,19 @@ func loadChanges(path string) (*memoryChangeReader, error) { return nil, err } - reader := memoryChangeReader(entryChanges) - return &reader, nil + return &memoryChangeReader{ + changes: entryChanges, + bucketListHash: bucketListHash, + }, nil } func (r *memoryChangeReader) Read() (ingest.Change, error) { - entryChanges := *r - if len(entryChanges) == 0 { + if len(r.changes) == 0 { return ingest.Change{}, io.EOF } - change := entryChanges[0] - *r = entryChanges[1:] + change := r.changes[0] + r.changes = r.changes[1:] return ingest.Change{ Type: change.State.Data.Type, Post: change.State, @@ -48,6 +56,18 @@ func (r *memoryChangeReader) Read() (ingest.Change, error) { }, nil } +func (r *memoryChangeReader) VerifyBucketList(expectedHash xdr.Hash) error { + if !bytes.Equal(r.bucketListHash[:], expectedHash[:]) { + return fmt.Errorf( + "bucket list hash of history archive does not match expected hash: %#x %#x", + r.bucketListHash, + expectedHash, + ) + } + r.verified = true + return nil +} + func (r *memoryChangeReader) Close() error { return nil } @@ -61,6 +81,7 @@ type DBTestSuite struct { ctx context.Context sampleFile string sequence uint32 + checkpointHash xdr.Hash ledgerBackend *ledgerbackend.MockDatabaseBackend historyAdapter *mockHistoryArchiveAdapter system *system @@ -76,7 +97,7 @@ func (s *DBTestSuite) SetupTest() { // go test -v -timeout 5m --tags=update github.com/stellar/go/services/horizon/internal/ingest -run "^(TestUpdateSampleChanges)$" // and commit the new file to the git repo. s.sampleFile = filepath.Join("testdata", "sample-changes.xdr") - + s.checkpointHash = xdr.Hash{1, 2, 3} s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{} s.historyAdapter = &mockHistoryArchiveAdapter{} var err error @@ -99,18 +120,18 @@ func (s *DBTestSuite) SetupTest() { } func (s *DBTestSuite) mockChangeReader() { - changeReader, err := loadChanges(s.sampleFile) + changeReader, err := loadChanges(s.checkpointHash, s.sampleFile) s.Assert().NoError(err) + s.T().Cleanup(func() { + s.tt.Assert.True(changeReader.verified) + }) s.historyAdapter.On("GetState", s.ctx, s.sequence). Return(ingest.ChangeReader(changeReader), nil).Once() } func (s *DBTestSuite) setupMocksForBuildState() { - checkpointHash := xdr.Hash{1, 2, 3} s.historyAdapter.On("GetLatestLedgerSequence"). Return(s.sequence, nil).Once() s.mockChangeReader() - s.historyAdapter.On("BucketListHash", s.sequence). - Return(checkpointHash, nil).Once() s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(s.sequence)).Return(true, nil).Once() s.ledgerBackend.On("GetLedger", s.ctx, s.sequence). @@ -120,7 +141,7 @@ func (s *DBTestSuite) setupMocksForBuildState() { LedgerHeader: xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ LedgerSeq: xdr.Uint32(s.sequence), - BucketListHash: checkpointHash, + BucketListHash: s.checkpointHash, }, }, }, @@ -148,7 +169,7 @@ func (s *DBTestSuite) TestBuildState() { s.Assert().Equal(s.sequence, resume.latestSuccessfullyProcessedLedger) s.mockChangeReader() - s.Assert().NoError(s.system.verifyState(false)) + s.Assert().NoError(s.system.verifyState(false, s.sequence, s.checkpointHash)) } func (s *DBTestSuite) TestVersionMismatchTriggersRebuild() { diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 3ce02864c6..5dce974e35 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -551,7 +551,7 @@ func (r resumeState) run(s *system) (transition, error) { localLog.Info("Processed ledger") - s.maybeVerifyState(ingestLedger) + s.maybeVerifyState(ingestLedger, ledgerCloseMeta.BucketListHash()) s.maybeReapLookupTables(ingestLedger) return resumeImmediately(ingestLedger), nil @@ -745,7 +745,6 @@ func (v verifyRangeState) run(s *system) (transition, error) { return stop(), err } - var ledgerCloseMeta xdr.LedgerCloseMeta ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, sequence) if err != nil { return stop(), errors.Wrap(err, "error getting ledger") @@ -782,7 +781,11 @@ func (v verifyRangeState) run(s *system) (transition, error) { } if v.verifyState { - err = s.verifyState(false) + err = s.verifyState( + false, + ledgerCloseMeta.LedgerSequence(), + ledgerCloseMeta.BucketListHash(), + ) } return stop(), err diff --git a/services/horizon/internal/ingest/history_archive_adapter.go b/services/horizon/internal/ingest/history_archive_adapter.go index 7e415787e3..71c72f91a5 100644 --- a/services/horizon/internal/ingest/history_archive_adapter.go +++ b/services/horizon/internal/ingest/history_archive_adapter.go @@ -14,10 +14,14 @@ type historyArchiveAdapter struct { archive historyarchive.ArchiveInterface } +type verifiableChangeReader interface { + ingest.ChangeReader + VerifyBucketList(expectedHash xdr.Hash) error +} + type historyArchiveAdapterInterface interface { GetLatestLedgerSequence() (uint32, error) - BucketListHash(sequence uint32) (xdr.Hash, error) - GetState(ctx context.Context, sequence uint32) (ingest.ChangeReader, error) + GetState(ctx context.Context, sequence uint32) (verifiableChangeReader, error) GetStats() []historyarchive.ArchiveStats } @@ -36,27 +40,8 @@ func (haa *historyArchiveAdapter) GetLatestLedgerSequence() (uint32, error) { return has.CurrentLedger, nil } -// BucketListHash returns the bucket list hash to compare with hash in the -// ledger header fetched from Stellar-Core. -func (haa *historyArchiveAdapter) BucketListHash(sequence uint32) (xdr.Hash, error) { - exists, err := haa.archive.CategoryCheckpointExists("history", sequence) - if err != nil { - return xdr.Hash{}, errors.Wrap(err, "error checking if category checkpoint exists") - } - if !exists { - return xdr.Hash{}, errors.Errorf("history checkpoint does not exist for ledger %d", sequence) - } - - has, err := haa.archive.GetCheckpointHAS(sequence) - if err != nil { - return xdr.Hash{}, errors.Wrapf(err, "unable to get checkpoint HAS at ledger sequence %d", sequence) - } - - return has.BucketListHash() -} - // GetState returns a reader with the state of the ledger at the provided sequence number. -func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32) (ingest.ChangeReader, error) { +func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32) (verifiableChangeReader, error) { exists, err := haa.archive.CategoryCheckpointExists("history", sequence) if err != nil { return nil, errors.Wrap(err, "error checking if category checkpoint exists") diff --git a/services/horizon/internal/ingest/history_archive_adapter_test.go b/services/horizon/internal/ingest/history_archive_adapter_test.go index 20d84149fa..168c812d5d 100644 --- a/services/horizon/internal/ingest/history_archive_adapter_test.go +++ b/services/horizon/internal/ingest/history_archive_adapter_test.go @@ -6,12 +6,12 @@ import ( stdio "io" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stellar/go/historyarchive" - "github.com/stellar/go/ingest" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) type mockHistoryArchiveAdapter struct { @@ -28,9 +28,9 @@ func (m *mockHistoryArchiveAdapter) BucketListHash(sequence uint32) (xdr.Hash, e return args.Get(0).(xdr.Hash), args.Error(1) } -func (m *mockHistoryArchiveAdapter) GetState(ctx context.Context, sequence uint32) (ingest.ChangeReader, error) { +func (m *mockHistoryArchiveAdapter) GetState(ctx context.Context, sequence uint32) (verifiableChangeReader, error) { args := m.Called(ctx, sequence) - return args.Get(0).(ingest.ChangeReader), args.Error(1) + return args.Get(0).(verifiableChangeReader), args.Error(1) } func (m *mockHistoryArchiveAdapter) GetStats() []historyarchive.ArchiveStats { diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index fe9c62eba4..7dbaacaadb 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -24,6 +24,7 @@ import ( "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" "github.com/stellar/go/support/storage" + "github.com/stellar/go/xdr" ) const ( @@ -669,7 +670,7 @@ func (s *system) runStateMachine(cur stateMachineNode) error { } } -func (s *system) maybeVerifyState(lastIngestedLedger uint32) { +func (s *system) maybeVerifyState(lastIngestedLedger uint32, expectedBucketListHash xdr.Hash) { stateInvalid, err := s.historyQ.GetExpStateInvalid(s.ctx) if err != nil { if !isCancelledError(s.ctx, err) { @@ -686,7 +687,7 @@ func (s *system) maybeVerifyState(lastIngestedLedger uint32) { go func() { defer s.wg.Done() - err := s.verifyState(true) + err := s.verifyState(true, lastIngestedLedger, expectedBucketListHash) if err != nil { if isCancelledError(s.ctx, err) { return diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 0db777306c..6f5d89bd6a 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -188,11 +188,11 @@ func TestMaybeVerifyStateGetExpStateInvalidError(t *testing.T) { defer func() { log = oldLogger }() historyQ.On("GetExpStateInvalid", system.ctx).Return(false, db.ErrCancelled).Once() - system.maybeVerifyState(63) + system.maybeVerifyState(63, xdr.Hash{}) system.wg.Wait() historyQ.On("GetExpStateInvalid", system.ctx).Return(false, context.Canceled).Once() - system.maybeVerifyState(63) + system.maybeVerifyState(63, xdr.Hash{}) system.wg.Wait() logged := done() @@ -200,7 +200,7 @@ func TestMaybeVerifyStateGetExpStateInvalidError(t *testing.T) { // Ensure state verifier does not start also for any other error historyQ.On("GetExpStateInvalid", system.ctx).Return(false, errors.New("my error")).Once() - system.maybeVerifyState(63) + system.maybeVerifyState(63, xdr.Hash{}) system.wg.Wait() historyQ.AssertExpectations(t) @@ -227,11 +227,11 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) { historyQ.On("CloneIngestionQ").Return(historyQ).Twice() historyQ.On("BeginTx", mock.Anything, mock.Anything).Return(db.ErrCancelled).Once() - system.maybeVerifyState(63) + system.maybeVerifyState(63, xdr.Hash{}) system.wg.Wait() historyQ.On("BeginTx", mock.Anything, mock.Anything).Return(context.Canceled).Once() - system.maybeVerifyState(63) + system.maybeVerifyState(63, xdr.Hash{}) system.wg.Wait() logged := done() diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 6832a5078f..9c38c01154 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -1,7 +1,6 @@ package ingest import ( - "bytes" "context" "fmt" "io" @@ -198,30 +197,6 @@ func (s *ProcessorRunner) checkIfProtocolVersionSupported(ledgerProtocolVersion return nil } -// validateBucketList validates if the bucket list hash in history archive -// matches the one in corresponding ledger header in stellar-core backend. -// This gives you full security if data in stellar-core backend can be trusted -// (ex. you run it in your infrastructure). -// The hashes of actual buckets of this HAS file are checked using -// historyarchive.XdrStream.SetExpectedHash (this is done in -// CheckpointChangeReader). -func (s *ProcessorRunner) validateBucketList(ledgerSequence uint32, ledgerBucketHashList xdr.Hash) error { - historyBucketListHash, err := s.historyAdapter.BucketListHash(ledgerSequence) - if err != nil { - return errors.Wrap(err, "Error getting bucket list hash") - } - - if !bytes.Equal(historyBucketListHash[:], ledgerBucketHashList[:]) { - return fmt.Errorf( - "Bucket list hash of history archive and ledger header does not match: %#x %#x", - historyBucketListHash, - ledgerBucketHashList, - ) - } - - return nil -} - func (s *ProcessorRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error) { return s.RunHistoryArchiveIngestion(1, false, 0, xdr.Hash{}) } @@ -257,10 +232,6 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion( if err := s.checkIfProtocolVersionSupported(ledgerProtocolVersion); err != nil { return changeStats.GetResults(), errors.Wrap(err, "Error while checking for supported protocol version") } - - if err := s.validateBucketList(checkpointLedger, bucketListHash); err != nil { - return changeStats.GetResults(), errors.Wrap(err, "Error validating bucket list from HAS") - } } changeReader, err := s.historyAdapter.GetState(s.ctx, checkpointLedger) @@ -268,6 +239,12 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion( return changeStats.GetResults(), errors.Wrap(err, "Error creating HAS reader") } + if !skipChecks { + if err = changeReader.VerifyBucketList(bucketListHash); err != nil { + return changeStats.GetResults(), errors.Wrap(err, "Error validating bucket list from HAS") + } + } + defer changeReader.Close() log.WithField("sequence", checkpointLedger). diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 8f6eb58d74..78faf853f3 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -78,13 +78,12 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { historyAdapter := &mockHistoryArchiveAdapter{} defer mock.AssertExpectationsForObjects(t, historyAdapter) - bucketListHash := xdr.Hash([32]byte{0, 1, 2}) - historyAdapter.On("BucketListHash", uint32(63)).Return(bucketListHash, nil).Once() - m := &ingest.MockChangeReader{} m.On("Read").Return(ingest.GenesisChange(network.PublicNetworkPassphrase), nil).Once() m.On("Read").Return(ingest.Change{}, io.EOF).Once() m.On("Close").Return(nil).Once() + bucketListHash := xdr.Hash([32]byte{0, 1, 2}) + m.On("VerifyBucketList", bucketListHash).Return(nil).Once() historyAdapter. On("GetState", ctx, uint32(63)). diff --git a/services/horizon/internal/ingest/verify.go b/services/horizon/internal/ingest/verify.go index 41b0eb98c5..294acd2a51 100644 --- a/services/horizon/internal/ingest/verify.go +++ b/services/horizon/internal/ingest/verify.go @@ -35,7 +35,7 @@ const stateVerifierExpectedIngestionVersion = 18 // verifyState is called as a go routine from pipeline post hook every 64 // ledgers. It checks if the state is correct. If another go routine is already // running it exits. -func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { +func (s *system) verifyState(verifyAgainstLatestCheckpoint bool, checkpointSequence uint32, expectedBucketListHash xdr.Hash) error { s.stateVerificationMutex.Lock() if s.stateVerificationRunning { log.Warn("State verification is already running...") @@ -94,6 +94,12 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { return nil } + if ledgerSequence != checkpointSequence { + localLog.WithField("checkpointSequence", checkpointSequence). + Info("Current ledger does not match checkpoint sequence. Canceling...") + return nil + } + ok, err := historyQ.TryStateVerificationLock(ctx) if err != nil { return errors.Wrap(err, "Error acquiring state verification lock") @@ -168,6 +174,9 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { return errors.Wrap(err, "Error running GetState") } defer stateReader.Close() + if err = stateReader.VerifyBucketList(expectedBucketListHash); err != nil { + return ingest.NewStateError(err) + } verifier := verify.NewStateVerifier(stateReader, func(entry xdr.LedgerEntry) (bool, xdr.LedgerEntry) { entryType := entry.Data.Type diff --git a/services/horizon/internal/ingest/verify_range_state_test.go b/services/horizon/internal/ingest/verify_range_state_test.go index 7440f7dce0..a1df30d854 100644 --- a/services/horizon/internal/ingest/verify_range_state_test.go +++ b/services/horizon/internal/ingest/verify_range_state_test.go @@ -255,7 +255,9 @@ func (s *VerifyRangeStateTestSuite) TestSuccessWithVerify() { V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(i), + LedgerSeq: xdr.Uint32(i), + LedgerVersion: xdr.Uint32(MaxSupportedProtocolVersion), + BucketListHash: xdr.Hash{byte(i), 2, 3}, }, }, }, @@ -281,7 +283,10 @@ func (s *VerifyRangeStateTestSuite) TestSuccessWithVerify() { s.Assert().True(arg.ReadOnly) }).Return(nil).Once() clonedQ.On("Rollback").Return(nil).Once() - clonedQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(63), nil).Once() + clonedQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(110), nil).Once() + s.system.runStateVerificationOnLedger = func(u uint32) bool { + return u == 110 + } clonedQ.On("TryStateVerificationLock", s.ctx).Return(true, nil).Once() mockChangeReader := &ingest.MockChangeReader{} mockChangeReader.On("Close").Return(nil).Once() @@ -482,7 +487,8 @@ func (s *VerifyRangeStateTestSuite) TestSuccessWithVerify() { mockChangeReader.On("Read").Return(liquidityPoolChange, nil).Once() mockChangeReader.On("Read").Return(ingest.Change{}, io.EOF).Once() mockChangeReader.On("Read").Return(ingest.Change{}, io.EOF).Once() - s.historyAdapter.On("GetState", s.ctx, uint32(63)).Return(mockChangeReader, nil).Once() + mockChangeReader.On("VerifyBucketList", xdr.Hash{110, 2, 3}).Return(nil).Once() + s.historyAdapter.On("GetState", s.ctx, uint32(110)).Return(mockChangeReader, nil).Once() mockAccount := history.AccountEntry{ AccountID: mockAccountID, Balance: 600, diff --git a/services/horizon/internal/ingest/verify_test.go b/services/horizon/internal/ingest/verify_test.go index f32bccdc8f..b86eaf4db1 100644 --- a/services/horizon/internal/ingest/verify_test.go +++ b/services/horizon/internal/ingest/verify_test.go @@ -3,6 +3,7 @@ package ingest import ( "crypto/sha256" "database/sql" + "fmt" "io" "math/rand" "regexp" @@ -355,7 +356,7 @@ func TestStateVerifierLockBusy(t *testing.T) { tt.Assert.NoError(err) tt.Assert.True(ok) - tt.Assert.NoError(sys.verifyState(false)) + tt.Assert.NoError(sys.verifyState(false, checkpointLedger, xdr.Hash{})) mockHistoryAdapter.AssertExpectations(t) tt.Assert.NoError(otherQ.Rollback()) @@ -386,6 +387,8 @@ func TestStateVerifier(t *testing.T) { mockChangeReader.On("Read").Return(ingest.Change{}, io.EOF).Twice() mockChangeReader.On("Close").Return(nil).Once() + bucketListHash := xdr.Hash{1, 2, 3} + mockChangeReader.On("VerifyBucketList", bucketListHash).Return(nil).Once() mockHistoryAdapter := &mockHistoryArchiveAdapter{} mockHistoryAdapter.On("GetState", mock.AnythingOfType("*context.timerCtx"), uint32(checkpointLedger)).Return(mockChangeReader, nil).Once() @@ -399,7 +402,43 @@ func TestStateVerifier(t *testing.T) { } sys.initMetrics() - tt.Assert.NoError(sys.verifyState(false)) + tt.Assert.NoError(sys.verifyState(false, checkpointLedger, bucketListHash)) + mockChangeReader.AssertExpectations(t) + mockHistoryAdapter.AssertExpectations(t) +} + +func TestStateVerifierHashError(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &history.Q{&db.Session{DB: tt.HorizonDB}} + + ledger := rand.Int31() + checkpointLedger := uint32(ledger - (ledger % 64) - 1) + mockChangeReader := &ingest.MockChangeReader{} + + q.UpdateLastLedgerIngest(tt.Ctx, checkpointLedger) + + mockChangeReader.On("Close").Return(nil).Once() + bucketListHash := xdr.Hash{1, 2, 3} + mockChangeReader.On("VerifyBucketList", bucketListHash).Return(fmt.Errorf("hash mismatch error")).Once() + + mockHistoryAdapter := &mockHistoryArchiveAdapter{} + mockHistoryAdapter.On("GetState", mock.AnythingOfType("*context.timerCtx"), uint32(checkpointLedger)).Return(mockChangeReader, nil).Once() + + sys := &system{ + ctx: tt.Ctx, + historyQ: q, + historyAdapter: mockHistoryAdapter, + runStateVerificationOnLedger: ledgerEligibleForStateVerification(64, 1), + config: Config{StateVerificationTimeout: time.Hour}, + } + sys.initMetrics() + + err := sys.verifyState(false, checkpointLedger, bucketListHash) + tt.Assert.EqualError(err, "hash mismatch error") + _, isStateError := err.(ingest.StateError) + tt.Assert.True(isStateError) mockChangeReader.AssertExpectations(t) mockHistoryAdapter.AssertExpectations(t) }