Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: Merge GetVerifiedRange and GetRangeByHeight in Getter interface and remove redundant endpoint #117

Merged
merged 6 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions headertest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,20 @@ func (m *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
return m.Headers[height], nil
}

func (m *Store[H]) GetRangeByHeight(ctx context.Context, from, to uint64) ([]H, error) {
headers := make([]H, to-from)
func (m *Store[H]) GetRange(ctx context.Context, from, to uint64) ([]H, error) {
return m.getRangeByHeight(ctx, from, to)
}

// GetRangeByHeight returns headers in range [from; to).
func (m *Store[H]) GetRangeByHeight(ctx context.Context, fromHead H, to uint64) ([]H, error) {
from := fromHead.Height() + 1
return m.getRangeByHeight(ctx, from, to)
}

func (m *Store[H]) getRangeByHeight(ctx context.Context, from, to uint64) ([]H, error) {
amount := to - from
headers := make([]H, amount)

// As the requested range is [from; to),
// check that (to-1) height in request is less than
// the biggest header height in store.
Expand All @@ -79,14 +91,6 @@ func (m *Store[H]) GetRangeByHeight(ctx context.Context, from, to uint64) ([]H,
return headers, nil
}

func (m *Store[H]) GetVerifiedRange(
ctx context.Context,
h H,
to uint64,
) ([]H, error) {
return m.GetRangeByHeight(ctx, uint64(h.Height())+1, to)
}

func (m *Store[H]) Has(context.Context, header.Hash) (bool, error) {
return false, nil
}
Expand Down
11 changes: 6 additions & 5 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type Store[H Header[H]] interface {
// It returns the amount of successfully applied headers,
// so caller can understand what given header was invalid, if any.
Append(context.Context, ...H) error

// GetRange returns the range [from:to).
GetRange(context.Context, uint64, uint64) ([]H, error)
}

// Getter contains the behavior necessary for a component to retrieve
Expand All @@ -106,12 +109,10 @@ type Getter[H Header[H]] interface {
// GetByHeight returns the Header corresponding to the given block height.
GetByHeight(context.Context, uint64) (H, error)

// GetRangeByHeight returns the given range of Headers.
GetRangeByHeight(ctx context.Context, from, amount uint64) ([]H, error)

// GetVerifiedRange requests the header range from the provided Header and
// GetRangeByHeight requests the header range from the provided Header and
// verifies that the returned headers are adjacent to each other.
GetVerifiedRange(ctx context.Context, from H, amount uint64) ([]H, error)
// Expected to return the range [from.Height()+1:to).
GetRangeByHeight(ctx context.Context, from H, to uint64) ([]H, error)
}

// Head contains the behavior necessary for a component to retrieve
Expand Down
11 changes: 2 additions & 9 deletions local/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,9 @@ func (l *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error)
return l.store.GetByHeight(ctx, height)
}

func (l *Exchange[H]) GetRangeByHeight(ctx context.Context, origin, amount uint64) ([]H, error) {
if amount == 0 {
return nil, nil
}
return l.store.GetRangeByHeight(ctx, origin, origin+amount)
}

func (l *Exchange[H]) GetVerifiedRange(ctx context.Context, from H, amount uint64,
func (l *Exchange[H]) GetRangeByHeight(ctx context.Context, from H, to uint64,
) ([]H, error) {
return l.store.GetVerifiedRange(ctx, from, uint64(from.Height())+amount+1)
return l.store.GetRangeByHeight(ctx, from, to)
}

func (l *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
Expand Down
26 changes: 5 additions & 21 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,36 +213,20 @@ func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error
return headers[0], nil
}

// GetRangeByHeight performs a request for the given range of Headers
// to the network. Note that the Headers must be verified thereafter.
func (ex *Exchange[H]) GetRangeByHeight(ctx context.Context, from, amount uint64) ([]H, error) {
if amount == 0 {
return make([]H, 0), nil
}
if amount > header.MaxRangeRequestSize {
return nil, header.ErrHeadersLimitExceeded
}
session := newSession[H](ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout)
defer session.close()
return session.getRangeByHeight(ctx, from, amount, ex.Params.MaxHeadersPerRangeRequest)
}

// GetVerifiedRange performs a request for the given range of Headers to the network and
// GetRangeByHeight performs a request for the given range of Headers to the network and
// ensures that returned headers are correct against the passed one.
func (ex *Exchange[H]) GetVerifiedRange(
func (ex *Exchange[H]) GetRangeByHeight(
ctx context.Context,
from H,
amount uint64,
to uint64,
) ([]H, error) {
if amount == 0 {
return make([]H, 0), nil
}
session := newSession[H](
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, withValidation(from),
)
defer session.close()
// we request the next header height that we don't have: `fromHead`+1
return session.getRangeByHeight(ctx, uint64(from.Height())+1, amount, ex.Params.MaxHeadersPerRangeRequest)
amount := to - (from.Height() + 1)
return session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest)
renaynay marked this conversation as resolved.
Show resolved Hide resolved
}

// Get performs a request for the Header by the given hash corresponding
Expand Down
83 changes: 50 additions & 33 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,36 +166,46 @@ func TestExchange_RequestHeader(t *testing.T) {
assert.Equal(t, store.Headers[5].Hash(), header.Hash())
}

func TestExchange_RequestHeaders(t *testing.T) {
func TestExchange_GetRangeByHeight(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

hosts := createMocknet(t, 2)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])

from, err := store.GetByHeight(ctx, 1)
require.NoError(t, err)

firstHeaderInRangeHeight := from.Height() + 1
lastHeaderInRangeHeight := uint64(4)
to := lastHeaderInRangeHeight + 1
expectedLenHeaders := to - firstHeaderInRangeHeight // expected amount

// perform expected request
gotHeaders, err := exchg.GetRangeByHeight(context.Background(), 1, 5)
gotHeaders, err := exchg.GetRangeByHeight(ctx, from, to)
require.NoError(t, err)

assert.Len(t, gotHeaders, int(expectedLenHeaders))
assert.Equal(t, firstHeaderInRangeHeight, gotHeaders[0].Height())
assert.Equal(t, lastHeaderInRangeHeight, gotHeaders[len(gotHeaders)-1].Height())

for _, got := range gotHeaders {
assert.Equal(t, store.Headers[got.Height()].Height(), got.Height())
assert.Equal(t, store.Headers[got.Height()].Hash(), got.Hash())
}
}

func TestExchange_RequestVerifiedHeaders(t *testing.T) {
func TestExchange_GetRangeByHeight_FailsVerification(t *testing.T) {
hosts := createMocknet(t, 2)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
// perform expected request
h := store.Headers[1]
_, err := exchg.GetVerifiedRange(context.Background(), h, 3)
require.NoError(t, err)
}

func TestExchange_RequestVerifiedHeadersFails(t *testing.T) {
hosts := createMocknet(t, 2)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
store.Headers[2] = store.Headers[3]
store.Headers[3].VerifyFailure = true // force a verification failure on the 3rd header
// perform expected request
h := store.Headers[1]
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
t.Cleanup(cancel)
_, err := exchg.GetVerifiedRange(ctx, h, 3)

// requests range (1:4)
_, err := exchg.GetRangeByHeight(ctx, h, 4)
assert.Error(t, err)

// ensure that peer was added to the blacklist
Expand All @@ -209,7 +219,7 @@ func TestExchange_RequestVerifiedHeadersFails(t *testing.T) {
func TestExchange_RequestFullRangeHeaders(t *testing.T) {
// create mocknet with 5 peers
hosts := createMocknet(t, 5)
store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), int(header.MaxRangeRequestSize))
store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), int(header.MaxRangeRequestSize)+1)
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)

Expand Down Expand Up @@ -238,28 +248,23 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)
// request headers from 1 to totalAmount(80)
headers, err := exchange.GetRangeByHeight(ctx, 1, header.MaxRangeRequestSize)

gen, err := store.GetByHeight(ctx, 1)
require.NoError(t, err)
require.Len(t, headers, int(header.MaxRangeRequestSize))
}

// TestExchange_RequestHeadersLimitExceeded tests that the Exchange instance will return
// header.ErrHeadersLimitExceeded if the requested range will be move than MaxRangeRequestSize.
func TestExchange_RequestHeadersLimitExceeded(t *testing.T) {
hosts := createMocknet(t, 2)
exchg, _ := createP2PExAndServer(t, hosts[0], hosts[1])
_, err := exchg.GetRangeByHeight(context.Background(), 1, 600)
require.Error(t, err)
require.ErrorAs(t, err, &header.ErrHeadersLimitExceeded)
// request the max amount of headers
to := gen.Height() + header.MaxRangeRequestSize + 1 // add 1 to account for `from` being inclusive
headers, err := exchange.GetRangeByHeight(ctx, gen, to)
require.NoError(t, err)
assert.Len(t, headers, int(header.MaxRangeRequestSize))
}

// TestExchange_RequestHeadersFromAnotherPeer tests that the Exchange instance will request range
// from another peer with lower score after receiving header.ErrNotFound
func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
hosts := createMocknet(t, 3)
// create client + server(it does not have needed headers)
exchg, _ := createP2PExAndServer(t, hosts[0], hosts[1])
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
// create one more server(with more headers in the store)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
hosts[2], headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10),
Expand All @@ -273,7 +278,11 @@ func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
exchg.peerTracker.peerLk.Lock()
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 20}
exchg.peerTracker.peerLk.Unlock()
_, err = exchg.GetRangeByHeight(context.Background(), 5, 3)

h, err := store.GetByHeight(context.Background(), 5)
require.NoError(t, err)

_, err = exchg.GetRangeByHeight(context.Background(), h, 8)
require.NoError(t, err)
// ensure that peerScore for the second peer is changed
newPeerScore := exchg.peerTracker.trackedPeers[hosts[2].ID()].score()
Expand Down Expand Up @@ -472,7 +481,7 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
dial(swarm1, swarm2)

// create client + server(it does not have needed headers)
exchg, _ := createP2PExAndServer(t, host0, host1)
exchg, store := createP2PExAndServer(t, host0, host1)
exchg.Params.RangeRequestTimeout = time.Millisecond * 100
// create one more server(with more headers in the store)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
Expand All @@ -490,7 +499,11 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
exchg.peerTracker.peerLk.Lock()
exchg.peerTracker.trackedPeers[host2.ID()] = &peerStat{peerID: host2.ID(), peerScore: 200}
exchg.peerTracker.peerLk.Unlock()
_, err = exchg.GetRangeByHeight(context.Background(), 1, 3)

gen, err := store.GetByHeight(context.Background(), 1)
require.NoError(t, err)

_, err = exchg.GetRangeByHeight(context.Background(), gen, 3)
require.NoError(t, err)
newPeerScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
assert.NotEqual(t, newPeerScore, prevScore)
Expand All @@ -500,7 +513,7 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
// from server, Exchange will re-request remaining headers from another peer
func TestExchange_RequestPartialRange(t *testing.T) {
hosts := createMocknet(t, 3)
exchg, _ := createP2PExAndServer(t, hosts[0], hosts[1])
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])

// create one more server(with more headers in the store)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
Expand All @@ -518,7 +531,11 @@ func TestExchange_RequestPartialRange(t *testing.T) {
// reducing peerScore of the second server, so our exchange will request host[1] first.
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 50}
exchg.peerTracker.peerLk.Unlock()
h, err := exchg.GetRangeByHeight(ctx, 1, 8)

gen, err := store.GetByHeight(context.Background(), 1)
require.NoError(t, err)

h, err := exchg.GetRangeByHeight(ctx, gen, 8)
require.NotNil(t, h)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
}

// might be a case when store hasn't synced yet to the requested range
if uint64(head.Height()) < from {
if head.Height() < from {
span.SetStatus(codes.Error, header.ErrNotFound.Error())
log.Debugw("server: requested headers not stored",
"from", from,
Expand All @@ -229,7 +229,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
to = uint64(head.Height()) + 1
}

headersByRange, err := serv.store.GetRangeByHeight(ctx, from, to)
headersByRange, err := serv.store.GetRange(ctx, from, to)
renaynay marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
span.SetStatus(codes.Error, err.Error())
if errors.Is(err, context.DeadlineExceeded) {
Expand Down
21 changes: 21 additions & 0 deletions p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/headertest"
"github.com/celestiaorg/go-header/store"
)
Expand All @@ -30,3 +31,23 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) {
_, err = server.handleRequest(1, 200)
require.Error(t, err)
}

func TestExchangeServer_errorsOnLargeRequest(t *testing.T) {
peer := createMocknet(t, 1)
s, err := store.NewStore[*headertest.DummyHeader](datastore.NewMapDatastore())
require.NoError(t, err)
server, err := NewExchangeServer[*headertest.DummyHeader](
peer[0],
s,
WithNetworkID[ServerParameters](networkID),
)
require.NoError(t, err)
err = server.Start(context.Background())
require.NoError(t, err)
t.Cleanup(func() {
server.Stop(context.Background()) //nolint:errcheck
})

_, err = server.handleRequest(1, header.MaxRangeRequestSize*2)
require.Error(t, err)
}
2 changes: 1 addition & 1 deletion p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *session[H]) doRequest(
responseLn := uint64(len(h))
// ensure that we received the correct amount of headers.
if responseLn < req.Amount {
from := uint64(h[responseLn-1].Height())
from := h[responseLn-1].Height()
amount := req.Amount - responseLn

select {
Expand Down
Loading