Skip to content

Commit

Permalink
no more knownHeaders map
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Oct 7, 2024
1 parent 7bb1d5c commit 61a6b31
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 37 deletions.
56 changes: 20 additions & 36 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -55,11 +54,6 @@ type Store[H header.Header[H]] struct {
// writeHead maintains the current write head
writeHead atomic.Pointer[H]

knownHeadersLk sync.Mutex
// knownHeaders tracks all processed headers
// to advance writeHead only over continuous headers.
knownHeaders map[uint64]H

// pending keeps headers pending to be written in one batch
pending *batch[H]

Expand Down Expand Up @@ -117,8 +111,6 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
writesDn: make(chan struct{}),
pending: newBatch[H](params.WriteBatchSize),
Params: params,

knownHeaders: make(map[uint64]H),
}, nil
}

Expand Down Expand Up @@ -423,7 +415,7 @@ func (s *Store[H]) flushLoop() {
time.Sleep(sleep)
}

s.tryAdvanceHead(toFlush...)
s.tryAdvanceHead(ctx, toFlush...)

s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
// reset pending
Expand Down Expand Up @@ -513,43 +505,35 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
return data, nil
}

// try advance heighest header if we saw a higher continuous before.
func (s *Store[H]) tryAdvanceHead(headers ...H) {
headPtr := s.writeHead.Load()
if headPtr == nil || len(headers) == 0 {
// try advance heighest writeHead based on passed or already written headers.
func (s *Store[H]) tryAdvanceHead(ctx context.Context, headers ...H) {
writeHead := s.writeHead.Load()
if writeHead == nil || len(headers) == 0 {
return
}

s.knownHeadersLk.Lock()
defer s.knownHeadersLk.Unlock()
currHeight := (*writeHead).Height()

for _, h := range headers {
s.knownHeaders[h.Height()] = h
// advance based on passed headers.
for i := 0; i < len(headers); i++ {
if headers[i].Height() != currHeight+1 {
break
}
s.writeHead.Store(&headers[i])
currHeight++
}

currHead := *headPtr
height := currHead.Height()
newHead := currHead
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

// try to move to the next height.
for len(s.knownHeaders) > 0 {
h, ok := s.knownHeaders[height+1]
if !ok {
// advance based on already written headers.
for {
newHead, err := s.GetByHeight(ctx, currHeight+1)
if err != nil {
break
}
newHead = h
delete(s.knownHeaders, height+1)
height++
}

// we found higher continuous header - update.
if currHead.Height() < newHead.Height() {
// we don't need CAS here because that's the only place
// where writeHead is updated, knownHeadersLk ensures 1 goroutine.
// NOTE: Store[H].Head also updates writeHead but only once when it's nil.
s.writeHead.Store(&newHead)
log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
s.metrics.newHead(newHead.Height())
currHeight++
}
}

Expand Down
2 changes: 1 addition & 1 deletion store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) {
err := store.Append(ctx, missedChunk...)
require.NoError(t, err)
// wait for batch to be written.
time.Sleep(100 * time.Millisecond)
time.Sleep(time.Second)

// after appending missing headers we're on the latest header.
head, err := store.Head(ctx)
Expand Down

0 comments on commit 61a6b31

Please sign in to comment.