From cf578f3f4ece96d214c9a525c93015f1c432b872 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 3 Jun 2024 12:52:38 -0700 Subject: [PATCH] Implement certificate exchange This implements a basic certificate exchange protocol (which still needs tests and is probably slightly broken at the moment). Importantly, it's missing the ability to fetch power table deltas for validating future instances (beyond the latest certificate). My plan is to implement this as a somewhat separate protocol (likely re-using a lot of the same machinery). However: 1. That protocol is only needed for observer nodes. Active participants in the network will follow the EC chain and will learn these power tables through the EC chain. 2. That protocol won't need as much guessing because we'll _know_ which power tables should be available given the latest certificate we've received. The large remaining TODOs are tests and design documentation. The entire protocol has been in constant flux so... I'm sure there are some inconsistencies... --- certexchange/client.go | 142 +++++++++++++++ certexchange/gen.go | 239 ++++++++++++++++++++++++ certexchange/polling/peerTracker.go | 269 ++++++++++++++++++++++++++++ certexchange/polling/poller.go | 133 ++++++++++++++ certexchange/polling/predictor.go | 108 +++++++++++ certexchange/polling/subscriber.go | 238 ++++++++++++++++++++++++ certexchange/protocol.go | 28 +++ certexchange/server.go | 125 +++++++++++++ gen/main.go | 9 + go.mod | 2 +- 10 files changed, 1292 insertions(+), 1 deletion(-) create mode 100644 certexchange/client.go create mode 100644 certexchange/gen.go create mode 100644 certexchange/polling/peerTracker.go create mode 100644 certexchange/polling/poller.go create mode 100644 certexchange/polling/predictor.go create mode 100644 certexchange/polling/subscriber.go create mode 100644 certexchange/protocol.go create mode 100644 certexchange/server.go diff --git a/certexchange/client.go b/certexchange/client.go new file mode 100644 index 00000000..45540038 --- /dev/null +++ b/certexchange/client.go @@ -0,0 +1,142 @@ +package certexchange + +import ( + "bufio" + "context" + "fmt" + "io" + "runtime/debug" + "time" + + "github.com/filecoin-project/go-f3" + "github.com/filecoin-project/go-f3/certs" + "github.com/filecoin-project/go-f3/gpbft" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" +) + +// We've estimated the max power table size to be less than 1MiB: +// +// 1. For 10k participants. +// 2. <100 bytes per entry (key + id + power) +const maxPowerTableSize = 1024 * 1024 + +// Client is a libp2p certificate exchange client for requesting finality certificates from specific +// peers. +type Client struct { + Host host.Host + NetworkName gpbft.NetworkName + RequestTimeout time.Duration + + Log f3.Logger +} + +func resetOnCancel(ctx context.Context, s network.Stream) func() { + errCh := make(chan error, 1) + cancel := context.AfterFunc(ctx, func() { + errCh <- s.Reset() + close(errCh) + }) + return func() error { + if cancel() { + _ = s.Reset() + } else { + _ = <-errCh + } + } +} + +func (c *Client) withDeadline(ctx context.Context) (context.Context, context.CancelFunc) { + if c.RequestTimeout > 0 { + return context.WithTimeout(ctx, c.RequestTimeout) + } + return ctx, func() {} +} + +// Request finality certificates from the specified peer. Returned finality certificates start at +// the requested instance number and are sequential, but are otherwise unvalidated. +func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *ResponseHeader, _ch <-chan *certs.FinalityCertificate, _err error) { + defer func() { + if perr := recover(); perr != nil { + _err = fmt.Errorf("panicked requesting certificates from peer %s: %v\n%s", p, perr, string(debug.Stack())) + c.Log.Error(_err) + } + }() + + ctx, cancel := c.withDeadline(ctx) + defer cancel() + + proto := FetchProtocolName(c.NetworkName) + stream, err := c.Host.NewStream(ctx, p, proto) + if err != nil { + return nil, nil, err + } + defer resetOnCancel(ctx, stream)() + + if deadline, ok := ctx.Deadline(); ok { + if err := stream.SetDeadline(deadline); err != nil { + return nil, nil, err + } + } + + br := &io.LimitedReader{R: bufio.NewReader(stream), N: 100} + bw := bufio.NewWriter(stream) + + if err := req.MarshalCBOR(bw); err != nil { + c.Log.Debugf("failed to marshal certificate exchange request to peer %s: %w", p, err) + return nil, nil, err + } + if err := bw.Flush(); err != nil { + return nil, nil, err + } + if err := stream.CloseWrite(); err != nil { + return nil, nil, err + } + + var resp ResponseHeader + if req.IncludePowerTable { + br.N = maxPowerTableSize + } + err = resp.UnmarshalCBOR(br) + if err != nil { + c.Log.Debugf("failed to unmarshal certificate exchange response header from peer %s: %w", p, err) + return nil, nil, err + } + + ch := make(chan *certs.FinalityCertificate, 1) + // copy this in case the caller decides to re-use the request object... + request := *req + go func() { + defer func() { + if perr := recover(); perr != nil { + c.Log.Errorf("panicked while receiving certificates from peer %s: %v\n%s", p, perr, string(debug.Stack())) + } + }() + defer close(ch) + for i := uint64(0); request.Limit == 0 || i < request.Limit; i++ { + cert := new(certs.FinalityCertificate) + + // We'll read at most 1MiB per certificate. They generally shouldn't be that + // large, but large power deltas could get close. + br.N = maxPowerTableSize + err := cert.UnmarshalCBOR(br) + if err != nil { + c.Log.Debugf("failed to unmarshal certificate from peer %s: %w", p, err) + return + } + // One quick sanity check. The rest will be validated by the caller. + if cert.GPBFTInstance != request.FirstInstance+i { + c.Log.Warnf("received out-of-order certificate from peer %s", p) + return + } + + select { + case <-ctx.Done(): + return + case ch <- cert: + } + } + }() + return &resp, ch, nil +} diff --git a/certexchange/gen.go b/certexchange/gen.go new file mode 100644 index 00000000..52ceb479 --- /dev/null +++ b/certexchange/gen.go @@ -0,0 +1,239 @@ +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +package certexchange + +import ( + "fmt" + "io" + "math" + "sort" + + gpbft "github.com/filecoin-project/go-f3/gpbft" + cid "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +var _ = xerrors.Errorf +var _ = cid.Undef +var _ = math.E +var _ = sort.Sort + +var lengthBufRequest = []byte{131} + +func (t *Request) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufRequest); err != nil { + return err + } + + // t.FirstInstance (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.FirstInstance)); err != nil { + return err + } + + // t.Limit (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Limit)); err != nil { + return err + } + + // t.IncludePowerTable (bool) (bool) + if err := cbg.WriteBool(w, t.IncludePowerTable); err != nil { + return err + } + return nil +} + +func (t *Request) UnmarshalCBOR(r io.Reader) (err error) { + *t = Request{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.FirstInstance (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.FirstInstance = uint64(extra) + + } + // t.Limit (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Limit = uint64(extra) + + } + // t.IncludePowerTable (bool) (bool) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.IncludePowerTable = false + case 21: + t.IncludePowerTable = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + return nil +} + +var lengthBufResponseHeader = []byte{130} + +func (t *ResponseHeader) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufResponseHeader); err != nil { + return err + } + + // t.PendingInstance (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.PendingInstance)); err != nil { + return err + } + + // t.PowerTable ([]gpbft.PowerEntry) (slice) + if len(t.PowerTable) > 8192 { + return xerrors.Errorf("Slice value in field t.PowerTable was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.PowerTable))); err != nil { + return err + } + for _, v := range t.PowerTable { + if err := v.MarshalCBOR(cw); err != nil { + return err + } + + } + return nil +} + +func (t *ResponseHeader) UnmarshalCBOR(r io.Reader) (err error) { + *t = ResponseHeader{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.PendingInstance (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.PendingInstance = uint64(extra) + + } + // t.PowerTable ([]gpbft.PowerEntry) (slice) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > 8192 { + return fmt.Errorf("t.PowerTable: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + + if extra > 0 { + t.PowerTable = make([]gpbft.PowerEntry, extra) + } + + for i := 0; i < int(extra); i++ { + { + var maj byte + var extra uint64 + var err error + _ = maj + _ = extra + _ = err + + { + + if err := t.PowerTable[i].UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.PowerTable[i]: %w", err) + } + + } + + } + } + return nil +} diff --git a/certexchange/polling/peerTracker.go b/certexchange/polling/peerTracker.go new file mode 100644 index 00000000..54077160 --- /dev/null +++ b/certexchange/polling/peerTracker.go @@ -0,0 +1,269 @@ +package polling + +import ( + "cmp" + "container/heap" + "math/rand/v2" + "slices" + + "github.com/libp2p/go-libp2p/core/peer" +) + +const ( + hitMissSlidingWindow = 10 + maxBackoffExponent = 8 + // The default number of requests to make. + defaultRequests = 8 + // The minimum number of requests to make. If we pick fewer than this number of peers, we'll + // randomly sample known peers to find more. + minRequests = 4 + // The maximum number of requests to make, even if all of our peers appear to be unreliable. + maxRequests = 32 +) + +type peerState int + +const ( + peerEvil peerState = iota - 1 + peerInactive + peerActive +) + +// TODO: Track latency and connectedness. +type peerRecord struct { + sequentialFailures int + + // Sliding windows of hits/misses (0-10 each). If either would exceed 10, we subtract 1 from + // both (where 0 is the floor). + // + // - We use sliding windows to give more weight to recent hits/misses. + // - We don't use a simple weighted moving average because that doesn't track how "sure" we + // are of the measurement. + hits, misses int + + state peerState +} + +type backoffHeap []*backoffRecord + +type backoffRecord struct { + peer peer.ID + // Delay until a round. XXX maybe do this in terms of wall-clock time? + delayUntil int +} + +type peerTracker struct { + // TODO: garbage collect this. + peers map[peer.ID]*peerRecord + // TODO: Limit the number of active peers. + active []peer.ID + backoff backoffHeap + round int +} + +func (r *peerRecord) Cmp(other *peerRecord) int { + if c := cmp.Compare(r.state, other.state); c != 0 { + return c + } + rateA, countA := r.hitRate() + rateB, countB := other.hitRate() + + if c := cmp.Compare(rateA, rateB); c != 0 { + return c + } + if c := cmp.Compare(countA, countB); c != 0 { + return c + } + return 0 +} + +// Len implements heap.Interface. +func (b *backoffHeap) Len() int { + return len(*b) +} + +// Less implements heap.Interface. +func (b *backoffHeap) Less(i int, j int) bool { + return (*b)[i].delayUntil < (*b)[j].delayUntil +} + +// Pop implements heap.Interface. +func (b *backoffHeap) Pop() any { + return (*b)[len(*b)-1] +} + +// Push implements heap.Interface. +func (b *backoffHeap) Push(x any) { + *b = append(*b, x.(*backoffRecord)) +} + +// Swap implements heap.Interface. +func (b *backoffHeap) Swap(i int, j int) { + (*b)[i], (*b)[j] = (*b)[j], (*b)[i] +} + +// Records a failed request and returns how many rounds we should avoid picking this peer for. +func (r *peerRecord) recordFailure() int { + r.sequentialFailures++ + r.state = peerInactive + return 1 << min(r.sequentialFailures, maxBackoffExponent) +} + +func (r *peerRecord) recordHit() { + r.sequentialFailures = 0 + if r.hits < hitMissSlidingWindow { + r.hits++ + } else if r.misses > 0 { + r.misses-- + } +} + +func (r *peerRecord) recordMiss() { + r.sequentialFailures = 0 + if r.misses < hitMissSlidingWindow { + r.misses++ + } else if r.hits > 0 { + r.hits-- + } +} + +// Return the hit rate and the +func (r *peerRecord) hitRate() (float64, int) { + total := r.hits + r.misses + // set the default rate such that we we ask `defaultRequests` peers by default. + rate := float64(1) / defaultRequests + if total > 0 { + rate = float64(r.hits) / float64(total) + } + return rate, total + +} + +func (t *peerTracker) getOrCreate(p peer.ID) *peerRecord { + r, ok := t.peers[p] + if !ok { + r = new(peerRecord) + t.peers[p] = r + } + return r +} + +func (t *peerTracker) recordInvalid(p peer.ID) { + t.getOrCreate(p).state = peerEvil +} + +func (t *peerTracker) recordMiss(p peer.ID) { + t.getOrCreate(p).recordMiss() +} + +func (t *peerTracker) recordFailure(p peer.ID) { + // When we fail to query a peer, backoff that peer. + r := &backoffRecord{ + peer: p, + delayUntil: t.round + t.getOrCreate(p).recordFailure(), + } + heap.Push(&t.backoff, r) +} + +func (t *peerTracker) recordHit(p peer.ID) { + t.getOrCreate(p).recordHit() +} + +func (t *peerTracker) makeActive(p peer.ID) { + r := t.getOrCreate(p) + if r.state != peerInactive { + return + } + r.state = peerActive + t.active = append(t.active, p) +} + +func (t *peerTracker) peerSeen(p peer.ID) { + if _, ok := t.peers[p]; !ok { + t.peers[p] = &peerRecord{state: peerActive} + t.active = append(t.active, p) + } +} + +// Suggest a number of peers from which to request new certificates based on their historical +// record. +// +// TODO: Add a multiplier if we're not making progress. +func (t *peerTracker) suggestPeers() []peer.ID { + // XXX: this should be a param. + const targetProbability = 1.1 + + // Advance the round and move peers from backoff to active, if necessary. + t.round++ + for t.backoff.Len() > 0 { + r := t.backoff[0] + if r.delayUntil > t.round { + break + } + heap.Pop(&t.backoff) + t.makeActive(r.peer) + } + + // Sort from best to worst. + slices.SortFunc(t.active, func(a, b peer.ID) int { + return t.getOrCreate(b).Cmp(t.getOrCreate(a)) + }) + // Trim off any inactive/evil peers from the end, they'll be sorted last. + for l := len(t.active); l > 0 && t.getOrCreate(t.active[l-1]).state != peerActive; l-- { + t.active = t.active[:l] + } + var prob float64 + var peerCount int + for _, p := range t.active { + hitRate, _ := t.getOrCreate(p).hitRate() + // If we believe this and all the rest of the peers are useless, choose the rest of + // the peers randomly. + if hitRate == 0 { + break + } + + prob += hitRate + peerCount++ + if peerCount >= maxRequests { + break + } + // Keep going till we're 110% sure. + if prob >= targetProbability { + break + } + } + + chosen := t.active[:peerCount:peerCount] + + if peerCount == len(t.active) { + // We've chosen all peers, nothing else we can do. + } else if prob < targetProbability { + // If we failed to reach the target probability, choose randomly from the remaining + // peers. + chosen = append(chosen, choose(t.active[peerCount:], maxRequests-peerCount)...) + } else if peerCount < minRequests { + // If we reached the target probability but didn't reach the number of minimum + // requests, pick a few more peers to fill us out. + chosen = append(chosen, choose(t.active[peerCount:], minRequests-peerCount)...) + } + + return chosen +} + +var _ heap.Interface = new(backoffHeap) + +func choose[T any](items []T, count int) []T { + if len(items) <= count { + return items + } + + // Knuth 3.4.2S. Could use rand.Perm, but that would allocate a large array. + // There are more efficient algorithms for small sample sizes, but they're complex. + chosen := make([]T, 0, count) + for t := 0; len(chosen) < cap(chosen); t++ { + if rand.IntN(len(items)-t) < cap(chosen)-len(chosen) { + chosen = append(chosen, items[t]) + } + } + return chosen +} diff --git a/certexchange/polling/poller.go b/certexchange/polling/poller.go new file mode 100644 index 00000000..f979ea4e --- /dev/null +++ b/certexchange/polling/poller.go @@ -0,0 +1,133 @@ +package polling + +import ( + "context" + + "github.com/filecoin-project/go-f3/certexchange" + "github.com/filecoin-project/go-f3/certs" + "github.com/filecoin-project/go-f3/certstore" + "github.com/filecoin-project/go-f3/gpbft" + "github.com/libp2p/go-libp2p/core/peer" +) + +// A Poller will poll specific peers on-demand to try to advance the current GPBFT instance. +type Poller struct { + *certexchange.Client + + Store *certstore.Store + SignatureVerifier gpbft.Verifier + PowerTable gpbft.PowerEntries + NextInstance uint64 +} + +// NewPoller constructs a new certificate poller and initializes it from the passed certificate store. +func NewPoller(ctx context.Context, client *certexchange.Client, store *certstore.Store, verifier gpbft.Verifier) (*Poller, error) { + var nextInstance uint64 + if latest := store.Latest(); latest != nil { + nextInstance = latest.GPBFTInstance + 1 + } + pt, err := store.GetPowerTable(ctx, nextInstance) + if err != nil { + return nil, err + } + return &Poller{ + Client: client, + Store: store, + SignatureVerifier: verifier, + NextInstance: nextInstance, + PowerTable: pt, + }, nil +} + +type PollResult int + +const ( + PollMiss PollResult = iota + PollHit + PollFailed + PollIllegal +) + +// CatchUp attempts to advance to the latest instance from the certificate store without making any +// network requests. It returns the number of instances we advanced. +func (p *Poller) CatchUp(ctx context.Context) (uint64, error) { + latest := p.Store.Latest() + if latest == nil { + return 0, nil + } + + next := latest.GPBFTInstance + 1 + progress := next - p.NextInstance + + if progress == 0 { + return 0, nil + } + + pt, err := p.Store.GetPowerTable(ctx, next) + if err != nil { + return 0, err + } + p.PowerTable = pt + p.NextInstance = next + return progress, nil +} + +// Poll polls a specific peer, possibly multiple times, in order to advance the instance as much as +// possible. It returns: +// +// 1. A PollResult indicating the outcome: miss, hit, failed, illegal. +// 2. An error if something went wrong internally (e.g., the certificate store returned an error). +func (p *Poller) Poll(ctx context.Context, peer peer.ID) (PollResult, error) { + var result PollResult + for { + // Requests take time, so always try to catch-up between requests in case there has + // been some "local" action from the GPBFT instance. + if _, err := p.CatchUp(ctx); err != nil { + return PollFailed, err + } + + resp, ch, err := p.Request(ctx, peer, &certexchange.Request{ + FirstInstance: p.NextInstance, + Limit: maxRequestLength, + IncludePowerTable: false, + }) + if err != nil { + return PollFailed, nil + } + + // If they're caught up, record it as a hit. Otherwise, if they have nothing + // to give us, move on. + if resp.PendingInstance >= p.NextInstance { + result = PollHit + } + + received := 0 + for cert := range ch { + // TODO: consider batching verification, it's slightly faster. + next, _, pt, err := certs.ValidateFinalityCertificates( + p.SignatureVerifier, p.NetworkName, p.PowerTable, p.NextInstance, nil, + *cert, + ) + if err != nil { + return PollIllegal, nil + } + if err := p.Store.Put(ctx, cert); err != nil { + return PollHit, err + } + p.NextInstance = next + p.PowerTable = pt + received++ + } + + // Try again if they're claiming to have more instances (and gave me at + // least one). + if resp.PendingInstance <= p.NextInstance { + return result, nil + } else if received == 0 { + // If they give me no certificates but claim to have more, treat this as a + // failure (could be a connection failure, etc). + return PollFailed, nil + } + + } +} diff --git a/certexchange/polling/predictor.go b/certexchange/polling/predictor.go new file mode 100644 index 00000000..8b570a5f --- /dev/null +++ b/certexchange/polling/predictor.go @@ -0,0 +1,108 @@ +package polling + +import ( + "time" +) + +const ( + maxBackoffMultiplier = 10 + minExploreDistance = 100 * time.Millisecond +) + +func newPredictor(targetAccuracy float64, minInterval, defaultInterval, maxInterval time.Duration) *predictor { + return &predictor{ + minInterval: minInterval, + maxInterval: maxInterval, + interval: defaultInterval, + exploreDistance: defaultInterval / 2, + } +} + +// An interval predictor that tries to predict the time between instances. It can't predict the time +// an instance will be available, but it'll keep adjusting the interval until we receive one +// instance per interval. +type predictor struct { + minInterval, maxInterval time.Duration + + next time.Time + interval time.Duration + wasIncreasing bool + exploreDistance time.Duration + + backoff time.Duration +} + +// Update the predictor. The one argument indicates how many certificates we received since the last +// update. +// +// - 2+ -> interval is too long. +// - 1 -> interval is perfect. +// - 0 -> interval is too short. +// +// We don't actually know the _offset_... but whatever. We can keep up +/- one instance and that's +// fine (especially because of the power table lag, etc.). +func (p *predictor) update(progress uint64) time.Duration { + if p.backoff > 0 { + if progress > 0 { + p.backoff = 0 + } + } else if progress != 1 { + // If we've made too much progress (interval too long) or made no progress (interval + // too short), explore to find the right interval. + + // We halve the explore distance when we switch directions and double it whenever we + // need to keep moving in the same direction repeatedly. + if p.wasIncreasing == (progress > 1) { + p.exploreDistance /= 2 + } else { + p.exploreDistance *= 2 + } + + // Make sure the explore distance doesn't get too short/long. + if p.exploreDistance < minExploreDistance { + p.exploreDistance = minExploreDistance + } else if p.exploreDistance > p.maxInterval/2 { + p.exploreDistance = p.maxInterval / 2 + } + + // Then update the interval. + if progress == 0 { + // If we fail to make progress, enter "backoff" mode. We'll leave backoff + // mode next time we receive a certificate. Otherwise, we'd end up quickly + // skewing our belief of the correct interval e.g., due to a skipped + // instance. + p.backoff = p.interval + p.interval += p.exploreDistance + p.wasIncreasing = true + } else { + p.interval -= p.exploreDistance + p.wasIncreasing = false + } + + // Clamp between min/max + if p.interval < p.minInterval { + p.interval = p.minInterval + } else if p.interval > p.maxInterval { + p.interval = p.maxInterval + } + } + + // Apply either the backoff or predicted the interval. + if p.backoff > 0 { + p.backoff = min(2*p.backoff, maxBackoffMultiplier*p.maxInterval) + p.next = p.next.Add(p.backoff) + } else { + p.next = p.next.Add(p.interval) + } + + // Polling takes time. If we run behind, predict that we should poll again immediately. We + // enforce a minimum interval above so this shouldn't be too often. + now := time.Now() + prediction := p.next.Sub(now) + if prediction < 0 { + p.next = now + return 0 + } + + return prediction +} diff --git a/certexchange/polling/subscriber.go b/certexchange/polling/subscriber.go new file mode 100644 index 00000000..798b698a --- /dev/null +++ b/certexchange/polling/subscriber.go @@ -0,0 +1,238 @@ +package polling + +import ( + "context" + "fmt" + "slices" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + + "github.com/filecoin-project/go-f3/certexchange" + "github.com/filecoin-project/go-f3/certstore" + "github.com/filecoin-project/go-f3/gpbft" +) + +const maxRequestLength = 256 + +// A polling Subscriber will continuously poll the network for new finality certificates. +type Subscriber struct { + certexchange.Client + + Store *certstore.Store + SignatureVerifier gpbft.Verifier + InitialPollInterval time.Duration + MaximumPollInterval time.Duration + MinimumPollInterval time.Duration + + peerTracker peerTracker + + wg sync.WaitGroup + + ctx context.Context + stop context.CancelFunc +} + +func (s *Subscriber) Start() error { + s.wg.Add(1) + s.ctx, s.stop = context.WithCancel(context.Background()) + go func() { + defer s.wg.Done() + + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + if err := s.run(ctx); err != nil && s.ctx.Err() != nil { + s.Log.Errorf("polling certificate exchange subscriber exited early: %w", err) + } + }() + + return nil +} + +func (s *Subscriber) Stop() error { + s.stop() + s.wg.Wait() + + return nil +} + +// Discover new peers. +func (s *Subscriber) libp2pDiscover(ctx context.Context) (<-chan peer.ID, error) { + out := make(chan peer.ID, 256) + discoveryEvents, err := s.Host.EventBus().Subscribe([]any{ + new(event.EvtPeerIdentificationCompleted), + new(event.EvtPeerProtocolsUpdated), + }) + if err != nil { + return nil, err + } + + targetProtocol := certexchange.FetchProtocolName(s.NetworkName) + + // Mark already connected peers as "seen". + for _, p := range s.Host.Network().Peers() { + if proto, err := s.Host.Peerstore().FirstSupportedProtocol(p, targetProtocol); err == nil && proto == targetProtocol { + s.peerTracker.peerSeen(p) + } + } + + // Then start listening for new peers + s.wg.Add(1) + go func() { + defer s.wg.Done() + defer discoveryEvents.Close() + for { + var ( + evt any + ok bool + ) + select { + case evt, ok = <-discoveryEvents.Out(): + case <-ctx.Done(): + } + if !ok { + return + } + + var protos []protocol.ID + var peer peer.ID + switch e := evt.(type) { + case *event.EvtPeerIdentificationCompleted: + protos = e.Protocols + peer = e.Peer + case *event.EvtPeerProtocolsUpdated: + protos = e.Added + peer = e.Peer + default: + continue + } + if slices.Contains(protos, targetProtocol) { + // If the channel is full, ignore newly discovered peers. We + // likely have enough anyways and we'll drain the channel + // eventually. + select { + case out <- peer: + default: + } + } + } + }() + return out, nil +} + +func (s *Subscriber) run(ctx context.Context) error { + timer := time.NewTimer(s.InitialPollInterval) + defer timer.Stop() + + discoveredPeers, err := s.libp2pDiscover(ctx) + if err != nil { + return err + } + + predictor := newPredictor( + 0.05, + s.MinimumPollInterval, + s.InitialPollInterval, + s.MaximumPollInterval, + ) + + poller, err := NewPoller(ctx, &s.Client, s.Store, s.SignatureVerifier) + if err != nil { + return err + } + + for ctx.Err() == nil { + var err error + + // Always handle newly discovered peers and new certificates from the certificate + // store _first_. Then check the timer to see if we should poll. + select { + case p := <-discoveredPeers: + s.peerTracker.peerSeen(p) + default: + select { + case p := <-discoveredPeers: + s.peerTracker.peerSeen(p) + case <-timer.C: + // First, see if we made progress locally. If we have, update + // interval prediction based on that local progress. If our interval + // was accurate, we'll keep predicting the same interval and we'll + // never make any network requests. If we stop making local + // progress, we'll start making network requests again. + var progress uint64 + progress, err = poller.CatchUp(ctx) + if err != nil { + break + } + if progress > 0 { + timer.Reset(predictor.update(progress)) + break + } + + progress, err = s.poll(ctx, poller) + if err != nil { + break + } + timer.Reset(predictor.update(progress)) + case <-ctx.Done(): + return ctx.Err() + } + } + + if err != nil { + return err + } + } + return ctx.Err() +} + +func (s *Subscriber) poll(ctx context.Context, poller *Poller) (uint64, error) { + var ( + misses []peer.ID + hits []peer.ID + ) + + start := poller.NextInstance + for _, peer := range s.peerTracker.suggestPeers() { + oldInstance := poller.NextInstance + res, err := poller.Poll(ctx, peer) + if err != nil { + return poller.NextInstance - start, err + } + // If we manage to advance, all old "hits" are actually misses. + if oldInstance < poller.NextInstance { + misses = append(misses, hits...) + hits = hits[:0] + } + + switch res { + case PollMiss: + misses = append(misses, peer) + case PollHit: + hits = append(hits, peer) + case PollFailed: + s.peerTracker.recordFailure(peer) + case PollIllegal: + s.peerTracker.recordInvalid(peer) + default: + panic(fmt.Sprintf("unexpected polling.PollResult: %#v", res)) + } + } + + // If we've made progress, record hits/misses. Otherwise, we just have to assume that we + // asked too soon. + progress := poller.NextInstance - start + if progress > 0 { + for _, p := range misses { + s.peerTracker.recordMiss(p) + } + for _, p := range hits { + s.peerTracker.recordHit(p) + } + } + + return progress, nil +} diff --git a/certexchange/protocol.go b/certexchange/protocol.go new file mode 100644 index 00000000..0ea08299 --- /dev/null +++ b/certexchange/protocol.go @@ -0,0 +1,28 @@ +package certexchange + +import ( + "github.com/filecoin-project/go-f3/gpbft" + "github.com/libp2p/go-libp2p/core/protocol" +) + +func FetchProtocolName(nn gpbft.NetworkName) protocol.ID { + return protocol.ID("/f3/certexch/get/1/" + string(nn)) +} + +type Request struct { + // First instance to fetch. + FirstInstance uint64 + // Max number of instances to fetch. The server may respond with fewer certificates than + // requested, even if more are available. + Limit uint64 + // Include the full power table needed to validate the first finality certificate. + // Checked by the user against their last finality certificate. + IncludePowerTable bool +} + +type ResponseHeader struct { + // The next instance to be finalized. This is 0 when no instances have been finalized. + PendingInstance uint64 + // Power table, if requested, or empty. + PowerTable []gpbft.PowerEntry +} diff --git a/certexchange/server.go b/certexchange/server.go new file mode 100644 index 00000000..483f0eb6 --- /dev/null +++ b/certexchange/server.go @@ -0,0 +1,125 @@ +package certexchange + +import ( + "bufio" + "context" + "errors" + "fmt" + "runtime/debug" + "time" + + "github.com/filecoin-project/go-f3" + "github.com/filecoin-project/go-f3/certstore" + "github.com/filecoin-project/go-f3/gpbft" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" +) + +const maxResponseLen = 256 + +// Server is libp2p a certificate exchange server. +type Server struct { + // Request timeouts. If non-zero, requests will be canceled after the specified duration. + RequestTimeout time.Duration + NetworkName gpbft.NetworkName + Host host.Host + Store *certstore.Store + Log f3.Logger + + cancel context.CancelFunc +} + +func (s *Server) withDeadline(ctx context.Context) (context.Context, context.CancelFunc) { + if s.RequestTimeout > 0 { + return context.WithTimeout(ctx, s.RequestTimeout) + } + return ctx, func() {} +} + +func (s *Server) handleRequest(ctx context.Context, stream network.Stream) (_err error) { + defer func() { + if perr := recover(); perr != nil { + _err = fmt.Errorf("panicked in server response: %v", perr) + s.Log.Errorf("%s\n%s", string(debug.Stack())) + } + }() + + if deadline, ok := ctx.Deadline(); ok { + if err := stream.SetDeadline(deadline); err != nil { + return err + } + } + + br := bufio.NewReader(stream) + bw := bufio.NewWriter(stream) + + // Request has no variable-length fields, so we don't need a limited reader. + var req Request + if err := req.UnmarshalCBOR(br); err != nil { + s.Log.Debugf("failed to read request from stream: %w", err) + return err + } + + limit := req.Limit + if limit == 0 || limit > maxResponseLen { + limit = maxResponseLen + } + var resp ResponseHeader + if latest := s.Store.Latest(); latest != nil { + resp.PendingInstance = latest.GPBFTInstance + 1 + } + + if resp.PendingInstance <= req.FirstInstance { + pt, err := s.Store.GetPowerTable(ctx, req.FirstInstance) + if err != nil { + s.Log.Errorf("failed to load power table: %w", err) + return err + } + resp.PowerTable = pt + } + + if err := resp.MarshalCBOR(bw); err != nil { + s.Log.Debugf("failed to write header to stream: %w", err) + return err + } + + if resp.PendingInstance > req.FirstInstance { + certs, err := s.Store.GetRange(ctx, req.FirstInstance, req.FirstInstance+limit) + if err == nil || errors.Is(err, certstore.ErrCertNotFound) { + for i := range certs { + if err := certs[i].MarshalCBOR(bw); err != nil { + s.Log.Debugf("failed to write certificate to stream: %w", err) + return err + } + } + } else { + s.Log.Errorf("failed to load finality certificates: %w", err) + } + } + return bw.Flush() +} + +// Start the server. +func (s *Server) Start() error { + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + s.Host.SetStreamHandler(FetchProtocolName(s.NetworkName), func(stream network.Stream) { + ctx, cancel := s.withDeadline(ctx) + defer cancel() + + if err := s.handleRequest(ctx, stream); err != nil { + _ = stream.Reset() + } else { + _ = stream.Close() + } + + }) + return nil +} + +// Stop the server. +func (s *Server) Stop() error { + s.Host.RemoveStreamHandler(FetchProtocolName(s.NetworkName)) + s.cancel() + return nil +} diff --git a/gen/main.go b/gen/main.go index 4fc610f9..2a418bc1 100644 --- a/gen/main.go +++ b/gen/main.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "github.com/filecoin-project/go-f3/certexchange" "github.com/filecoin-project/go-f3/certs" "github.com/filecoin-project/go-f3/gpbft" gen "github.com/whyrusleeping/cbor-gen" @@ -34,4 +35,12 @@ func main() { fmt.Println(err) os.Exit(1) } + err = gen.WriteTupleEncodersToFile("../certexchange/gen.go", "certexchange", + certexchange.Request{}, + certexchange.ResponseHeader{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } } diff --git a/go.mod b/go.mod index 212389a3..bd815d4d 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/filecoin-project/go-f3 -go 1.21 +go 1.22 require ( github.com/Kubuxu/go-broadcast v0.0.0-20240621161059-1a8c90734cd6