From 8f62e7d84975e560796b9775d02131d7b363f761 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 27 Jun 2024 11:58:39 +0200 Subject: [PATCH] make store better --- store/store.go | 57 +++++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/store/store.go b/store/store.go index 25c68241..d820aa60 100644 --- a/store/store.go +++ b/store/store.go @@ -54,7 +54,7 @@ type Store[H header.Header[H]] struct { // writeHead maintains the current write head writeHead atomic.Pointer[H] - knownHeights map[uint64]struct{} + knownHeaders map[uint64]H // pending keeps headers pending to be written in one batch pending *batch[H] @@ -113,7 +113,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store pending: newBatch[H](params.WriteBatchSize), Params: params, - knownHeights: make(map[uint64]struct{}), + knownHeaders: make(map[uint64]H), }, nil } @@ -173,24 +173,19 @@ func (s *Store[H]) Height() uint64 { return s.heightSub.Height() } +// Head returns the highest contiguous header written to the store. func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) { - head, err := s.GetByHeight(ctx, s.heightSub.Height()) - if err == nil { - return head, nil + headPtr := s.writeHead.Load() + if headPtr != nil { + return *headPtr, nil } - var zero H - head, err = s.readHead(ctx) - switch { - default: + head, err := s.readHead(ctx) + if err != nil { + var zero H return zero, err - case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound): - return zero, header.ErrNoHead - case err == nil: - s.heightSub.SetHeight(head.Height()) - log.Infow("loaded head", "height", head.Height(), "hash", head.Hash()) - return head, nil } + return head, nil } func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) { @@ -306,6 +301,9 @@ func (s *Store[H]) HasAt(_ context.Context, height uint64) bool { return height != uint64(0) && s.Height() >= height } +// Append the given headers to the store. Real write to the disk happens +// asynchronously and might fail without reporting error (just logging). +// TODO(cristaloleg): add retries to the flush worker. func (s *Store[H]) Append(ctx context.Context, headers ...H) error { lh := len(headers) if lh == 0 { @@ -325,7 +323,7 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { head = *headPtr } - hightestHead := head + continuousHead := head slices.SortFunc(headers, func(a, b H) int { return cmp.Compare(a.Height(), b.Height()) @@ -356,16 +354,17 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { verified = append(verified, h) head = h - if hightestHead.Height()+1 == head.Height() { - hightestHead = head + if continuousHead.Height()+1 == head.Height() { + continuousHead = head } else { - s.knownHeights[head.Height()] = struct{}{} + s.knownHeaders[head.Height()] = head } } onWrite := func() { - newHead := verified[len(verified)-1] - s.writeHead.Store(&hightestHead) + newHead := s.tryAdvanceHead(continuousHead) + s.writeHead.Store(&newHead) + log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) s.metrics.newHead(newHead.Height()) } @@ -510,6 +509,22 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { return data, nil } +// try advance heighest header if we saw a higher continuous before. +func (s *Store[H]) tryAdvanceHead(highestHead H) H { + curr := highestHead.Height() + + for len(s.knownHeaders) > 0 { + h, ok := s.knownHeaders[curr+1] + if !ok { + break + } + highestHead = h + delete(s.knownHeaders, curr+1) + curr++ + } + return highestHead +} + // indexTo saves mapping between header Height and Hash to the given batch. func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, headers ...H) error { for _, h := range headers {