Skip to content

Commit

Permalink
rework heightSub
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Jun 27, 2024
1 parent b4738e4 commit bc1a111
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 13 deletions.
17 changes: 8 additions & 9 deletions store/heightsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ func newHeightSub[H header.Header[H]]() *heightSub[H] {
}
}

// Height reports current height.
func (hs *heightSub[H]) Height() uint64 {
return hs.height.Load()
func (hs *heightSub[H]) isInited() bool {
return hs.height.Load() != 0
}

// SetHeight sets the new head height for heightSub.
// setHeight sets the new head height for heightSub.
// Only higher then current height will be set.
func (hs *heightSub[H]) SetHeight(height uint64) {
func (hs *heightSub[H]) setHeight(height uint64) {
for {
curr := hs.height.Load()
if curr >= height {
Expand All @@ -52,12 +51,12 @@ func (hs *heightSub[H]) SetHeight(height uint64) {
// and caller should get it elsewhere.
func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {
var zero H
if hs.Height() >= height {
if hs.height.Load() >= height {
return zero, errElapsedHeight
}

hs.heightReqsLk.Lock()
if hs.Height() >= height {
if hs.height.Load() >= height {
// This is a rare case we have to account for.
// The lock above can park a goroutine long enough for hs.height to change for a requested height,
// leaving the request never fulfilled and the goroutine deadlocked.
Expand Down Expand Up @@ -90,7 +89,7 @@ func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {

// Pub processes all the outstanding subscriptions matching the given headers.
// Pub is only safe when called from one goroutine.
// For Pub to work correctly, heightSub has to be initialized with SetHeight
// For Pub to work correctly, heightSub has to be initialized with setHeight
// so that given headers are contiguous to the height on heightSub.
func (hs *heightSub[H]) Pub(headers ...H) {
ln := len(headers)
Expand All @@ -99,7 +98,7 @@ func (hs *heightSub[H]) Pub(headers ...H) {
}

from, to := headers[0].Height(), headers[ln-1].Height()
hs.SetHeight(to)
hs.setHeight(to)

hs.heightReqsLk.Lock()
defer hs.heightReqsLk.Unlock()
Expand Down
34 changes: 32 additions & 2 deletions store/heightsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestHeightSub(t *testing.T) {
{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.SetHeight(99)
hs.setHeight(99)
hs.Pub(h)

h, err := hs.Sub(ctx, 10)
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestHeightSubNonAdjacement(t *testing.T) {
{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.SetHeight(99)
hs.setHeight(99)
hs.Pub(h)
}

Expand All @@ -78,6 +78,36 @@ func TestHeightSubNonAdjacement(t *testing.T) {
}
}

func TestHeightSub_monotonicHeight(t *testing.T) {
hs := newHeightSub[*headertest.DummyHeader]()

{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.setHeight(99)
hs.Pub(h)
}

{
h1 := headertest.RandDummyHeader(t)
h1.HeightI = 200
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 300
hs.Pub(h1, h2)
}

{

h1 := headertest.RandDummyHeader(t)
h1.HeightI = 120
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 130
hs.Pub(h1, h2)
}

assert.Equal(t, hs.height.Load(), uint64(300))
}

func TestHeightSubCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
5 changes: 3 additions & 2 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type Store[H header.Header[H]] struct {
writesDn chan struct{}
// writeHead maintains the current write head
writeHead atomic.Pointer[H]

// knownHeaders tracks all processed headers
// to advance writeHead only over continuous headers.
knownHeaders map[uint64]H
// pending keeps headers pending to be written in one batch
pending *batch[H]
Expand Down Expand Up @@ -118,7 +119,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
}

func (s *Store[H]) Init(ctx context.Context, initial H) error {
if s.heightSub.Height() != 0 {
if s.heightSub.isInited() {
return errors.New("store already initialized")
}
// trust the given header as the initial head
Expand Down

0 comments on commit bc1a111

Please sign in to comment.