diff --git a/aw.go b/aw.go index 37fc4ec..e9093ce 100644 --- a/aw.go +++ b/aw.go @@ -344,6 +344,7 @@ func (peer *Peer) Run(ctx context.Context) error { break LOOP case <-ticker.C: + peer.Opts.Logger.Debug("peer discovery event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) peer.events <- peerDiscoveryEvent } } @@ -382,6 +383,7 @@ func (peer *Peer) Link(remote id.Signatory) error { errorResponder: responder, } + peer.Opts.Logger.Debug("link event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) select { case peer.events <- event: return <-responder @@ -397,6 +399,7 @@ func (peer *Peer) Unlink(remote id.Signatory) error { id: remote, } + peer.Opts.Logger.Debug("unlink event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) select { case peer.events <- event: return nil @@ -409,6 +412,7 @@ func (peer *Peer) Unlink(remote id.Signatory) error { func (peer *Peer) Sync(ctx context.Context, contentID []byte, hint *id.Signatory) ([]byte, error) { event, errResponder, responder := syncEvent(ctx, contentID, hint) + peer.Opts.Logger.Debug("sync event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) select { case peer.events <- event: @@ -422,6 +426,7 @@ func (peer *Peer) Sync(ctx context.Context, contentID []byte, hint *id.Signatory func (peer *Peer) SyncNonBlocking(ctx context.Context, contentID []byte, hint *id.Signatory) ([]byte, error) { event, errResponder, responder := syncEvent(ctx, contentID, hint) + peer.Opts.Logger.Debug("sync nb event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) select { case peer.events <- event: @@ -469,6 +474,7 @@ func syncResponse(ctx context.Context, errResponder chan error, responder chan [ func (peer *Peer) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash) error { event := gossipEvent(contentID, subnet) + peer.Opts.Logger.Debug("gossip event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) select { case peer.events <- event: return nil @@ -481,6 +487,7 @@ func (peer *Peer) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash) func (peer *Peer) GossipNonBlocking(contentID []byte, subnet *id.Hash) error { event := gossipEvent(contentID, subnet) + peer.Opts.Logger.Debug("gossip nb event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) select { case peer.events <- event: return nil @@ -511,6 +518,7 @@ func gossipEvent(contentID []byte, subnet *id.Hash) event { func (peer *Peer) Send(ctx context.Context, data []byte, remote id.Signatory) error { event, errResponder := sendEvent(data, remote) + peer.Opts.Logger.Debug("send event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) select { case peer.events <- event: @@ -530,6 +538,7 @@ func (peer *Peer) Send(ctx context.Context, data []byte, remote id.Signatory) er func (peer *Peer) SendNonBlocking(data []byte, remote id.Signatory) error { event, errResponder := sendEvent(data, remote) + peer.Opts.Logger.Debug("send nb event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) select { case peer.events <- event: @@ -577,6 +586,7 @@ func (peer *Peer) listenerHandler(conn net.Conn) { connection: conn, gcmSession: gcmSession, } + peer.Opts.Logger.Debug("incoming connection event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) peer.events <- newConnectionEvent }() } @@ -667,22 +677,32 @@ func (peer *Peer) handleEvent(e event) { contentID := string(message.Data) if !peer.filter.filter(remote, message) { + var content []byte + if len(message.Data) != 0 && len(message.SyncData) != 0 { + oldContent, alreadySeenContent := peer.ContentResolver.QueryContent(message.Data) + + if alreadySeenContent { + content = oldContent + } else { + peer.ContentResolver.InsertContent(message.Data, message.SyncData) + + // NOTE(ross): When inserting content, the content may + // be deemed to be invalid depending on the content + // resolver implementation. It is therefore up to the + // caller of `Sync` to make sure that the returned + // content is valid in the context of their program. + content = message.SyncData + } + } + if pendingSync, ok := peer.pendingSyncs[contentID]; ok { for _, responder := range pendingSync.responders { - responder <- message.SyncData + responder <- content } delete(peer.pendingSyncs, contentID) } - if len(message.Data) != 0 && len(message.SyncData) != 0 { - _, alreadySeenContent := peer.ContentResolver.QueryContent(message.Data) - - if !alreadySeenContent { - peer.ContentResolver.InsertContent(message.Data, message.SyncData) - } - } - if gossipSubnet, ok := peer.gossipSubnets[contentID]; ok { pushMessage := wire.Msg{ Version: wire.MsgVersion1, @@ -789,53 +809,57 @@ func (peer *Peer) handleEvent(e event) { contentID := e.message.Data - if pending, ok := peer.pendingSyncs[string(contentID)]; ok { - peer.Opts.Logger.Debug("sync for content id exists") - if uint(len(pending.responders)) >= peer.Opts.MaxActiveSyncsForSameContent { - e.errorResponder <- ErrTooManySyncsForSameContent - } else { - pending.responders = append(pending.responders, e.messageResponder) - - peer.Opts.Logger.Debug("added pending responder for content id sync", zap.Int("current pending responders", len(pending.responders))) - } + if content, ok := peer.ContentResolver.QueryContent(contentID); ok { + e.messageResponder <- content } else { - peer.Opts.Logger.Debug("sync for content id does not exist") - - peer.filter.allow(contentID) + if pending, ok := peer.pendingSyncs[string(contentID)]; ok { + peer.Opts.Logger.Debug("sync for content id exists") + if uint(len(pending.responders)) >= peer.Opts.MaxActiveSyncsForSameContent { + e.errorResponder <- ErrTooManySyncsForSameContent + } else { + pending.responders = append(pending.responders, e.messageResponder) - if uint(len(peer.pendingSyncs)) >= peer.Opts.MaxPendingSyncs { - e.errorResponder <- ErrTooManyPendingSyncs - } else { - pending := pendingSync{ - ctx: e.ctx, - responders: []chan<- []byte{e.messageResponder}, + peer.Opts.Logger.Debug("added pending responder for content id sync", zap.Int("current pending responders", len(pending.responders))) } + } else { + peer.Opts.Logger.Debug("sync for content id does not exist") - peer.pendingSyncs[string(contentID)] = pending + peer.filter.allow(contentID) - peer.Opts.Logger.Debug("added pending sync for content id", zap.Int("current pending syncs", len(peer.pendingSyncs))) + if uint(len(peer.pendingSyncs)) >= peer.Opts.MaxPendingSyncs { + e.errorResponder <- ErrTooManyPendingSyncs + } else { + pending := pendingSync{ + ctx: e.ctx, + responders: []chan<- []byte{e.messageResponder}, + } + + peer.pendingSyncs[string(contentID)] = pending + + peer.Opts.Logger.Debug("added pending sync for content id", zap.Int("current pending syncs", len(peer.pendingSyncs))) + } } - } - peers := peer.PeerTable.RandomPeers(peer.Opts.GossipAlpha) - if e.hint != nil { - peers = append([]id.Signatory{*e.hint}, peers...) - } + peers := peer.PeerTable.RandomPeers(peer.Opts.GossipAlpha) + if e.hint != nil { + peers = append([]id.Signatory{*e.hint}, peers...) + } - message := e.message - warnThreshold := len(peers) / 2 - numErrors := 0 - for _, recipient := range peers { - if err := peer.handleSendMessage(recipient, message); err != nil { - if e.hint != nil && recipient.Equal(e.hint) { - peer.Opts.Logger.Warn("unable to sync from hinted peer", zap.Error(err)) + message := e.message + warnThreshold := len(peers) / 2 + numErrors := 0 + for _, recipient := range peers { + if err := peer.handleSendMessage(recipient, message); err != nil { + if e.hint != nil && recipient.Equal(e.hint) { + peer.Opts.Logger.Warn("unable to sync from hinted peer", zap.Error(err)) + } + numErrors++ } - numErrors++ } - } - if numErrors > warnThreshold { - peer.Opts.Logger.Warn("low sync gossip success rate", zap.String("proportion of successful sends", fmt.Sprintf("%v/%v", len(peers)-numErrors, len(peers)))) + if numErrors > warnThreshold { + peer.Opts.Logger.Warn("low sync gossip success rate", zap.String("proportion of successful sends", fmt.Sprintf("%v/%v", len(peers)-numErrors, len(peers)))) + } } case readerDropped: @@ -1015,6 +1039,7 @@ func (peer *Peer) handleEvent(e event) { } if err == nil && decisionDecoded[0] == keepAliveTrue { + peer.Opts.Logger.Debug("keep alive event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events))) peer.events <- event{ ty: keepAlive, id: remote, diff --git a/aw_test.go b/aw_test.go index a502f2c..69de39f 100644 --- a/aw_test.go +++ b/aw_test.go @@ -315,6 +315,41 @@ var _ = Describe("Peer", func() { Expect(receivedData).To(Equal(content)) }) + It("should return the same content if it is already present", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + opts := defaultOptions(logger) + + peer1 := newPeerAndListen(ctx, opts) + peer2 := newPeerAndListen(ctx, opts) + peers := []*aw.Peer{peer1, peer2} + + connectAllPeers(peers) + + go peer1.Run(ctx) + go peer2.Run(ctx) + + linkAllPeers(peers) + + contentID := []byte("id") + rightContent := []byte("right") + wrongContent := []byte("wrong") + + // NOTE: In practice this could occur if a malicious node sent one + // content for the first sync and different content for the second + // sync (but with the same content ID). + peer1.ContentResolver.InsertContent(contentID, rightContent) + peer2.ContentResolver.InsertContent(contentID, wrongContent) + + syncCtx, syncCancel := context.WithTimeout(ctx, time.Second) + receivedData, err := peer1.Sync(syncCtx, contentID, nil) + syncCancel() + + Expect(err).To(BeNil()) + Expect(receivedData).To(Equal(rightContent)) + }) + Context("many peers and only one has the content", func() { It("should sync the content with no hint", func() { ctx, cancel := context.WithCancel(context.Background())