diff --git a/aw.go b/aw.go index e9093ce..52d9a1e 100644 --- a/aw.go +++ b/aw.go @@ -826,7 +826,7 @@ func (peer *Peer) handleEvent(e event) { peer.filter.allow(contentID) - if uint(len(peer.pendingSyncs)) >= peer.Opts.MaxPendingSyncs { + if numPendingSyncs(peer.pendingSyncs) >= peer.Opts.MaxPendingSyncs { e.errorResponder <- ErrTooManyPendingSyncs } else { pending := pendingSync{ @@ -1697,3 +1697,12 @@ func write( } } } + +func numPendingSyncs(pendingSyncs map[string]pendingSync) uint { + for id, pSync := range pendingSyncs { + if pSync.ctx.Err() != nil { + delete(pendingSyncs, id) + } + } + return uint(len(pendingSyncs)) +} diff --git a/aw_test.go b/aw_test.go index 69de39f..99c1b00 100644 --- a/aw_test.go +++ b/aw_test.go @@ -350,6 +350,41 @@ var _ = Describe("Peer", func() { Expect(receivedData).To(Equal(rightContent)) }) + It("should expire old pending syncs", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + opts := defaultOptions(logger) + opts.MaxPendingSyncs = 10 + + peer1 := newPeerAndListen(ctx, opts) + peer2 := newPeerAndListen(ctx, opts) + peers := []*aw.Peer{peer1, peer2} + + connectAllPeers(peers) + + go peer1.Run(ctx) + go peer2.Run(ctx) + + linkAllPeers(peers) + + successID := []byte("success") + peer2.ContentResolver.InsertContent(successID, []byte("content")) + + for i := 0; i < int(opts.MaxPendingSyncs); i++ { + contentID := []byte(fmt.Sprintf("%v", i)) + syncCtx, syncCancel := context.WithTimeout(ctx, 1*time.Millisecond) + peer1.Sync(syncCtx, contentID, nil) + syncCancel() + } + + syncCtx, syncCancel := context.WithTimeout(ctx, 10*time.Millisecond) + _, err := peer1.Sync(syncCtx, successID, nil) + syncCancel() + + Expect(err).ToNot(HaveOccurred()) + }) + Context("many peers and only one has the content", func() { It("should sync the content with no hint", func() { ctx, cancel := context.WithCancel(context.Background())