Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement certificate exchange #378

Merged
merged 44 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d42630d
Implement certificate exchange
Stebalien Jun 3, 2024
e0083a0
revert golang version bump
Stebalien Jun 27, 2024
adbf3b3
certexchange: []PowerEntry -> PowerEntries
Stebalien Jun 27, 2024
9435cb5
certexchange: better handle setup errors
Stebalien Jun 27, 2024
c2a53ec
fix generated files
Stebalien Jun 27, 2024
75692d8
remove unused parameter
Stebalien Jun 27, 2024
046577d
remove preferential new-peer handling
Stebalien Jun 28, 2024
a8fdf91
remove time from the interval predictor
Stebalien Jun 28, 2024
139089c
basic predictor test
Stebalien Jun 28, 2024
5a3dbe0
test and improve predictor convergence
Stebalien Jun 28, 2024
e9a2f26
improve convergence test
Stebalien Jun 28, 2024
1b159ad
test hit/miss tracking
Stebalien Jun 28, 2024
665d845
improve initialization
Stebalien Jun 28, 2024
ce583da
test peer tracker
Stebalien Jun 28, 2024
a010357
fix include power-table bug
Stebalien Jul 1, 2024
9d60913
remove special-casing for zero limit
Stebalien Jul 1, 2024
81427cf
basic client/server test
Stebalien Jul 1, 2024
0aa7d56
don't return instances beyond pending, even if we have them
Stebalien Jul 1, 2024
84ea978
two final protocol tests
Stebalien Jul 1, 2024
661145b
fix request cancellation logic
Stebalien Jul 1, 2024
35026c4
poll result stringer
Stebalien Jul 1, 2024
44cd17c
test poller
Stebalien Jul 1, 2024
7f0e6a0
fixes and test subscriber
Stebalien Jul 1, 2024
0d47580
lints
Stebalien Jul 1, 2024
4219e2d
remove test logging
Stebalien Jul 2, 2024
be373d0
fix lints
Stebalien Jul 2, 2024
0a09e4f
fix peer-tracker off-by-one bug
Stebalien Jul 2, 2024
a87bfe9
don't require local stringer install
Stebalien Jul 4, 2024
8fa98b0
Merge branch 'main' into steb/certexchange
Stebalien Jul 4, 2024
bec6c3d
remove test-only method
Stebalien Jul 4, 2024
5ca20b8
use a mock clock
Stebalien Jul 4, 2024
89a32fa
improve timing reliability
Stebalien Jul 4, 2024
13e3283
improved logging
Stebalien Jul 4, 2024
a0daf56
improve test reliability
Stebalien Jul 4, 2024
4bdf36e
cleanup
Stebalien Jul 4, 2024
33e8117
implement GC
Stebalien Jul 4, 2024
f78af6e
latency tracking
Stebalien Jul 4, 2024
e4086e2
improve reliability of subscribe tests
Stebalien Jul 4, 2024
dd747a7
fix comment
Stebalien Jul 4, 2024
d0f2fd6
fix build
Stebalien Jul 4, 2024
8f386bf
spelling
Stebalien Jul 4, 2024
6dd8e20
fix heap usage
Stebalien Jul 4, 2024
f0ab0fa
test heap
Stebalien Jul 4, 2024
c050802
make the subscribe test pass more reliably
Stebalien Jul 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions certexchange/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package certexchange

import (
"bufio"
"context"
"fmt"
"io"
"runtime/debug"
"time"

"github.com/filecoin-project/go-f3"
"github.com/filecoin-project/go-f3/certs"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
)

// We've estimated the max power table size to be less than 1MiB:
//
// 1. For 10k participants.
// 2. <100 bytes per entry (key + id + power)
const maxPowerTableSize = 1024 * 1024

// Client is a libp2p certificate exchange client for requesting finality certificates from specific
// peers.
type Client struct {
Host host.Host
NetworkName gpbft.NetworkName
RequestTimeout time.Duration

Log f3.Logger
}

func (c *Client) withDeadline(ctx context.Context) (context.Context, context.CancelFunc) {
if c.RequestTimeout > 0 {
return context.WithTimeout(ctx, c.RequestTimeout)

Check warning on line 36 in certexchange/client.go

View check run for this annotation

Codecov / codecov/patch

certexchange/client.go#L36

Added line #L36 was not covered by tests
}
return context.WithCancel(ctx)
}

// Request finality certificates from the specified peer. Returned finality certificates start at
// the requested instance number and are sequential, but are otherwise unvalidated.
func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *ResponseHeader, _ch <-chan *certs.FinalityCertificate, _err error) {
defer func() {
if perr := recover(); perr != nil {
_err = fmt.Errorf("panicked requesting certificates from peer %s: %v\n%s", p, perr, string(debug.Stack()))
c.Log.Error(_err)

Check warning on line 47 in certexchange/client.go

View check run for this annotation

Codecov / codecov/patch

certexchange/client.go#L46-L47

Added lines #L46 - L47 were not covered by tests
}
}()

ctx, cancel := c.withDeadline(ctx)
defer func() {
if cancel != nil {
cancel()
}
}()

proto := FetchProtocolName(c.NetworkName)
stream, err := c.Host.NewStream(ctx, p, proto)
if err != nil {
return nil, nil, err

Check warning on line 61 in certexchange/client.go

View check run for this annotation

Codecov / codecov/patch

certexchange/client.go#L61

Added line #L61 was not covered by tests
}
// Reset the stream if the parent context is canceled. We never call the returned stop
// function because we call the cancel function returned by `withDeadline` (which cancels
// the entire context tree).
context.AfterFunc(ctx, func() { _ = stream.Reset() })

if deadline, ok := ctx.Deadline(); ok {
if err := stream.SetDeadline(deadline); err != nil {
return nil, nil, err

Check warning on line 70 in certexchange/client.go

View check run for this annotation

Codecov / codecov/patch

certexchange/client.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}
}

br := &io.LimitedReader{R: bufio.NewReader(stream), N: 100}
bw := bufio.NewWriter(stream)

if err := req.MarshalCBOR(bw); err != nil {
c.Log.Debugf("failed to marshal certificate exchange request to peer %s: %w", p, err)
return nil, nil, err

Check warning on line 79 in certexchange/client.go

View check run for this annotation

Codecov / codecov/patch

certexchange/client.go#L78-L79

Added lines #L78 - L79 were not covered by tests
}
if err := bw.Flush(); err != nil {
return nil, nil, err

Check warning on line 82 in certexchange/client.go

View check run for this annotation

Codecov / codecov/patch

certexchange/client.go#L82

Added line #L82 was not covered by tests
}
if err := stream.CloseWrite(); err != nil {
return nil, nil, err
}

var resp ResponseHeader
if req.IncludePowerTable {
br.N = maxPowerTableSize
}
err = resp.UnmarshalCBOR(br)
if err != nil {
c.Log.Debugf("failed to unmarshal certificate exchange response header from peer %s: %w", p, err)
return nil, nil, err

Check warning on line 95 in certexchange/client.go

View check run for this annotation

Codecov / codecov/patch

certexchange/client.go#L94-L95

Added lines #L94 - L95 were not covered by tests
}

ch := make(chan *certs.FinalityCertificate, 1)
// copy this in case the caller decides to re-use the request object...
request := *req

// Copy/replace the cancel func so exiting the request doesn't cancel it.
cancelReq := cancel
cancel = nil
go func() {
defer func() {
if perr := recover(); perr != nil {
c.Log.Errorf("panicked while receiving certificates from peer %s: %v\n%s", p, perr, string(debug.Stack()))

Check warning on line 108 in certexchange/client.go

View check run for this annotation

Codecov / codecov/patch

certexchange/client.go#L108

Added line #L108 was not covered by tests
}
cancelReq()
close(ch)
}()
for i := uint64(0); i < request.Limit; i++ {
cert := new(certs.FinalityCertificate)

// We'll read at most 1MiB per certificate. They generally shouldn't be that
// large, but large power deltas could get close.
br.N = maxPowerTableSize
err := cert.UnmarshalCBOR(br)
switch err {
case nil:
case io.EOF:
return
default:
c.Log.Debugf("failed to unmarshal certificate from peer %s: %w", p, err)
return
}
// One quick sanity check. The rest will be validated by the caller.
if cert.GPBFTInstance != request.FirstInstance+i {
c.Log.Warnf("received out-of-order certificate from peer %s", p)
return

Check warning on line 131 in certexchange/client.go

View check run for this annotation

Codecov / codecov/patch

certexchange/client.go#L130-L131

Added lines #L130 - L131 were not covered by tests
}

select {
case <-ctx.Done():
return

Check warning on line 136 in certexchange/client.go

View check run for this annotation

Codecov / codecov/patch

certexchange/client.go#L135-L136

Added lines #L135 - L136 were not covered by tests
case ch <- cert:
}
}
}()
return &resp, ch, nil
}
239 changes: 239 additions & 0 deletions certexchange/gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions certexchange/polling/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package polling

import (
"github.com/benbjohnson/clock"
)

var clk clock.Clock = clock.New()
Loading
Loading