Skip to content

Commit

Permalink
use local content when syncing if it exists
Browse files Browse the repository at this point in the history
  • Loading branch information
ross-pure committed May 3, 2022
1 parent f6106a6 commit 88596e8
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 45 deletions.
115 changes: 70 additions & 45 deletions aw.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func (peer *Peer) Run(ctx context.Context) error {
break LOOP

case <-ticker.C:
peer.Opts.Logger.Debug("peer discovery event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
peer.events <- peerDiscoveryEvent
}
}
Expand Down Expand Up @@ -382,6 +383,7 @@ func (peer *Peer) Link(remote id.Signatory) error {
errorResponder: responder,
}

peer.Opts.Logger.Debug("link event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
select {
case peer.events <- event:
return <-responder
Expand All @@ -397,6 +399,7 @@ func (peer *Peer) Unlink(remote id.Signatory) error {
id: remote,
}

peer.Opts.Logger.Debug("unlink event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
select {
case peer.events <- event:
return nil
Expand All @@ -409,6 +412,7 @@ func (peer *Peer) Unlink(remote id.Signatory) error {
func (peer *Peer) Sync(ctx context.Context, contentID []byte, hint *id.Signatory) ([]byte, error) {
event, errResponder, responder := syncEvent(ctx, contentID, hint)

peer.Opts.Logger.Debug("sync event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
select {
case peer.events <- event:

Expand All @@ -422,6 +426,7 @@ func (peer *Peer) Sync(ctx context.Context, contentID []byte, hint *id.Signatory
func (peer *Peer) SyncNonBlocking(ctx context.Context, contentID []byte, hint *id.Signatory) ([]byte, error) {
event, errResponder, responder := syncEvent(ctx, contentID, hint)

peer.Opts.Logger.Debug("sync nb event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
select {
case peer.events <- event:

Expand Down Expand Up @@ -469,6 +474,7 @@ func syncResponse(ctx context.Context, errResponder chan error, responder chan [
func (peer *Peer) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash) error {
event := gossipEvent(contentID, subnet)

peer.Opts.Logger.Debug("gossip event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
select {
case peer.events <- event:
return nil
Expand All @@ -481,6 +487,7 @@ func (peer *Peer) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash)
func (peer *Peer) GossipNonBlocking(contentID []byte, subnet *id.Hash) error {
event := gossipEvent(contentID, subnet)

peer.Opts.Logger.Debug("gossip nb event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
select {
case peer.events <- event:
return nil
Expand Down Expand Up @@ -511,6 +518,7 @@ func gossipEvent(contentID []byte, subnet *id.Hash) event {
func (peer *Peer) Send(ctx context.Context, data []byte, remote id.Signatory) error {
event, errResponder := sendEvent(data, remote)

peer.Opts.Logger.Debug("send event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
select {
case peer.events <- event:

Expand All @@ -530,6 +538,7 @@ func (peer *Peer) Send(ctx context.Context, data []byte, remote id.Signatory) er
func (peer *Peer) SendNonBlocking(data []byte, remote id.Signatory) error {
event, errResponder := sendEvent(data, remote)

peer.Opts.Logger.Debug("send nb event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
select {
case peer.events <- event:

Expand Down Expand Up @@ -577,6 +586,7 @@ func (peer *Peer) listenerHandler(conn net.Conn) {
connection: conn,
gcmSession: gcmSession,
}
peer.Opts.Logger.Debug("incoming connection event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
peer.events <- newConnectionEvent
}()
}
Expand Down Expand Up @@ -667,22 +677,32 @@ func (peer *Peer) handleEvent(e event) {
contentID := string(message.Data)

if !peer.filter.filter(remote, message) {
var content []byte
if len(message.Data) != 0 && len(message.SyncData) != 0 {
oldContent, alreadySeenContent := peer.ContentResolver.QueryContent(message.Data)

if alreadySeenContent {
content = oldContent
} else {
peer.ContentResolver.InsertContent(message.Data, message.SyncData)

// NOTE(ross): When inserting content, the content may
// be deemed to be invalid depending on the content
// resolver implementation. It is therefore up to the
// caller of `Sync` to make sure that the returned
// content is valid in the context of their program.
content = message.SyncData
}
}

if pendingSync, ok := peer.pendingSyncs[contentID]; ok {
for _, responder := range pendingSync.responders {
responder <- message.SyncData
responder <- content
}

delete(peer.pendingSyncs, contentID)
}

if len(message.Data) != 0 && len(message.SyncData) != 0 {
_, alreadySeenContent := peer.ContentResolver.QueryContent(message.Data)

if !alreadySeenContent {
peer.ContentResolver.InsertContent(message.Data, message.SyncData)
}
}

if gossipSubnet, ok := peer.gossipSubnets[contentID]; ok {
pushMessage := wire.Msg{
Version: wire.MsgVersion1,
Expand Down Expand Up @@ -789,53 +809,57 @@ func (peer *Peer) handleEvent(e event) {

contentID := e.message.Data

if pending, ok := peer.pendingSyncs[string(contentID)]; ok {
peer.Opts.Logger.Debug("sync for content id exists")
if uint(len(pending.responders)) >= peer.Opts.MaxActiveSyncsForSameContent {
e.errorResponder <- ErrTooManySyncsForSameContent
} else {
pending.responders = append(pending.responders, e.messageResponder)

peer.Opts.Logger.Debug("added pending responder for content id sync", zap.Int("current pending responders", len(pending.responders)))
}
if content, ok := peer.ContentResolver.QueryContent(contentID); ok {
e.messageResponder <- content
} else {
peer.Opts.Logger.Debug("sync for content id does not exist")

peer.filter.allow(contentID)
if pending, ok := peer.pendingSyncs[string(contentID)]; ok {
peer.Opts.Logger.Debug("sync for content id exists")
if uint(len(pending.responders)) >= peer.Opts.MaxActiveSyncsForSameContent {
e.errorResponder <- ErrTooManySyncsForSameContent
} else {
pending.responders = append(pending.responders, e.messageResponder)

if uint(len(peer.pendingSyncs)) >= peer.Opts.MaxPendingSyncs {
e.errorResponder <- ErrTooManyPendingSyncs
} else {
pending := pendingSync{
ctx: e.ctx,
responders: []chan<- []byte{e.messageResponder},
peer.Opts.Logger.Debug("added pending responder for content id sync", zap.Int("current pending responders", len(pending.responders)))
}
} else {
peer.Opts.Logger.Debug("sync for content id does not exist")

peer.pendingSyncs[string(contentID)] = pending
peer.filter.allow(contentID)

peer.Opts.Logger.Debug("added pending sync for content id", zap.Int("current pending syncs", len(peer.pendingSyncs)))
if uint(len(peer.pendingSyncs)) >= peer.Opts.MaxPendingSyncs {
e.errorResponder <- ErrTooManyPendingSyncs
} else {
pending := pendingSync{
ctx: e.ctx,
responders: []chan<- []byte{e.messageResponder},
}

peer.pendingSyncs[string(contentID)] = pending

peer.Opts.Logger.Debug("added pending sync for content id", zap.Int("current pending syncs", len(peer.pendingSyncs)))
}
}
}

peers := peer.PeerTable.RandomPeers(peer.Opts.GossipAlpha)
if e.hint != nil {
peers = append([]id.Signatory{*e.hint}, peers...)
}
peers := peer.PeerTable.RandomPeers(peer.Opts.GossipAlpha)
if e.hint != nil {
peers = append([]id.Signatory{*e.hint}, peers...)
}

message := e.message
warnThreshold := len(peers) / 2
numErrors := 0
for _, recipient := range peers {
if err := peer.handleSendMessage(recipient, message); err != nil {
if e.hint != nil && recipient.Equal(e.hint) {
peer.Opts.Logger.Warn("unable to sync from hinted peer", zap.Error(err))
message := e.message
warnThreshold := len(peers) / 2
numErrors := 0
for _, recipient := range peers {
if err := peer.handleSendMessage(recipient, message); err != nil {
if e.hint != nil && recipient.Equal(e.hint) {
peer.Opts.Logger.Warn("unable to sync from hinted peer", zap.Error(err))
}
numErrors++
}
numErrors++
}
}

if numErrors > warnThreshold {
peer.Opts.Logger.Warn("low sync gossip success rate", zap.String("proportion of successful sends", fmt.Sprintf("%v/%v", len(peers)-numErrors, len(peers))))
if numErrors > warnThreshold {
peer.Opts.Logger.Warn("low sync gossip success rate", zap.String("proportion of successful sends", fmt.Sprintf("%v/%v", len(peers)-numErrors, len(peers))))
}
}

case readerDropped:
Expand Down Expand Up @@ -1015,6 +1039,7 @@ func (peer *Peer) handleEvent(e event) {
}

if err == nil && decisionDecoded[0] == keepAliveTrue {
peer.Opts.Logger.Debug("keep alive event", zap.Int("len", len(peer.events)), zap.Int("cap", cap(peer.events)))
peer.events <- event{
ty: keepAlive,
id: remote,
Expand Down
35 changes: 35 additions & 0 deletions aw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,41 @@ var _ = Describe("Peer", func() {
Expect(receivedData).To(Equal(content))
})

It("should return the same content if it is already present", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts := defaultOptions(logger)

peer1 := newPeerAndListen(ctx, opts)
peer2 := newPeerAndListen(ctx, opts)
peers := []*aw.Peer{peer1, peer2}

connectAllPeers(peers)

go peer1.Run(ctx)
go peer2.Run(ctx)

linkAllPeers(peers)

contentID := []byte("id")
rightContent := []byte("right")
wrongContent := []byte("wrong")

// NOTE: In practice this could occur if a malicious node sent one
// content for the first sync and different content for the second
// sync (but with the same content ID).
peer1.ContentResolver.InsertContent(contentID, rightContent)
peer2.ContentResolver.InsertContent(contentID, wrongContent)

syncCtx, syncCancel := context.WithTimeout(ctx, time.Second)
receivedData, err := peer1.Sync(syncCtx, contentID, nil)
syncCancel()

Expect(err).To(BeNil())
Expect(receivedData).To(Equal(rightContent))
})

Context("many peers and only one has the content", func() {
It("should sync the content with no hint", func() {
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 88596e8

Please sign in to comment.