diff --git a/header.go b/header.go index 010ab2ca..75cfa4d0 100644 --- a/header.go +++ b/header.go @@ -12,7 +12,8 @@ type Header[H any] interface { // New creates new instance of a header. // It exists to overcome limitation of Go's type system. // See: - // https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#pointer-method-example + // + //https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#pointer-method-example New() H // IsZero reports whether Header is a zero value of it's concrete type. IsZero() bool diff --git a/headertest/dummy_header.go b/headertest/dummy_header.go index e8480dde..108c4bc3 100644 --- a/headertest/dummy_header.go +++ b/headertest/dummy_header.go @@ -30,6 +30,9 @@ type DummyHeader struct { // SoftFailure allows for testing scenarios where a header would fail // verification with SoftFailure set to true SoftFailure bool + + // VerifyFn can be used to change header.Verify behaviour per header. 
+ VerifyFn func(hdr *DummyHeader) error `json:"-"` } func RandDummyHeader(t *testing.T) *DummyHeader { @@ -100,6 +103,9 @@ func (d *DummyHeader) IsExpired(period time.Duration) bool { } func (d *DummyHeader) Verify(hdr *DummyHeader) error { + if d.VerifyFn != nil { + return d.VerifyFn(hdr) + } if hdr.VerifyFailure { return &header.VerifyError{Reason: ErrDummyVerify, SoftFailure: hdr.SoftFailure} } diff --git a/p2p/server_test.go b/p2p/server_test.go index 1e896b2e..77496d92 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -176,7 +176,7 @@ func (timeoutStore[H]) Append(ctx context.Context, _ ...H) error { return ctx.Err() } -func (timeoutStore[H]) GetRange(ctx context.Context, _ uint64, _ uint64) ([]H, error) { +func (timeoutStore[H]) GetRange(ctx context.Context, _, _ uint64) ([]H, error) { <-ctx.Done() return nil, ctx.Err() } diff --git a/sync/metrics.go b/sync/metrics.go index 590bd5ba..ceb43766 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -22,6 +22,7 @@ type metrics struct { trustedPeersOutOfSync metric.Int64Counter outdatedHeader metric.Int64Counter subjectiveInit metric.Int64Counter + failedBifurcations metric.Int64Counter subjectiveHead atomic.Int64 @@ -71,6 +72,16 @@ func newMetrics() (*metrics, error) { return nil, err } + failedBifurcations, err := meter.Int64Counter( + "hdr_failed_bifurcations_total", + metric.WithDescription( + "tracks how many times bifurcation failed against subjective head", + ), + ) + if err != nil { + return nil, err + } + subjectiveHead, err := meter.Int64ObservableGauge( "hdr_sync_subjective_head_gauge", metric.WithDescription("subjective head height"), @@ -112,6 +123,7 @@ func newMetrics() (*metrics, error) { trustedPeersOutOfSync: trustedPeersOutOfSync, outdatedHeader: outdatedHeader, subjectiveInit: subjectiveInit, + failedBifurcations: failedBifurcations, syncLoopDurationHist: syncLoopDurationHist, syncLoopRunningInst: syncLoopRunningInst, requestRangeTimeHist: requestRangeTimeHist, @@ -186,6 +198,17 @@ 
func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestam }) } +func (m *metrics) failedBifurcation(ctx context.Context, height int64, hash string) { + m.observe(ctx, func(ctx context.Context) { + m.failedBifurcations.Add(ctx, 1, + metric.WithAttributes( + attribute.Int64("height", height), + attribute.String("hash", hash), + ), + ) + }) +} + func (m *metrics) rangeRequestStart() { if m == nil { return diff --git a/sync/sync_head.go b/sync/sync_head.go index c74347b7..99de4cf0 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -3,6 +3,7 @@ package sync import ( "context" "errors" + "fmt" "time" "github.com/celestiaorg/go-header" @@ -159,7 +160,7 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error { } // verify verifies given network head candidate. -func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) { +func (s *Syncer[H]) verify(ctx context.Context, newHead H) (isSoft bool, _ error) { sbjHead, err := s.subjectiveHead(ctx) if err != nil { log.Errorw("getting subjective head during validation", "err", err) @@ -173,7 +174,12 @@ func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) { } var verErr *header.VerifyError - if errors.As(err, &verErr) && !verErr.SoftFailure { + if errors.As(err, &verErr) { + if verErr.SoftFailure { + err := s.verifyBifurcating(ctx, sbjHead, newHead) + return err != nil, err + } + logF := log.Warnw if errors.Is(err, header.ErrKnownHeader) { logF = log.Debugw @@ -186,7 +192,63 @@ func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) { "reason", verErr.Reason) } - return verErr.SoftFailure, err + return false, err +} + +// verifyBifurcating verifies networkHead against subjHead via the interim headers when direct +// verification is impossible. 
+// It tries to find a header (or several headers if necessary) between the networkHead and
+// the subjectiveHead such that non-adjacent (or in the worst case adjacent) verification
+// passes and the networkHead can be verified as a valid sync target against the syncer's
+// subjectiveHead. networkHead must be higher than subjHead (heights are subtracted unsigned).
+// A non-nil error is returned when networkHead can't be verified.
+func (s *Syncer[H]) verifyBifurcating(ctx context.Context, subjHead, networkHead H) error {
+	log.Warnw("header bifurcation started", "height", networkHead.Height(), "hash", networkHead.Hash().String())
+
+	subjHeight := subjHead.Height()
+
+	diff := networkHead.Height() - subjHeight
+
+	for diff > 1 {
+		candidateHeight := subjHeight + diff/2
+
+		candidateHeader, err := s.getter.GetByHeight(ctx, candidateHeight)
+		if err != nil {
+			return err
+		}
+
+		if err := header.Verify(subjHead, candidateHeader); err != nil {
+			var verErr *header.VerifyError
+			if errors.As(err, &verErr) && !verErr.SoftFailure {
+				return err
+			}
+
+			// candidate failed, go deeper in 1st half.
+			diff = diff / 2
+			continue
+		}
+
+		// candidate was validated properly, update subjHead.
+		subjHead = candidateHeader
+		s.setSubjectiveHead(ctx, subjHead)
+
+		if err := header.Verify(subjHead, networkHead); err == nil {
+			// network head validated properly, return success.
+			return nil
+		}
+
+		// new subjHead failed, go deeper in 2nd half.
+		subjHeight = subjHead.Height()
+		diff = networkHead.Height() - subjHeight
+	}
+
+	s.metrics.failedBifurcation(ctx, int64(networkHead.Height()), networkHead.Hash().String())
+	log.Warnw("header bifurcation failed", "height", networkHead.Height(), "hash", networkHead.Hash().String())
+
+	return &header.VerifyError{
+		Reason:      fmt.Errorf("sync: header validation against subjHead height:%d hash:%s", networkHead.Height(), networkHead.Hash().String()),
+		SoftFailure: false,
+	}
 }
 
 // isExpired checks if header is expired against trusting period.
diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index eecd3a35..7a285461 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -95,6 +95,230 @@ func TestSyncer_HeadWithTrustedHead(t *testing.T) { require.True(t, wrappedGetter.withTrustedHead) } +// Test will simulate a case with up to `iters` failures before we will get to +// the header that can be verified against subjectiveHead. +func TestSyncer_verifyBifurcatingSuccess(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + head := suite.Head() + + localStore := newTestStore(t, ctx, head) + remoteStore := newTestStore(t, ctx, head) + + // create a wrappedGetter to track exchange interactions + wrappedGetter := newWrappedGetter(local.NewExchange(remoteStore)) + + syncer, err := NewSyncer( + wrappedGetter, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Nanosecond), + WithRecencyThreshold(time.Nanosecond), // forces a request for a new sync target + // ensures that syncer's store contains a subjective head that is within + // the unbonding period so that the syncer can use a header from the network + // as a sync target + WithTrustingPeriod(time.Hour), + ) + require.NoError(t, err) + + // start the syncer which triggers a Head request that will + // load the syncer's subjective head from the store, and request + // a new sync target from the network rather than from trusted peers + err = syncer.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + err = syncer.Stop(ctx) + require.NoError(t, err) + }) + + // when + const total = 1000 + const badHeaderHeight = total + 1 // make the last header bad + const iters = 4 + + headers := suite.GenDummyHeaders(total) + err = remoteStore.Append(ctx, headers...) + require.NoError(t, err) + + // configure header verification method in such a way + // that the first [iters] verifications will fail + // but all others will be ok. 
+ var verifyCounter atomic.Int32 + for i := range total { + headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { + if hdr.Height() != badHeaderHeight { + return nil + } + + verifyCounter.Add(1) + if verifyCounter.Load() >= iters { + return nil + } + + return &header.VerifyError{ + Reason: headertest.ErrDummyVerify, + SoftFailure: hdr.SoftFailure, + } + } + } + + headers[total-1].VerifyFailure = true + headers[total-1].SoftFailure = true + + subjHead, err := syncer.subjectiveHead(ctx) + require.NoError(t, err) + + err = syncer.verifyBifurcating(ctx, subjHead, headers[total-1]) + require.NoError(t, err) +} + +// Test will simulate a case with up to `iters` failures before we will get to +// the header that can be verified against subjectiveHead. +func TestSyncer_verifyBifurcatingSuccessWithBadCandidates(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + head := suite.Head() + + localStore := newTestStore(t, ctx, head) + remoteStore := newTestStore(t, ctx, head) + + // create a wrappedGetter to track exchange interactions + wrappedGetter := newWrappedGetter(local.NewExchange(remoteStore)) + + syncer, err := NewSyncer( + wrappedGetter, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Nanosecond), + WithRecencyThreshold(time.Nanosecond), // forces a request for a new sync target + // ensures that syncer's store contains a subjective head that is within + // the unbonding period so that the syncer can use a header from the network + // as a sync target + WithTrustingPeriod(time.Hour), + ) + require.NoError(t, err) + + // start the syncer which triggers a Head request that will + // load the syncer's subjective head from the store, and request + // a new sync target from the network rather than from trusted peers + err = syncer.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + err = syncer.Stop(ctx) + require.NoError(t, err) + }) + 
+ const total = 1000 + const badHeaderHeight = total + 1 + const iters = 4 + + headers := suite.GenDummyHeaders(total) + err = remoteStore.Append(ctx, headers...) + require.NoError(t, err) + + // configure header verification method in such a way + // that the first [iters] verifications will fail + // but all others will be ok. + var verifyCounter atomic.Int32 + for i := range total { + headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { + if i >= 501 { + return nil + } + + verifyCounter.Add(1) + if verifyCounter.Load() > iters { + return nil + } + return &header.VerifyError{ + Reason: headertest.ErrDummyVerify, + SoftFailure: hdr.SoftFailure, + } + } + } + + headers[total-1].VerifyFailure = true + headers[total-1].SoftFailure = true + + subjHead, err := syncer.subjectiveHead(ctx) + require.NoError(t, err) + + err = syncer.verifyBifurcating(ctx, subjHead, headers[total-1]) + require.NoError(t, err) +} + +// Test will simulate a case when no headers can be verified against subjectiveHead. +// As a result verifyBifurcating is expected to return a non-nil error. 
+func TestSyncer_verifyBifurcatingCannotVerify(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + head := suite.Head() + + localStore := newTestStore(t, ctx, head) + remoteStore := newTestStore(t, ctx, head) + + // create a wrappedGetter to track exchange interactions + wrappedGetter := newWrappedGetter(local.NewExchange(remoteStore)) + + syncer, err := NewSyncer( + wrappedGetter, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Nanosecond), + WithRecencyThreshold(time.Nanosecond), // forces a request for a new sync target + // ensures that syncer's store contains a subjective head that is within + // the unbonding period so that the syncer can use a header from the network + // as a sync target + WithTrustingPeriod(time.Hour), + ) + require.NoError(t, err) + + // start the syncer which triggers a Head request that will + // load the syncer's subjective head from the store, and request + // a new sync target from the network rather than from trusted peers + err = syncer.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + err = syncer.Stop(ctx) + require.NoError(t, err) + }) + + const total = 1000 + const badHeaderHeight = total + 1 + + headers := suite.GenDummyHeaders(total) + err = remoteStore.Append(ctx, headers...) 
+ require.NoError(t, err) + + for i := range total { + headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { + if hdr.Height() != badHeaderHeight { + return nil + } + + return &header.VerifyError{ + Reason: headertest.ErrDummyVerify, + SoftFailure: hdr.SoftFailure, + } + } + } + + headers[total-1].VerifyFailure = true + headers[total-1].SoftFailure = true + + subjHead, err := syncer.subjectiveHead(ctx) + require.NoError(t, err) + + err = syncer.verifyBifurcating(ctx, subjHead, headers[total-1]) + assert.Error(t, err) +} + type wrappedGetter struct { ex header.Exchange[*headertest.DummyHeader] @@ -126,7 +350,7 @@ func (t *wrappedGetter) Get(ctx context.Context, hash header.Hash) (*headertest. } func (t *wrappedGetter) GetByHeight(ctx context.Context, u uint64) (*headertest.DummyHeader, error) { - return nil, errors.New("implement me") + return t.ex.GetByHeight(ctx, u) } func (t *wrappedGetter) GetRangeByHeight(