Skip to content

Commit

Permalink
feat(sync, store)!: Remove adjacency requirement from Store.Append (#186
Browse files Browse the repository at this point in the history
)

## Overview

Things that gonna change after we drop adjacency requirement from
`Store.Append`:
- Obviously dropping `ErrNonAdjacent` (breaking change).
- `Store.Append` comment is **already** outdated (looks like there were
2 return params)
- `store.Store.Append` will not verify for adjacency (obviously)
- `Syncer.setSubjectiveHead` will not verify for `ErrNonAdjacent`
- adjacency check is happening in `sync` instead of `store`.

Fixes #32
  • Loading branch information
cristaloleg authored Jun 10, 2024
1 parent 5872766 commit 72d6506
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 25 deletions.
15 changes: 0 additions & 15 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package header
import (
"context"
"errors"
"fmt"

pubsub "github.com/libp2p/go-libp2p-pubsub"
)
Expand Down Expand Up @@ -59,16 +58,6 @@ var (
ErrHeadersLimitExceeded = errors.New("header/p2p: header limit per 1 request exceeded")
)

// ErrNonAdjacent is returned when Store is appended with a header not adjacent to the stored head.
type ErrNonAdjacent struct {
Head uint64
Attempted uint64
}

func (ena *ErrNonAdjacent) Error() string {
return fmt.Sprintf("header/store: non-adjacent: head %d, attempted %d", ena.Head, ena.Attempted)
}

// Store encompasses the behavior necessary to store and retrieve Headers
// from a node's local storage.
type Store[H Header[H]] interface {
Expand All @@ -88,10 +77,6 @@ type Store[H Header[H]] interface {
HasAt(context.Context, uint64) bool

// Append stores and verifies the given Header(s).
// It requires them to be adjacent and in ascending order,
// as it applies them contiguously on top of the current head height.
// It returns the amount of successfully applied headers,
// so caller can understand what given header was invalid, if any.
Append(context.Context, ...H) error

// GetRange returns the range [from:to).
Expand Down
11 changes: 2 additions & 9 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,6 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
// collect valid headers
verified := make([]H, 0, lh)
for i, h := range headers {
// currently store requires all headers to be appended sequentially and adjacently
// TODO(@Wondertan): Further pruning friendly Store design should reevaluate this requirement
if h.Height() != head.Height()+1 {
return &header.ErrNonAdjacent{
Head: head.Height(),
Attempted: h.Height(),
}
}

err = head.Verify(h)
if err != nil {
Expand All @@ -350,7 +342,8 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
// otherwise, stop the loop and apply headers appeared to be valid
break
}
verified, head = append(verified, h), h
verified = append(verified, h)
head = h
}

onWrite := func() {
Expand Down
2 changes: 1 addition & 1 deletion sync/sync_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
// * Remove ErrNonAdjacent
// * Remove writeHead from the canonical store implementation
err := s.store.Append(ctx, netHead)
var nonAdj *header.ErrNonAdjacent
var nonAdj *errNonAdjacent
if err != nil && !errors.As(err, &nonAdj) {
// might be a storage error or something else, but we can still try to continue processing netHead
log.Errorw("storing new network header",
Expand Down
31 changes: 31 additions & 0 deletions sync/sync_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,23 @@ package sync

import (
"context"
"errors"
"fmt"
"sync/atomic"

"github.com/celestiaorg/go-header"
)

// errNonAdjacent is returned when syncer is appended with a header not adjacent to the stored head.
type errNonAdjacent struct {
Head uint64
Attempted uint64
}

func (ena *errNonAdjacent) Error() string {
return fmt.Sprintf("sync: non-adjacent: head %d, attempted %d", ena.Head, ena.Attempted)
}

// syncStore is a Store wrapper that provides synchronization over writes and reads
// for Head of underlying Store. Useful for Stores that do not guarantee synchrony between Append
// and Head method.
Expand All @@ -31,6 +43,25 @@ func (s *syncStore[H]) Head(ctx context.Context) (H, error) {
}

func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error {
if len(headers) == 0 {
return nil
}

head, err := s.Head(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
panic(err)
}

for _, h := range headers {
if h.Height() != head.Height()+1 {
return &errNonAdjacent{
Head: head.Height(),
Attempted: h.Height(),
}
}
head = h
}

if err := s.Store.Append(ctx, headers...); err != nil {
return err
}
Expand Down

0 comments on commit 72d6506

Please sign in to comment.