From 9e25e4abbd74b252541cd5347427dc28e7d99d93 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 25 Jun 2024 19:09:17 +0200 Subject: [PATCH 01/21] fix(store): properly update heightSub height --- store/heightsub.go | 35 +++++++++++++++++++++------ store/heightsub_test.go | 31 ++++++++++++++++++++++++ store/store_test.go | 52 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 7 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 2335001d..002505a9 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -19,12 +19,14 @@ type heightSub[H header.Header[H]] struct { height atomic.Uint64 heightReqsLk sync.Mutex heightReqs map[uint64]map[chan H]struct{} + knownHeights map[uint64]struct{} } // newHeightSub instantiates new heightSub. func newHeightSub[H header.Header[H]]() *heightSub[H] { return &heightSub[H]{ - heightReqs: make(map[uint64]map[chan H]struct{}), + heightReqs: make(map[uint64]map[chan H]struct{}), + knownHeights: map[uint64]struct{}{}, } } @@ -89,17 +91,13 @@ func (hs *heightSub[H]) Pub(headers ...H) { return } - height := hs.Height() from, to := headers[0].Height(), headers[ln-1].Height() - if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1 - log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from) - return - } - hs.SetHeight(to) hs.heightReqsLk.Lock() defer hs.heightReqsLk.Unlock() + hs.tryAdvanceHeight(headers...) + // there is a common case where we Pub only header // in this case, we shouldn't loop over each heightReqs // and instead read from the map directly @@ -128,3 +126,26 @@ func (hs *heightSub[H]) Pub(headers ...H) { } } } + +func (hs *heightSub[H]) tryAdvanceHeight(headers ...H) { + curr := hs.Height() + + // collect all new heights. + for i := range headers { + h := headers[i].Height() + if h > curr { + hs.knownHeights[h] = struct{}{} + } + } + + // try advance heightSub.Height if we saw a relevant height before. + for len(hs.knownHeights) > 0 { + _, ok := hs.knownHeights[curr+1] + if !ok { + break + } + delete(hs.knownHeights, curr+1) + curr++ + } + hs.SetHeight(curr) +} diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 3a48d950..64ef1804 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -47,6 +47,37 @@ func TestHeightSub(t *testing.T) { } } +func TestHeightSubNonAdjacement(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + hs := newHeightSub[*headertest.DummyHeader]() + + { + h := headertest.RandDummyHeader(t) + h.HeightI = 100 + hs.SetHeight(99) + hs.Pub(h) + } + + { + go func() { + // fixes flakiness on CI + time.Sleep(time.Millisecond) + + h1 := headertest.RandDummyHeader(t) + h1.HeightI = 200 + h2 := headertest.RandDummyHeader(t) + h2.HeightI = 300 + hs.Pub(h1, h2) + }() + + h, err := hs.Sub(ctx, 200) + assert.NoError(t, err) + assert.NotNil(t, h) + } +} + func TestHeightSubCancellation(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/store/store_test.go b/store/store_test.go index 53d40d55..e52ddb6c 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -141,6 +141,58 @@ func TestStore_Append_BadHeader(t *testing.T) { require.Error(t, err) } +func TestStore_Append_stableHeadWhenGaps(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4)) + + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), suite.Head().Hash()) + + in := suite.GenDummyHeaders(5) + + err = store.Append(ctx, in...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + wantHead := in[4] // last header from incomming headers. + + head, err = store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), wantHead.Hash()) + + in = suite.GenDummyHeaders(10) + // make a gap + missedHeaders, in := in[:5], in[5:] + latestHead := in[len(in)-1] + + err = store.Append(ctx, in...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + // head is not advanced due to a gap. + head, err = store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), wantHead.Hash()) + + err = store.Append(ctx, missedHeaders...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + // after appending missing headers we're on a latest header. + head, err = store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), latestHead.Hash()) +} + // TestStore_GetRange tests possible combinations of requests and ensures that // the store can handle them adequately (even malformed requests) func TestStore_GetRange(t *testing.T) { From bad94caf5ed10dd1275900b9787639e8a8a958ff Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 26 Jun 2024 08:59:54 +0200 Subject: [PATCH 02/21] better test --- store/store_test.go | 64 +++++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/store/store_test.go b/store/store_test.go index e52ddb6c..4dc9ae96 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -154,43 +154,49 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { require.NoError(t, err) assert.Equal(t, head.Hash(), suite.Head().Hash()) - in := suite.GenDummyHeaders(5) + firstChunk := suite.GenDummyHeaders(5) + missedChunk := suite.GenDummyHeaders(5) + lastChunk := suite.GenDummyHeaders(5) - err = store.Append(ctx, in...) - require.NoError(t, err) - // wait for batch to be written. - time.Sleep(100 * time.Millisecond) + wantHead := firstChunk[len(firstChunk)-1] + latestHead := lastChunk[len(lastChunk)-1] - wantHead := in[4] // last header from incomming headers. + { + err := store.Append(ctx, firstChunk...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) - head, err = store.Head(ctx) - require.NoError(t, err) - assert.Equal(t, head.Hash(), wantHead.Hash()) + // head is advanced to the last known header. + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), wantHead.Hash()) + } - in = suite.GenDummyHeaders(10) - // make a gap - missedHeaders, in := in[:5], in[5:] - latestHead := in[len(in)-1] + { + err := store.Append(ctx, lastChunk...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) - err = store.Append(ctx, in...) - require.NoError(t, err) - // wait for batch to be written. - time.Sleep(100 * time.Millisecond) + // head is not advanced due to a gap. + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), wantHead.Hash()) + } - // head is not advanced due to a gap. - head, err = store.Head(ctx) - require.NoError(t, err) - assert.Equal(t, head.Hash(), wantHead.Hash()) + { - err = store.Append(ctx, missedHeaders...) - require.NoError(t, err) - // wait for batch to be written. - time.Sleep(100 * time.Millisecond) + err := store.Append(ctx, missedChunk...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) - // after appending missing headers we're on a latest header. - head, err = store.Head(ctx) - require.NoError(t, err) - assert.Equal(t, head.Hash(), latestHead.Hash()) + // after appending missing headers we're on the latest header. + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), latestHead.Hash()) + } } // TestStore_GetRange tests possible combinations of requests and ensures that From 8b1bbc7fedb2de5649e2fde9b988a7ecefedde5b Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 26 Jun 2024 14:44:12 +0200 Subject: [PATCH 03/21] fix(store): properly update store head --- store/heightsub.go | 30 ++---------------------------- store/store.go | 21 +++++++++++++++++++-- store/store_test.go | 37 +++++++++++++++++++++++-------------- 3 files changed, 44 insertions(+), 44 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 002505a9..02d34eaf 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -19,14 +19,12 @@ type heightSub[H header.Header[H]] struct { height atomic.Uint64 heightReqsLk sync.Mutex heightReqs map[uint64]map[chan H]struct{} - knownHeights map[uint64]struct{} } // newHeightSub instantiates new heightSub. func newHeightSub[H header.Header[H]]() *heightSub[H] { return &heightSub[H]{ - heightReqs: make(map[uint64]map[chan H]struct{}), - knownHeights: map[uint64]struct{}{}, + heightReqs: make(map[uint64]map[chan H]struct{}), } } @@ -92,12 +90,11 @@ func (hs *heightSub[H]) Pub(headers ...H) { } from, to := headers[0].Height(), headers[ln-1].Height() + hs.SetHeight(to) hs.heightReqsLk.Lock() defer hs.heightReqsLk.Unlock() - hs.tryAdvanceHeight(headers...) - // there is a common case where we Pub only header // in this case, we shouldn't loop over each heightReqs // and instead read from the map directly @@ -126,26 +123,3 @@ func (hs *heightSub[H]) Pub(headers ...H) { } } } - -func (hs *heightSub[H]) tryAdvanceHeight(headers ...H) { - curr := hs.Height() - - // collect all new heights. - for i := range headers { - h := headers[i].Height() - if h > curr { - hs.knownHeights[h] = struct{}{} - } - } - - // try advance heightSub.Height if we saw a relevant height before. - for len(hs.knownHeights) > 0 { - _, ok := hs.knownHeights[curr+1] - if !ok { - break - } - delete(hs.knownHeights, curr+1) - curr++ - } - hs.SetHeight(curr) -} diff --git a/store/store.go b/store/store.go index 657a22c7..25c68241 100644 --- a/store/store.go +++ b/store/store.go @@ -1,9 +1,11 @@ package store import ( + "cmp" "context" "errors" "fmt" + "slices" "sync/atomic" "time" @@ -51,6 +53,8 @@ type Store[H header.Header[H]] struct { writesDn chan struct{} // writeHead maintains the current write head writeHead atomic.Pointer[H] + + knownHeights map[uint64]struct{} // pending keeps headers pending to be written in one batch pending *batch[H] @@ -108,6 +112,8 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store writesDn: make(chan struct{}), pending: newBatch[H](params.WriteBatchSize), Params: params, + + knownHeights: make(map[uint64]struct{}), }, nil } @@ -319,10 +325,15 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { head = *headPtr } + hightestHead := head + + slices.SortFunc(headers, func(a, b H) int { + return cmp.Compare(a.Height(), b.Height()) + }) + // collect valid headers verified := make([]H, 0, lh) for i, h := range headers { - err = head.Verify(h) if err != nil { var verErr *header.VerifyError @@ -344,11 +355,17 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { } verified = append(verified, h) head = h + + if hightestHead.Height()+1 == head.Height() { + hightestHead = head + } else { + s.knownHeights[head.Height()] = struct{}{} + } } onWrite := func() { newHead := verified[len(verified)-1] - s.writeHead.Store(&newHead) + s.writeHead.Store(&hightestHead) log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) s.metrics.newHead(newHead.Height()) } diff --git a/store/store_test.go b/store/store_test.go index 4dc9ae96..a7c1ce1b 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -155,11 +155,22 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { assert.Equal(t, head.Hash(), suite.Head().Hash()) firstChunk := suite.GenDummyHeaders(5) + for i := range firstChunk { + t.Log("firstChunk:", firstChunk[i].Height(), firstChunk[i].Hash()) + } missedChunk := suite.GenDummyHeaders(5) + for i := range missedChunk { + t.Log("missedChunk:", missedChunk[i].Height(), missedChunk[i].Hash()) + } lastChunk := suite.GenDummyHeaders(5) + for i := range lastChunk { + t.Log("lastChunk:", lastChunk[i].Height(), lastChunk[i].Hash()) + } wantHead := firstChunk[len(firstChunk)-1] + t.Log("wantHead", wantHead.Height(), wantHead.Hash()) latestHead := lastChunk[len(lastChunk)-1] + t.Log("latestHead", latestHead.Height(), latestHead.Hash()) { err := store.Append(ctx, firstChunk...) @@ -172,7 +183,6 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { require.NoError(t, err) assert.Equal(t, head.Hash(), wantHead.Hash()) } - { err := store.Append(ctx, lastChunk...) require.NoError(t, err) @@ -182,21 +192,20 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { // head is not advanced due to a gap. head, err := store.Head(ctx) require.NoError(t, err) + assert.Equal(t, head.Height(), wantHead.Height()) + t.Log("head", head.Height(), head.Hash()) assert.Equal(t, head.Hash(), wantHead.Hash()) } - - { - - err := store.Append(ctx, missedChunk...) - require.NoError(t, err) - // wait for batch to be written. - time.Sleep(100 * time.Millisecond) - - // after appending missing headers we're on the latest header. - head, err := store.Head(ctx) - require.NoError(t, err) - assert.Equal(t, head.Hash(), latestHead.Hash()) - } + // { + // err := store.Append(ctx, missedChunk...) + // require.NoError(t, err) + // // wait for batch to be written. + // time.Sleep(100 * time.Millisecond) + // // after appending missing headers we're on the latest header. + // head, err := store.Head(ctx) + // require.NoError(t, err) + // assert.Equal(t, head.Hash(), latestHead.Hash()) + // } } // TestStore_GetRange tests possible combinations of requests and ensures that From 8f62e7d84975e560796b9775d02131d7b363f761 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 27 Jun 2024 11:58:39 +0200 Subject: [PATCH 04/21] make store better --- store/store.go | 57 +++++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/store/store.go b/store/store.go index 25c68241..d820aa60 100644 --- a/store/store.go +++ b/store/store.go @@ -54,7 +54,7 @@ type Store[H header.Header[H]] struct { // writeHead maintains the current write head writeHead atomic.Pointer[H] - knownHeights map[uint64]struct{} + knownHeaders map[uint64]H // pending keeps headers pending to be written in one batch pending *batch[H] @@ -113,7 +113,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store pending: newBatch[H](params.WriteBatchSize), Params: params, - knownHeights: make(map[uint64]struct{}), + knownHeaders: make(map[uint64]H), }, nil } @@ -173,24 +173,19 @@ func (s *Store[H]) Height() uint64 { return s.heightSub.Height() } +// Head returns the highest contiguous header written to the store. func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) { - head, err := s.GetByHeight(ctx, s.heightSub.Height()) - if err == nil { - return head, nil + headPtr := s.writeHead.Load() + if headPtr != nil { + return *headPtr, nil } - var zero H - head, err = s.readHead(ctx) - switch { - default: + head, err := s.readHead(ctx) + if err != nil { + var zero H return zero, err - case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound): - return zero, header.ErrNoHead - case err == nil: - s.heightSub.SetHeight(head.Height()) - log.Infow("loaded head", "height", head.Height(), "hash", head.Hash()) - return head, nil } + return head, nil } func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) { @@ -306,6 +301,9 @@ func (s *Store[H]) HasAt(_ context.Context, height uint64) bool { return height != uint64(0) && s.Height() >= height } +// Append the given headers to the store. Real write to the disk happens +// asynchronously and might fail without reporting error (just logging). +// TODO(cristaloleg): add retries to the flush worker. func (s *Store[H]) Append(ctx context.Context, headers ...H) error { lh := len(headers) if lh == 0 { @@ -325,7 +323,7 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { head = *headPtr } - hightestHead := head + continuousHead := head slices.SortFunc(headers, func(a, b H) int { return cmp.Compare(a.Height(), b.Height()) @@ -356,16 +354,17 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { verified = append(verified, h) head = h - if hightestHead.Height()+1 == head.Height() { - hightestHead = head + if continuousHead.Height()+1 == head.Height() { + continuousHead = head } else { - s.knownHeights[head.Height()] = struct{}{} + s.knownHeaders[head.Height()] = head } } onWrite := func() { - newHead := verified[len(verified)-1] - s.writeHead.Store(&hightestHead) + newHead := s.tryAdvanceHead(continuousHead) + s.writeHead.Store(&newHead) + log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) s.metrics.newHead(newHead.Height()) } @@ -510,6 +509,22 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { return data, nil } +// try advance heighest header if we saw a higher continuous before. +func (s *Store[H]) tryAdvanceHead(highestHead H) H { + curr := highestHead.Height() + + for len(s.knownHeaders) > 0 { + h, ok := s.knownHeaders[curr+1] + if !ok { + break + } + highestHead = h + delete(s.knownHeaders, curr+1) + curr++ + } + return highestHead +} + // indexTo saves mapping between header Height and Hash to the given batch. func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, headers ...H) error { for _, h := range headers { From 4b13167c6ea03d523acebd08c61355e6edcb7aee Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 27 Jun 2024 12:03:16 +0200 Subject: [PATCH 05/21] enable test properly --- store/store_test.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/store/store_test.go b/store/store_test.go index a7c1ce1b..9994ca2b 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -196,16 +196,17 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { t.Log("head", head.Height(), head.Hash()) assert.Equal(t, head.Hash(), wantHead.Hash()) } - // { - // err := store.Append(ctx, missedChunk...) - // require.NoError(t, err) - // // wait for batch to be written. - // time.Sleep(100 * time.Millisecond) - // // after appending missing headers we're on the latest header. - // head, err := store.Head(ctx) - // require.NoError(t, err) - // assert.Equal(t, head.Hash(), latestHead.Hash()) - // } + { + err := store.Append(ctx, missedChunk...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + // after appending missing headers we're on the latest header. + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), latestHead.Hash()) + } } // TestStore_GetRange tests possible combinations of requests and ensures that From 2dbec9392cae12a64475029a4ecc86d32b5e4744 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 27 Jun 2024 13:07:24 +0200 Subject: [PATCH 06/21] fixes --- store/heightsub.go | 11 ++++++++++- store/store.go | 10 +++++++++- store/store_test.go | 12 ++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 02d34eaf..8c6a28da 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -34,8 +34,17 @@ func (hs *heightSub[H]) Height() uint64 { } // SetHeight sets the new head height for heightSub. +// Only higher then current height will be set. func (hs *heightSub[H]) SetHeight(height uint64) { - hs.height.Store(height) + for { + curr := hs.height.Load() + if curr >= height { + return + } + if hs.height.CompareAndSwap(curr, height) { + return + } + } } // Sub subscribes for a header of a given height. diff --git a/store/store.go b/store/store.go index d820aa60..71790838 100644 --- a/store/store.go +++ b/store/store.go @@ -170,7 +170,15 @@ func (s *Store[H]) Stop(ctx context.Context) error { } func (s *Store[H]) Height() uint64 { - return s.heightSub.Height() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + head, err := s.Head(ctx) + if err != nil { + // TODO(cristaloleg): log? panic? retry? + return 0 + } + return head.Height() } // Head returns the highest contiguous header written to the store. diff --git a/store/store_test.go b/store/store_test.go index 9994ca2b..b98f1b8f 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -182,6 +182,10 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { head, err := store.Head(ctx) require.NoError(t, err) assert.Equal(t, head.Hash(), wantHead.Hash()) + + // check that store height is aligned with the head. + height := store.Height() + assert.Equal(t, height, head.Height()) } { err := store.Append(ctx, lastChunk...) @@ -195,6 +199,10 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { assert.Equal(t, head.Height(), wantHead.Height()) t.Log("head", head.Height(), head.Hash()) assert.Equal(t, head.Hash(), wantHead.Hash()) + + // check that store height is aligned with the head. + height := store.Height() + assert.Equal(t, height, head.Height()) } { err := store.Append(ctx, missedChunk...) @@ -206,6 +214,10 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { head, err := store.Head(ctx) require.NoError(t, err) assert.Equal(t, head.Hash(), latestHead.Hash()) + + // check that store height is aligned with the head. + height := store.Height() + assert.Equal(t, height, head.Height()) } } From b4738e4130b1bcb3be8b33b9cebdf1c636e16d77 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 27 Jun 2024 14:05:51 +0200 Subject: [PATCH 07/21] make no-op instead of a panic --- sync/sync_head_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index cc60e481..eecd3a35 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -2,6 +2,7 @@ package sync import ( "context" + "errors" "sync" "sync/atomic" "testing" @@ -121,13 +122,11 @@ func (t *wrappedGetter) Head(ctx context.Context, options ...header.HeadOption[* } func (t *wrappedGetter) Get(ctx context.Context, hash header.Hash) (*headertest.DummyHeader, error) { - // TODO implement me - panic("implement me") + return nil, errors.New("implement me") } func (t *wrappedGetter) GetByHeight(ctx context.Context, u uint64) (*headertest.DummyHeader, error) { - // TODO implement me - panic("implement me") + return nil, errors.New("implement me") } func (t *wrappedGetter) GetRangeByHeight( @@ -135,6 +134,5 @@ func (t *wrappedGetter) GetRangeByHeight( from *headertest.DummyHeader, to uint64, ) ([]*headertest.DummyHeader, error) { - // TODO implement me - panic("implement me") + return nil, errors.New("implement me") } From bc1a11170578a0be273cf920b28650f6dae35302 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 27 Jun 2024 14:37:00 +0200 Subject: [PATCH 08/21] rework heightSub --- store/heightsub.go | 17 ++++++++--------- store/heightsub_test.go | 34 ++++++++++++++++++++++++++++++++-- store/store.go | 5 +++-- 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 8c6a28da..772933a4 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -28,14 +28,13 @@ func newHeightSub[H header.Header[H]]() *heightSub[H] { } } -// Height reports current height. -func (hs *heightSub[H]) Height() uint64 { - return hs.height.Load() +func (hs *heightSub[H]) isInited() bool { + return hs.height.Load() != 0 } -// SetHeight sets the new head height for heightSub. +// setHeight sets the new head height for heightSub. // Only higher then current height will be set. -func (hs *heightSub[H]) SetHeight(height uint64) { +func (hs *heightSub[H]) setHeight(height uint64) { for { curr := hs.height.Load() if curr >= height { @@ -52,12 +51,12 @@ func (hs *heightSub[H]) SetHeight(height uint64) { // and caller should get it elsewhere. func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) { var zero H - if hs.Height() >= height { + if hs.height.Load() >= height { return zero, errElapsedHeight } hs.heightReqsLk.Lock() - if hs.Height() >= height { + if hs.height.Load() >= height { // This is a rare case we have to account for. // The lock above can park a goroutine long enough for hs.height to change for a requested height, // leaving the request never fulfilled and the goroutine deadlocked. @@ -90,7 +89,7 @@ func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) { // Pub processes all the outstanding subscriptions matching the given headers. // Pub is only safe when called from one goroutine. -// For Pub to work correctly, heightSub has to be initialized with SetHeight +// For Pub to work correctly, heightSub has to be initialized with setHeight // so that given headers are contiguous to the height on heightSub. func (hs *heightSub[H]) Pub(headers ...H) { ln := len(headers) @@ -99,7 +98,7 @@ func (hs *heightSub[H]) Pub(headers ...H) { } from, to := headers[0].Height(), headers[ln-1].Height() - hs.SetHeight(to) + hs.setHeight(to) hs.heightReqsLk.Lock() defer hs.heightReqsLk.Unlock() diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 64ef1804..88f4434b 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -20,7 +20,7 @@ func TestHeightSub(t *testing.T) { { h := headertest.RandDummyHeader(t) h.HeightI = 100 - hs.SetHeight(99) + hs.setHeight(99) hs.Pub(h) h, err := hs.Sub(ctx, 10) @@ -56,7 +56,7 @@ func TestHeightSubNonAdjacement(t *testing.T) { { h := headertest.RandDummyHeader(t) h.HeightI = 100 - hs.SetHeight(99) + hs.setHeight(99) hs.Pub(h) } @@ -78,6 +78,36 @@ func TestHeightSubNonAdjacement(t *testing.T) { } } +func TestHeightSub_monotonicHeight(t *testing.T) { + hs := newHeightSub[*headertest.DummyHeader]() + + { + h := headertest.RandDummyHeader(t) + h.HeightI = 100 + hs.setHeight(99) + hs.Pub(h) + } + + { + h1 := headertest.RandDummyHeader(t) + h1.HeightI = 200 + h2 := headertest.RandDummyHeader(t) + h2.HeightI = 300 + hs.Pub(h1, h2) + } + + { + + h1 := headertest.RandDummyHeader(t) + h1.HeightI = 120 + h2 := headertest.RandDummyHeader(t) + h2.HeightI = 130 + hs.Pub(h1, h2) + } + + assert.Equal(t, hs.height.Load(), uint64(300)) +} + func TestHeightSubCancellation(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/store/store.go b/store/store.go index 71790838..78c37043 100644 --- a/store/store.go +++ b/store/store.go @@ -53,7 +53,8 @@ type Store[H header.Header[H]] struct { writesDn chan struct{} // writeHead maintains the current write head writeHead atomic.Pointer[H] - + // knownHeaders tracks all processed headers + // to advance writeHead only over continuous headers. knownHeaders map[uint64]H // pending keeps headers pending to be written in one batch pending *batch[H] @@ -118,7 +119,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store } func (s *Store[H]) Init(ctx context.Context, initial H) error { - if s.heightSub.Height() != 0 { + if s.heightSub.isInited() { return errors.New("store already initialized") } // trust the given header as the initial head From 7fc0dfd98d466bafce875a678a853d6e7a61cf1a Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 27 Jun 2024 16:48:30 +0200 Subject: [PATCH 09/21] new solution --- store/store.go | 49 +++++++++++++++++++++++------------- store/store_test.go | 60 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 80 insertions(+), 29 deletions(-) diff --git a/store/store.go b/store/store.go index 78c37043..0be59a38 100644 --- a/store/store.go +++ b/store/store.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "slices" + "sync" "sync/atomic" "time" @@ -53,9 +54,12 @@ type Store[H header.Header[H]] struct { writesDn chan struct{} // writeHead maintains the current write head writeHead atomic.Pointer[H] + + knownHeadersLk sync.Mutex // knownHeaders tracks all processed headers // to advance writeHead only over continuous headers. knownHeaders map[uint64]H + // pending keeps headers pending to be written in one batch pending *batch[H] @@ -322,18 +326,18 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { var err error // take current write head to verify headers against var head H - headPtr := s.writeHead.Load() - if headPtr == nil { + if headPtr := s.writeHead.Load(); headPtr == nil { head, err = s.Head(ctx) if err != nil { return err } + // store header from the disk. + gotHead := head + s.writeHead.CompareAndSwap(nil, &gotHead) } else { head = *headPtr } - continuousHead := head - slices.SortFunc(headers, func(a, b H) int { return cmp.Compare(a.Height(), b.Height()) }) @@ -363,17 +367,15 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { verified = append(verified, h) head = h - if continuousHead.Height()+1 == head.Height() { - continuousHead = head - } else { + { + s.knownHeadersLk.Lock() s.knownHeaders[head.Height()] = head + s.knownHeadersLk.Unlock() } } onWrite := func() { - newHead := s.tryAdvanceHead(continuousHead) - s.writeHead.Store(&newHead) - + newHead := s.tryAdvanceHead() log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) s.metrics.newHead(newHead.Height()) } @@ -519,19 +521,32 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { } // try advance heighest header if we saw a higher continuous before. -func (s *Store[H]) tryAdvanceHead(highestHead H) H { - curr := highestHead.Height() +func (s *Store[H]) tryAdvanceHead() H { + s.knownHeadersLk.Lock() + defer s.knownHeadersLk.Unlock() + head := *s.writeHead.Load() + height := head.Height() + currHead := head + + // try to move to the next height. for len(s.knownHeaders) > 0 { - h, ok := s.knownHeaders[curr+1] + h, ok := s.knownHeaders[height+1] if !ok { break } - highestHead = h - delete(s.knownHeaders, curr+1) - curr++ + head = h + delete(s.knownHeaders, height+1) + height++ + } + + // if writeHead not set OR it's height is less then we found then update. + if currHead.Height() < head.Height() { + // we don't need CAS here because that's the only place + // where writeHead is updated, knownHeadersLk ensures 1 goroutine. + s.writeHead.Store(&head) } - return highestHead + return head } // indexTo saves mapping between header Height and Hash to the given batch. diff --git a/store/store_test.go b/store/store_test.go index b98f1b8f..fcaefa6e 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2,6 +2,8 @@ package store import ( "context" + "math/rand" + stdsync "sync" "testing" "time" @@ -141,6 +143,52 @@ func TestStore_Append_BadHeader(t *testing.T) { require.Error(t, err) } +func TestStore_Append(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4)) + + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), suite.Head().Hash()) + + const workers = 10 + const chunk = 5 + headers := suite.GenDummyHeaders(workers * chunk) + + errCh := make(chan error, workers) + var wg stdsync.WaitGroup + wg.Add(workers) + + for i := range workers { + go func() { + defer wg.Done() + // make every append happened in random order. + time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + + err := store.Append(ctx, headers[i*chunk:(i+1)*chunk]...) + errCh <- err + }() + } + + wg.Wait() + close(errCh) + for err := range errCh { + assert.NoError(t, err) + } + + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + head, err = store.Head(ctx) + assert.NoError(t, err) + assert.Equal(t, head.Hash(), headers[len(headers)-1].Hash()) +} + func TestStore_Append_stableHeadWhenGaps(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) t.Cleanup(cancel) @@ -155,22 +203,11 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { assert.Equal(t, head.Hash(), suite.Head().Hash()) firstChunk := suite.GenDummyHeaders(5) - for i := range firstChunk { - t.Log("firstChunk:", firstChunk[i].Height(), firstChunk[i].Hash()) - } missedChunk := suite.GenDummyHeaders(5) - for i := range missedChunk { - t.Log("missedChunk:", missedChunk[i].Height(), missedChunk[i].Hash()) - } lastChunk := suite.GenDummyHeaders(5) - for i := range lastChunk { - t.Log("lastChunk:", lastChunk[i].Height(), lastChunk[i].Hash()) - } wantHead := firstChunk[len(firstChunk)-1] - t.Log("wantHead", wantHead.Height(), wantHead.Hash()) latestHead := lastChunk[len(lastChunk)-1] - t.Log("latestHead", latestHead.Height(), latestHead.Hash()) { err := store.Append(ctx, firstChunk...) @@ -197,7 +234,6 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { head, err := store.Head(ctx) require.NoError(t, err) assert.Equal(t, head.Height(), wantHead.Height()) - t.Log("head", head.Height(), head.Hash()) assert.Equal(t, head.Hash(), wantHead.Hash()) // check that store height is aligned with the head. From 742a2ba000df85212ef2e1d005663d49d36ab194 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Fri, 28 Jun 2024 12:53:09 +0200 Subject: [PATCH 10/21] fix comments --- store/store.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/store/store.go b/store/store.go index 0be59a38..b7cd6c19 100644 --- a/store/store.go +++ b/store/store.go @@ -366,12 +366,7 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { } verified = append(verified, h) head = h - - { - s.knownHeadersLk.Lock() - s.knownHeaders[head.Height()] = head - s.knownHeadersLk.Unlock() - } + s.addKnownHeader(head) } onWrite := func() { @@ -520,6 +515,12 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { return data, nil } +func (s *Store[H]) addKnownHeader(h H) { + s.knownHeadersLk.Lock() + s.knownHeaders[h.Height()] = h + s.knownHeadersLk.Unlock() +} + // try advance heighest header if we saw a higher continuous before. func (s *Store[H]) tryAdvanceHead() H { s.knownHeadersLk.Lock() @@ -540,7 +541,7 @@ func (s *Store[H]) tryAdvanceHead() H { height++ } - // if writeHead not set OR it's height is less then we found then update. + // we found higher continuous header, so update. if currHead.Height() < head.Height() { // we don't need CAS here because that's the only place // where writeHead is updated, knownHeadersLk ensures 1 goroutine. From 9cb41db19f1ac72b0f5189799c9ab78efa69baa0 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 23 Jul 2024 10:41:25 +0200 Subject: [PATCH 11/21] fix --- store/store.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/store/store.go b/store/store.go index b7cd6c19..5894fcea 100644 --- a/store/store.go +++ b/store/store.go @@ -180,8 +180,7 @@ func (s *Store[H]) Height() uint64 { head, err := s.Head(ctx) if err != nil { - // TODO(cristaloleg): log? panic? retry? - return 0 + panic(err) } return head.Height() } @@ -333,7 +332,7 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { } // store header from the disk. gotHead := head - s.writeHead.CompareAndSwap(nil, &gotHead) + s.writeHead.Store(&gotHead) } else { head = *headPtr } From 1a0ed3966d6978221885f51070c8d1b94ab17d52 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 23 Jul 2024 11:55:40 +0200 Subject: [PATCH 12/21] panic less often --- store/store.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/store/store.go b/store/store.go index 5894fcea..73e0afcf 100644 --- a/store/store.go +++ b/store/store.go @@ -180,6 +180,10 @@ func (s *Store[H]) Height() uint64 { head, err := s.Head(ctx) if err != nil { + if errors.Is(err, context.Canceled) || + errors.Is(err, datastore.ErrNotFound) { + return 0 + } panic(err) } return head.Height() From 1d53e1a1ced36fc14e9a373a6fa560b9e5f41bf9 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 30 Jul 2024 15:14:04 +0200 Subject: [PATCH 13/21] updated writeHead after a real write --- store/store.go | 53 ++++++++++++++++++++------------------------- store/store_test.go | 12 ++++++---- 2 files changed, 31 insertions(+), 34 deletions(-) diff --git a/store/store.go b/store/store.go index 73e0afcf..83d98494 100644 --- a/store/store.go +++ b/store/store.go @@ -201,6 +201,9 @@ func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, erro var zero H return zero, err } + + s.writeHead.CompareAndSwap(nil, &head) + return head, nil } @@ -326,19 +329,10 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { return nil } - var err error // take current write head to verify headers against - var head H - if headPtr := s.writeHead.Load(); headPtr == nil { - head, err = s.Head(ctx) - if err != nil { - return err - } - // store header from the disk. - gotHead := head - s.writeHead.Store(&gotHead) - } else { - head = *headPtr + head, err := s.Head(ctx) + if err != nil { + return err } slices.SortFunc(headers, func(a, b H) int { @@ -369,13 +363,6 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { } verified = append(verified, h) head = h - s.addKnownHeader(head) - } - - onWrite := func() { - newHead := s.tryAdvanceHead() - log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) - s.metrics.newHead(newHead.Height()) } // queue headers to be written on disk @@ -383,15 +370,14 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { case s.writes <- verified: // we return an error here after writing, // as there might be an invalid header in between of a given range - onWrite() return err default: s.metrics.writesQueueBlocked(ctx) } + // if the writes queue is full, we block until it is not select { case s.writes <- verified: - onWrite() return err case <-s.writesDn: return errStoppedStore @@ -430,6 +416,9 @@ func (s *Store[H]) flushLoop() { s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), true) continue } + + s.tryAdvanceHead(toFlush...) + s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false) // reset pending s.pending.Reset() @@ -518,18 +507,21 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { return data, nil } -func (s *Store[H]) addKnownHeader(h H) { - s.knownHeadersLk.Lock() - s.knownHeaders[h.Height()] = h - s.knownHeadersLk.Unlock() -} - // try advance heighest header if we saw a higher continuous before. -func (s *Store[H]) tryAdvanceHead() H { +func (s *Store[H]) tryAdvanceHead(headers ...H) { + headPtr := s.writeHead.Load() + if headPtr == nil || len(headers) == 0 { + return + } + s.knownHeadersLk.Lock() defer s.knownHeadersLk.Unlock() - head := *s.writeHead.Load() + for _, h := range headers { + s.knownHeaders[h.Height()] = h + } + + head := *headPtr height := head.Height() currHead := head @@ -549,8 +541,9 @@ func (s *Store[H]) tryAdvanceHead() H { // we don't need CAS here because that's the only place // where writeHead is updated, knownHeadersLk ensures 1 goroutine. s.writeHead.Store(&head) + log.Infow("new head", "height", head.Height(), "hash", head.Hash()) + s.metrics.newHead(head.Height()) } - return head } // indexTo saves mapping between header Height and Hash to the given batch. diff --git a/store/store_test.go b/store/store_test.go index fcaefa6e..a0f2512f 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -1,6 +1,7 @@ package store import ( + "bytes" "context" "math/rand" stdsync "sync" @@ -22,7 +23,7 @@ func TestStore(t *testing.T) { suite := headertest.NewTestSuite(t) ds := sync.MutexWrap(datastore.NewMapDatastore()) - store := NewTestStore(t, ctx, ds, suite.Head()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(5)) head, err := store.Head(ctx) require.NoError(t, err) @@ -38,9 +39,12 @@ func TestStore(t *testing.T) { assert.Equal(t, h.Hash(), out[i].Hash()) } - head, err = store.Head(ctx) - require.NoError(t, err) - assert.Equal(t, out[len(out)-1].Hash(), head.Hash()) + // we need to wait for a flush + assert.Eventually(t, func() bool { + head, err = store.Head(ctx) + require.NoError(t, err) + return bytes.Equal(out[len(out)-1].Hash(), head.Hash()) + }, time.Second, 100*time.Millisecond) ok, err := store.Has(ctx, in[5].Hash()) require.NoError(t, err) From da710ebf911aba51e0fd56d0675ac6bcc86fe300 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 31 Jul 2024 14:26:04 +0200 Subject: [PATCH 14/21] cleanups --- store/store.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/store/store.go b/store/store.go index 83d98494..9cb839f8 100644 --- a/store/store.go +++ b/store/store.go @@ -322,7 +322,6 @@ func (s *Store[H]) HasAt(_ context.Context, height uint64) bool { // Append the given headers to the store. Real write to the disk happens // asynchronously and might fail without reporting error (just logging). -// TODO(cristaloleg): add retries to the flush worker. func (s *Store[H]) Append(ctx context.Context, headers ...H) error { lh := len(headers) if lh == 0 { @@ -521,9 +520,9 @@ func (s *Store[H]) tryAdvanceHead(headers ...H) { s.knownHeaders[h.Height()] = h } - head := *headPtr - height := head.Height() - currHead := head + currHead := *headPtr + height := currHead.Height() + newHead := currHead // try to move to the next height. for len(s.knownHeaders) > 0 { @@ -531,18 +530,19 @@ func (s *Store[H]) tryAdvanceHead(headers ...H) { if !ok { break } - head = h + newHead = h delete(s.knownHeaders, height+1) height++ } - // we found higher continuous header, so update. - if currHead.Height() < head.Height() { + // we found higher continuous header - update. + if currHead.Height() < newHead.Height() { // we don't need CAS here because that's the only place // where writeHead is updated, knownHeadersLk ensures 1 goroutine. - s.writeHead.Store(&head) - log.Infow("new head", "height", head.Height(), "hash", head.Hash()) - s.metrics.newHead(head.Height()) + // NOTE: Store[H].Head also updates writeHead but only once when it's nil. + s.writeHead.Store(&newHead) + log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) + s.metrics.newHead(newHead.Height()) } } From 0c93588a01cf5a53f96da2e31e09524a052a7c51 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 1 Aug 2024 11:59:51 +0200 Subject: [PATCH 15/21] fix syncer test --- sync/sync_test.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sync/sync_test.go b/sync/sync_test.go index b9acb2d3..94bf5d3c 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -303,7 +303,7 @@ func TestSync_InvalidSyncTarget(t *testing.T) { head := suite.Head() // create a local store which is initialised at genesis height - localStore := newTestStore(t, ctx, head) + localStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(10)) // create a peer which is already on height 100 remoteStore := headertest.NewStore(t, suite, 100) @@ -347,7 +347,14 @@ func TestSync_InvalidSyncTarget(t *testing.T) { // ensure syncer could only sync up to one header below the bad sync target h, err := localStore.Head(ctx) require.NoError(t, err) - require.Equal(t, maliciousHeader.Height()-1, h.Height()) + + // we need to wait for a flush + assert.Eventually(t, func() bool { + h, err = localStore.Head(ctx) + require.NoError(t, err) + + return maliciousHeader.Height()-1 == h.Height() + }, time.Second, 100*time.Millisecond) // manually change bad sync target to a good header in remote peer // store so it can re-serve it to syncer once it re-requests the height @@ -400,7 +407,7 @@ func (d *delayedGetter[H]) GetRangeByHeight(ctx context.Context, from H, to uint } // newTestStore creates initialized and started in memory header Store which is useful for testing. -func newTestStore(tb testing.TB, ctx context.Context, head *headertest.DummyHeader) header.Store[*headertest.DummyHeader] { +func newTestStore(tb testing.TB, ctx context.Context, head *headertest.DummyHeader, opts ...store.Option) header.Store[*headertest.DummyHeader] { ds := sync.MutexWrap(datastore.NewMapDatastore()) - return store.NewTestStore(tb, ctx, ds, head) + return store.NewTestStore(tb, ctx, ds, head, opts...) } From 7bb1d5c4833a489ea317e9ed561844904bc4940a Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 1 Aug 2024 13:51:20 +0200 Subject: [PATCH 16/21] fix syncer again --- sync/sync_test.go | 60 ++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/sync/sync_test.go b/sync/sync_test.go index 94bf5d3c..0429ab4c 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -23,14 +23,14 @@ func TestSyncSimpleRequestingHead(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - remoteStore := newTestStore(t, ctx, head) + remoteStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(1)) err := remoteStore.Append(ctx, suite.GenDummyHeaders(100)...) require.NoError(t, err) _, err = remoteStore.GetByHeight(ctx, 100) require.NoError(t, err) - localStore := newTestStore(t, ctx, head) + localStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(1)) syncer, err := NewSyncer( local.NewExchange(remoteStore), localStore, @@ -47,19 +47,30 @@ func TestSyncSimpleRequestingHead(t *testing.T) { err = syncer.SyncWait(ctx) require.NoError(t, err) - exp, err := remoteStore.Head(ctx) - require.NoError(t, err) + // force sync to update underlying stores. + syncer.wantSync() - have, err := localStore.Head(ctx) - require.NoError(t, err) - assert.Equal(t, exp.Height(), have.Height()) - assert.Empty(t, syncer.pending.Head()) + // we need to wait for a flush + assert.Eventually(t, func() bool { + exp, err := remoteStore.Head(ctx) + require.NoError(t, err) - state := syncer.State() - assert.Equal(t, uint64(exp.Height()), state.Height) - assert.Equal(t, uint64(2), state.FromHeight) - assert.Equal(t, uint64(exp.Height()), state.ToHeight) - assert.True(t, state.Finished(), state) + have, err := localStore.Head(ctx) + require.NoError(t, err) + + state := syncer.State() + + ok := true + ok = ok && exp.Height() == have.Height() + ok = ok && syncer.pending.Head() == nil + + ok = ok && uint64(exp.Height()) == state.Height + ok = ok && uint64(2) == state.FromHeight + ok = ok && uint64(exp.Height()) == state.ToHeight + ok = ok && state.Finished() + + return ok + }, 2*time.Second, 100*time.Millisecond) } func TestDoSyncFullRangeFromExternalPeer(t *testing.T) { @@ -108,8 +119,8 @@ func TestSyncCatchUp(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - remoteStore := newTestStore(t, ctx, head) - localStore := newTestStore(t, ctx, head) + remoteStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(1)) + localStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(1)) syncer, err := NewSyncer( local.NewExchange(remoteStore), localStore, @@ -138,12 +149,17 @@ func TestSyncCatchUp(t *testing.T) { require.NoError(t, err) // 4. assert syncer caught-up - have, err := localStore.Head(ctx) - require.NoError(t, err) + // we need to wait for a flush + assert.Eventually(t, func() bool { + have, err := localStore.Head(ctx) + require.NoError(t, err) - assert.Equal(t, have.Height(), incomingHead.Height()) - assert.Equal(t, exp.Height()+1, have.Height()) // plus one as we didn't add last header to remoteStore - assert.Empty(t, syncer.pending.Head()) + ok := true + ok = ok && have.Height() == incomingHead.Height() + ok = ok && exp.Height()+1 == have.Height() // plus one as we didn't add last header to remoteStore + ok = ok && syncer.pending.Head() == nil + return ok + }, time.Second, 100*time.Millisecond) state := syncer.State() assert.Equal(t, uint64(exp.Height()+1), state.Height) @@ -210,7 +226,7 @@ func TestSyncPendingRangesWithMisses(t *testing.T) { require.NoError(t, err) assert.Equal(t, exp.Height(), have.Height()) - assert.Empty(t, syncer.pending.Head()) // assert all cache from pending is used + assert.Nil(t, syncer.pending.Head()) // assert all cache from pending is used } // TestSyncer_FindHeadersReturnsCorrectRange ensures that `findHeaders` returns @@ -303,7 +319,7 @@ func TestSync_InvalidSyncTarget(t *testing.T) { head := suite.Head() // create a local store which is initialised at genesis height - localStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(10)) + localStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(1)) // create a peer which is already on height 100 remoteStore := headertest.NewStore(t, suite, 100) From 61a6b319dbad5dccd83f7fefd8546943cbc3b0e4 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 7 Oct 2024 14:25:53 +0200 Subject: [PATCH 17/21] no more knownHeaders map --- store/store.go | 56 ++++++++++++++++----------------------------- store/store_test.go | 2 +- 2 files changed, 21 insertions(+), 37 deletions(-) diff --git a/store/store.go b/store/store.go index 526b195f..9afbfe78 100644 --- a/store/store.go +++ b/store/store.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "slices" - "sync" "sync/atomic" "time" @@ -55,11 +54,6 @@ type Store[H header.Header[H]] struct { // writeHead maintains the current write head writeHead atomic.Pointer[H] - knownHeadersLk sync.Mutex - // knownHeaders tracks all processed headers - // to advance writeHead only over continuous headers. - knownHeaders map[uint64]H - // pending keeps headers pending to be written in one batch pending *batch[H] @@ -117,8 +111,6 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store writesDn: make(chan struct{}), pending: newBatch[H](params.WriteBatchSize), Params: params, - - knownHeaders: make(map[uint64]H), }, nil } @@ -423,7 +415,7 @@ func (s *Store[H]) flushLoop() { time.Sleep(sleep) } - s.tryAdvanceHead(toFlush...) + s.tryAdvanceHead(ctx, toFlush...) s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false) // reset pending @@ -513,43 +505,35 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { return data, nil } -// try advance heighest header if we saw a higher continuous before. -func (s *Store[H]) tryAdvanceHead(headers ...H) { - headPtr := s.writeHead.Load() - if headPtr == nil || len(headers) == 0 { +// try advance heighest writeHead based on passed or already written headers. +func (s *Store[H]) tryAdvanceHead(ctx context.Context, headers ...H) { + writeHead := s.writeHead.Load() + if writeHead == nil || len(headers) == 0 { return } - s.knownHeadersLk.Lock() - defer s.knownHeadersLk.Unlock() + currHeight := (*writeHead).Height() - for _, h := range headers { - s.knownHeaders[h.Height()] = h + // advance based on passed headers. + for i := 0; i < len(headers); i++ { + if headers[i].Height() != currHeight+1 { + break + } + s.writeHead.Store(&headers[i]) + currHeight++ } - currHead := *headPtr - height := currHead.Height() - newHead := currHead + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() - // try to move to the next height. - for len(s.knownHeaders) > 0 { - h, ok := s.knownHeaders[height+1] - if !ok { + // advance based on already written headers. + for { + newHead, err := s.GetByHeight(ctx, currHeight+1) + if err != nil { break } - newHead = h - delete(s.knownHeaders, height+1) - height++ - } - - // we found higher continuous header - update. - if currHead.Height() < newHead.Height() { - // we don't need CAS here because that's the only place - // where writeHead is updated, knownHeadersLk ensures 1 goroutine. - // NOTE: Store[H].Head also updates writeHead but only once when it's nil. s.writeHead.Store(&newHead) - log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) - s.metrics.newHead(newHead.Height()) + currHeight++ } } diff --git a/store/store_test.go b/store/store_test.go index a0f2512f..ef9d0172 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -248,7 +248,7 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { err := store.Append(ctx, missedChunk...) require.NoError(t, err) // wait for batch to be written. - time.Sleep(100 * time.Millisecond) + time.Sleep(time.Second) // after appending missing headers we're on the latest header. head, err := store.Head(ctx) From a197550bc0397f4abcd6e9bbc28f6b061937d350 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 9 Oct 2024 10:37:25 +0200 Subject: [PATCH 18/21] introduct getByHeight --- store/store.go | 12 +++++++++--- store/store_test.go | 3 +++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/store/store.go b/store/store.go index 9afbfe78..d7e700c1 100644 --- a/store/store.go +++ b/store/store.go @@ -243,12 +243,16 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { return h, nil } + return s.getByHeight(ctx, height) +} + +func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) { + var zero H hash, err := s.heightIndex.HashByHeight(ctx, height) if err != nil { if errors.Is(err, datastore.ErrNotFound) { return zero, header.ErrNotFound } - return zero, err } @@ -519,7 +523,8 @@ func (s *Store[H]) tryAdvanceHead(ctx context.Context, headers ...H) { if headers[i].Height() != currHeight+1 { break } - s.writeHead.Store(&headers[i]) + newHead := headers[i] + s.writeHead.Store(&newHead) currHeight++ } @@ -528,10 +533,11 @@ func (s *Store[H]) tryAdvanceHead(ctx context.Context, headers ...H) { // advance based on already written headers. for { - newHead, err := s.GetByHeight(ctx, currHeight+1) + h, err := s.getByHeight(ctx, currHeight+1) if err != nil { break } + newHead := h s.writeHead.Store(&newHead) currHeight++ } diff --git a/store/store_test.go b/store/store_test.go index ef9d0172..7d123cf9 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -190,6 +190,7 @@ func TestStore_Append(t *testing.T) { head, err = store.Head(ctx) assert.NoError(t, err) + assert.Equal(t, head.Height(), headers[len(headers)-1].Height()) assert.Equal(t, head.Hash(), headers[len(headers)-1].Hash()) } @@ -222,6 +223,7 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { // head is advanced to the last known header. head, err := store.Head(ctx) require.NoError(t, err) + assert.Equal(t, head.Height(), wantHead.Height()) assert.Equal(t, head.Hash(), wantHead.Hash()) // check that store height is aligned with the head. @@ -253,6 +255,7 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { // after appending missing headers we're on the latest header. head, err := store.Head(ctx) require.NoError(t, err) + assert.Equal(t, head.Height(), latestHead.Height()) assert.Equal(t, head.Hash(), latestHead.Hash()) // check that store height is aligned with the head. From 09d798e59c4863067b3558e17cd0646cca16e947 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 15 Oct 2024 12:18:59 +0200 Subject: [PATCH 19/21] fix review suggestions --- store/heightsub.go | 7 ++++++- store/heightsub_test.go | 1 + store/store.go | 4 +++- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 772933a4..a27c3b30 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -3,6 +3,7 @@ package store import ( "context" "errors" + "fmt" "sync" "sync/atomic" @@ -33,7 +34,7 @@ func (hs *heightSub[H]) isInited() bool { } // setHeight sets the new head height for heightSub. -// Only higher then current height will be set. +// Only higher than current height can be set. func (hs *heightSub[H]) setHeight(height uint64) { for { curr := hs.height.Load() @@ -98,6 +99,10 @@ func (hs *heightSub[H]) Pub(headers ...H) { } from, to := headers[0].Height(), headers[ln-1].Height() + if from >= to { + panic(fmt.Sprintf("from must be lower than to, have: %d and %d", from, to)) + } + hs.setHeight(to) hs.heightReqsLk.Lock() diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 88f4434b..92eecec4 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -47,6 +47,7 @@ func TestHeightSub(t *testing.T) { } } +// Test heightSub can accept non-adj headers without a problem. func TestHeightSubNonAdjacement(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() diff --git a/store/store.go b/store/store.go index d7e700c1..0a0acaae 100644 --- a/store/store.go +++ b/store/store.go @@ -173,6 +173,7 @@ func (s *Store[H]) Height() uint64 { head, err := s.Head(ctx) if err != nil { if errors.Is(err, context.Canceled) || + errors.Is(err, context.DeadlineExceeded) || errors.Is(err, datastore.ErrNotFound) { return 0 } @@ -528,7 +529,8 @@ func (s *Store[H]) tryAdvanceHead(ctx context.Context, headers ...H) { currHeight++ } - ctx, cancel := context.WithTimeout(ctx, time.Second) + // TODO(cristaloleg): benchmark this timeout or make it dynamic. + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() // advance based on already written headers. From e552672e3079a979d68a80004414f71c4cd29e37 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 15 Oct 2024 12:30:09 +0200 Subject: [PATCH 20/21] fix --- store/heightsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/heightsub.go b/store/heightsub.go index a27c3b30..c312f4d2 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -99,7 +99,7 @@ func (hs *heightSub[H]) Pub(headers ...H) { } from, to := headers[0].Height(), headers[ln-1].Height() - if from >= to { + if from > to { panic(fmt.Sprintf("from must be lower than to, have: %d and %d", from, to)) } From 4572354a79371f6f005d473691749addafd12e4e Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 17 Oct 2024 17:58:23 +0200 Subject: [PATCH 21/21] review suggestion --- sync/sync_test.go | 44 ++++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/sync/sync_test.go b/sync/sync_test.go index 0429ab4c..78d89425 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -59,17 +59,24 @@ func TestSyncSimpleRequestingHead(t *testing.T) { require.NoError(t, err) state := syncer.State() - - ok := true - ok = ok && exp.Height() == have.Height() - ok = ok && syncer.pending.Head() == nil - - ok = ok && uint64(exp.Height()) == state.Height - ok = ok && uint64(2) == state.FromHeight - ok = ok && uint64(exp.Height()) == state.ToHeight - ok = ok && state.Finished() - - return ok + switch { + case exp.Height() != have.Height(): + return false + case syncer.pending.Head() != nil: + return false + + case uint64(exp.Height()) != state.Height: + return false + case uint64(2) != state.FromHeight: + return false + + case uint64(exp.Height()) != state.ToHeight: + return false + case !state.Finished(): + return false + default: + return true + } }, 2*time.Second, 100*time.Millisecond) } @@ -154,11 +161,16 @@ func TestSyncCatchUp(t *testing.T) { have, err := localStore.Head(ctx) require.NoError(t, err) - ok := true - ok = ok && have.Height() == incomingHead.Height() - ok = ok && exp.Height()+1 == have.Height() // plus one as we didn't add last header to remoteStore - ok = ok && syncer.pending.Head() == nil - return ok + switch { + case have.Height() != incomingHead.Height(): + return false + case exp.Height()+1 != have.Height(): // plus one as we didn't add last header to remoteStore + return false + case syncer.pending.Head() != nil: + return false + default: + return true + } }, time.Second, 100*time.Millisecond) state := syncer.State()