Skip to content

Commit

Permalink
Implement certificate exchange (#378)
Browse files Browse the repository at this point in the history
* Implement certificate exchange

This implements a basic certificate exchange protocol (which still needs
tests and is probably slightly broken at the moment).

Importantly, it's missing the ability to fetch power table deltas for
validating future instances (beyond the latest certificate). My plan is
to implement this as a somewhat separate protocol (likely re-using a lot
of the same machinery). However:

1. That protocol is only needed for observer nodes. Active participants
in the network will follow the EC chain and will learn these power
tables through the EC chain.
2. That protocol won't need as much guessing because we'll _know_ which
power tables should be available given the latest certificate we've
received.

The large remaining TODOs are tests and design documentation. The entire
protocol has been in constant flux so... I'm sure there are some
inconsistencies...

* revert golang version bump

* certexchange: []PowerEntry -> PowerEntries

* certexchange: better handle setup errors

* fix generated files

* remove unused parameter

* remove preferential new-peer handling

It was necessary when we subscribed to new certificates, but not anymore.

* remove time from the interval predictor

It makes it harder to test.

* basic predictor test

* test and improve predictor convergence

* improve convergence test

* test hit/miss tracking

* improve initialization

* test peer tracker

* fix include power-table bug

* remove special-casing for zero limit

* basic client/server test

* don't return instances beyond pending, even if we have them

* two final protocol tests

* fix request cancellation logic

* poll result stringer

* test poller

* fixes and test subscriber

* lints

* remove test logging

* fix lints

* fix peer-tracker off-by-one bug

and tests

* don't require local stringer install

* remove test-only method

* use a mock clock

* improve timing reliability

* improved logging

* improve test reliability

* cleanup

* implement GC

* latency tracking

* improve reliability of subscribe tests

* fix comment

* fix build

* spelling

* fix heap usage

* test heap

* make the subscribe test pass more reliably
  • Loading branch information
Stebalien authored Jul 4, 2024
1 parent ab0e2ed commit 345d165
Show file tree
Hide file tree
Showing 21 changed files with 2,483 additions and 75 deletions.
142 changes: 142 additions & 0 deletions certexchange/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package certexchange

import (
"bufio"
"context"
"fmt"
"io"
"runtime/debug"
"time"

"github.com/filecoin-project/go-f3"
"github.com/filecoin-project/go-f3/certs"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
)

// We've estimated the max power table size to be less than 1MiB:
//
// 1. For 10k participants.
// 2. <100 bytes per entry (key + id + power)
const maxPowerTableSize = 1024 * 1024

// Client is a libp2p certificate exchange client for requesting finality certificates from specific
// peers.
type Client struct {
Host host.Host
NetworkName gpbft.NetworkName
RequestTimeout time.Duration

Log f3.Logger
}

func (c *Client) withDeadline(ctx context.Context) (context.Context, context.CancelFunc) {
if c.RequestTimeout > 0 {
return context.WithTimeout(ctx, c.RequestTimeout)
}
return context.WithCancel(ctx)
}

// Request finality certificates from the specified peer. Returned finality certificates start at
// the requested instance number and are sequential, but are otherwise unvalidated.
func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *ResponseHeader, _ch <-chan *certs.FinalityCertificate, _err error) {
defer func() {
if perr := recover(); perr != nil {
_err = fmt.Errorf("panicked requesting certificates from peer %s: %v\n%s", p, perr, string(debug.Stack()))
c.Log.Error(_err)
}
}()

ctx, cancel := c.withDeadline(ctx)
defer func() {
if cancel != nil {
cancel()
}
}()

proto := FetchProtocolName(c.NetworkName)
stream, err := c.Host.NewStream(ctx, p, proto)
if err != nil {
return nil, nil, err
}
// Reset the stream if the parent context is canceled. We never call the returned stop
// function because we call the cancel function returned by `withDeadline` (which cancels
// the entire context tree).
context.AfterFunc(ctx, func() { _ = stream.Reset() })

if deadline, ok := ctx.Deadline(); ok {
if err := stream.SetDeadline(deadline); err != nil {
return nil, nil, err
}
}

br := &io.LimitedReader{R: bufio.NewReader(stream), N: 100}
bw := bufio.NewWriter(stream)

if err := req.MarshalCBOR(bw); err != nil {
c.Log.Debugf("failed to marshal certificate exchange request to peer %s: %w", p, err)
return nil, nil, err
}
if err := bw.Flush(); err != nil {
return nil, nil, err
}
if err := stream.CloseWrite(); err != nil {
return nil, nil, err
}

var resp ResponseHeader
if req.IncludePowerTable {
br.N = maxPowerTableSize
}
err = resp.UnmarshalCBOR(br)
if err != nil {
c.Log.Debugf("failed to unmarshal certificate exchange response header from peer %s: %w", p, err)
return nil, nil, err
}

ch := make(chan *certs.FinalityCertificate, 1)
// copy this in case the caller decides to re-use the request object...
request := *req

// Copy/replace the cancel func so exiting the request doesn't cancel it.
cancelReq := cancel
cancel = nil
go func() {
defer func() {
if perr := recover(); perr != nil {
c.Log.Errorf("panicked while receiving certificates from peer %s: %v\n%s", p, perr, string(debug.Stack()))
}
cancelReq()
close(ch)
}()
for i := uint64(0); i < request.Limit; i++ {
cert := new(certs.FinalityCertificate)

// We'll read at most 1MiB per certificate. They generally shouldn't be that
// large, but large power deltas could get close.
br.N = maxPowerTableSize
err := cert.UnmarshalCBOR(br)
switch err {
case nil:
case io.EOF:
return
default:
c.Log.Debugf("failed to unmarshal certificate from peer %s: %w", p, err)
return
}
// One quick sanity check. The rest will be validated by the caller.
if cert.GPBFTInstance != request.FirstInstance+i {
c.Log.Warnf("received out-of-order certificate from peer %s", p)
return
}

select {
case <-ctx.Done():
return
case ch <- cert:
}
}
}()
return &resp, ch, nil
}
239 changes: 239 additions & 0 deletions certexchange/gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions certexchange/polling/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package polling

import (
"github.com/benbjohnson/clock"
)

var clk clock.Clock = clock.New()
Loading

0 comments on commit 345d165

Please sign in to comment.