Skip to content

Commit

Permalink
make store better
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Jun 27, 2024
1 parent 8b1bbc7 commit 8f62e7d
Showing 1 changed file with 36 additions and 21 deletions.
57 changes: 36 additions & 21 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Store[H header.Header[H]] struct {
// writeHead maintains the current write head
writeHead atomic.Pointer[H]

knownHeights map[uint64]struct{}
knownHeaders map[uint64]H
// pending keeps headers pending to be written in one batch
pending *batch[H]

Expand Down Expand Up @@ -113,7 +113,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
pending: newBatch[H](params.WriteBatchSize),
Params: params,

knownHeights: make(map[uint64]struct{}),
knownHeaders: make(map[uint64]H),
}, nil
}

Expand Down Expand Up @@ -173,24 +173,19 @@ func (s *Store[H]) Height() uint64 {
return s.heightSub.Height()
}

// Head returns the highest contiguous header written to the store.
func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
head, err := s.GetByHeight(ctx, s.heightSub.Height())
if err == nil {
return head, nil
headPtr := s.writeHead.Load()
if headPtr != nil {
return *headPtr, nil
}

var zero H
head, err = s.readHead(ctx)
switch {
default:
head, err := s.readHead(ctx)
if err != nil {
var zero H
return zero, err
case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound):
return zero, header.ErrNoHead
case err == nil:
s.heightSub.SetHeight(head.Height())
log.Infow("loaded head", "height", head.Height(), "hash", head.Hash())
return head, nil
}
return head, nil
}

func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
Expand Down Expand Up @@ -306,6 +301,9 @@ func (s *Store[H]) HasAt(_ context.Context, height uint64) bool {
return height != uint64(0) && s.Height() >= height
}

// Append the given headers to the store. Real write to the disk happens
// asynchronously and might fail without reporting error (just logging).
// TODO(cristaloleg): add retries to the flush worker.
func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
lh := len(headers)
if lh == 0 {
Expand All @@ -325,7 +323,7 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
head = *headPtr
}

hightestHead := head
continuousHead := head

slices.SortFunc(headers, func(a, b H) int {
return cmp.Compare(a.Height(), b.Height())
Expand Down Expand Up @@ -356,16 +354,17 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
verified = append(verified, h)
head = h

if hightestHead.Height()+1 == head.Height() {
hightestHead = head
if continuousHead.Height()+1 == head.Height() {
continuousHead = head
} else {
s.knownHeights[head.Height()] = struct{}{}
s.knownHeaders[head.Height()] = head
}
}

onWrite := func() {
newHead := verified[len(verified)-1]
s.writeHead.Store(&hightestHead)
newHead := s.tryAdvanceHead(continuousHead)
s.writeHead.Store(&newHead)

log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
s.metrics.newHead(newHead.Height())
}
Expand Down Expand Up @@ -510,6 +509,22 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
return data, nil
}

// try advance heighest header if we saw a higher continuous before.
func (s *Store[H]) tryAdvanceHead(highestHead H) H {
curr := highestHead.Height()

for len(s.knownHeaders) > 0 {
h, ok := s.knownHeaders[curr+1]
if !ok {
break
}
highestHead = h
delete(s.knownHeaders, curr+1)
curr++
}
return highestHead
}

// indexTo saves mapping between header Height and Hash to the given batch.
func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, headers ...H) error {
for _, h := range headers {
Expand Down

0 comments on commit 8f62e7d

Please sign in to comment.