From bc1a11170578a0be273cf920b28650f6dae35302 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 27 Jun 2024 14:37:00 +0200 Subject: [PATCH] rework heightSub --- store/heightsub.go | 17 ++++++++--------- store/heightsub_test.go | 34 ++++++++++++++++++++++++++++++++-- store/store.go | 5 +++-- 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 8c6a28da..772933a4 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -28,14 +28,13 @@ func newHeightSub[H header.Header[H]]() *heightSub[H] { } } -// Height reports current height. -func (hs *heightSub[H]) Height() uint64 { - return hs.height.Load() +func (hs *heightSub[H]) isInited() bool { + return hs.height.Load() != 0 } -// SetHeight sets the new head height for heightSub. +// setHeight sets the new head height for heightSub. // Only higher then current height will be set. -func (hs *heightSub[H]) SetHeight(height uint64) { +func (hs *heightSub[H]) setHeight(height uint64) { for { curr := hs.height.Load() if curr >= height { @@ -52,12 +51,12 @@ func (hs *heightSub[H]) SetHeight(height uint64) { // and caller should get it elsewhere. func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) { var zero H - if hs.Height() >= height { + if hs.height.Load() >= height { return zero, errElapsedHeight } hs.heightReqsLk.Lock() - if hs.Height() >= height { + if hs.height.Load() >= height { // This is a rare case we have to account for. // The lock above can park a goroutine long enough for hs.height to change for a requested height, // leaving the request never fulfilled and the goroutine deadlocked. @@ -90,7 +89,7 @@ func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) { // Pub processes all the outstanding subscriptions matching the given headers. // Pub is only safe when called from one goroutine. -// For Pub to work correctly, heightSub has to be initialized with SetHeight +// For Pub to work correctly, heightSub has to be initialized with setHeight // so that given headers are contiguous to the height on heightSub. func (hs *heightSub[H]) Pub(headers ...H) { ln := len(headers) @@ -99,7 +98,7 @@ func (hs *heightSub[H]) Pub(headers ...H) { } from, to := headers[0].Height(), headers[ln-1].Height() - hs.SetHeight(to) + hs.setHeight(to) hs.heightReqsLk.Lock() defer hs.heightReqsLk.Unlock() diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 64ef1804..88f4434b 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -20,7 +20,7 @@ func TestHeightSub(t *testing.T) { { h := headertest.RandDummyHeader(t) h.HeightI = 100 - hs.SetHeight(99) + hs.setHeight(99) hs.Pub(h) h, err := hs.Sub(ctx, 10) @@ -56,7 +56,7 @@ func TestHeightSubNonAdjacement(t *testing.T) { { h := headertest.RandDummyHeader(t) h.HeightI = 100 - hs.SetHeight(99) + hs.setHeight(99) hs.Pub(h) } @@ -78,6 +78,36 @@ func TestHeightSubNonAdjacement(t *testing.T) { } } +func TestHeightSub_monotonicHeight(t *testing.T) { + hs := newHeightSub[*headertest.DummyHeader]() + + { + h := headertest.RandDummyHeader(t) + h.HeightI = 100 + hs.setHeight(99) + hs.Pub(h) + } + + { + h1 := headertest.RandDummyHeader(t) + h1.HeightI = 200 + h2 := headertest.RandDummyHeader(t) + h2.HeightI = 300 + hs.Pub(h1, h2) + } + + { + + h1 := headertest.RandDummyHeader(t) + h1.HeightI = 120 + h2 := headertest.RandDummyHeader(t) + h2.HeightI = 130 + hs.Pub(h1, h2) + } + + assert.Equal(t, hs.height.Load(), uint64(300)) +} + func TestHeightSubCancellation(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/store/store.go b/store/store.go index 71790838..78c37043 100644 --- a/store/store.go +++ b/store/store.go @@ -53,7 +53,8 @@ type Store[H header.Header[H]] struct { writesDn chan struct{} // writeHead maintains the current write head writeHead atomic.Pointer[H] - + // knownHeaders tracks all processed headers + // to advance writeHead only over continuous headers. knownHeaders map[uint64]H // pending keeps headers pending to be written in one batch pending *batch[H] @@ -118,7 +119,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store } func (s *Store[H]) Init(ctx context.Context, initial H) error { - if s.heightSub.Height() != 0 { + if s.heightSub.isInited() { return errors.New("store already initialized") } // trust the given header as the initial head