From ecdbbb00dc9ef84e630a03bf03a0ec2d42e15332 Mon Sep 17 00:00:00 2001 From: ross Date: Wed, 11 May 2022 15:54:12 +1000 Subject: [PATCH 1/2] delete expired pending syncs --- aw.go | 11 ++++++++++- aw_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/aw.go b/aw.go index e9093ce..52d9a1e 100644 --- a/aw.go +++ b/aw.go @@ -826,7 +826,7 @@ func (peer *Peer) handleEvent(e event) { peer.filter.allow(contentID) - if uint(len(peer.pendingSyncs)) >= peer.Opts.MaxPendingSyncs { + if numPendingSyncs(peer.pendingSyncs) >= peer.Opts.MaxPendingSyncs { e.errorResponder <- ErrTooManyPendingSyncs } else { pending := pendingSync{ @@ -1697,3 +1697,12 @@ func write( } } } + +func numPendingSyncs(pendingSyncs map[string]pendingSync) uint { + for id, pSync := range pendingSyncs { + if pSync.ctx.Err() != nil { + delete(pendingSyncs, id) + } + } + return uint(len(pendingSyncs)) +} diff --git a/aw_test.go b/aw_test.go index 69de39f..6e84f8e 100644 --- a/aw_test.go +++ b/aw_test.go @@ -350,6 +350,46 @@ var _ = Describe("Peer", func() { Expect(receivedData).To(Equal(rightContent)) }) + It("should expire old pending syncs", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + opts := defaultOptions(logger) + opts.MaxPendingSyncs = 10 + + peer1 := newPeerAndListen(ctx, opts) + peer2 := newPeerAndListen(ctx, opts) + peers := []*aw.Peer{peer1, peer2} + + connectAllPeers(peers) + + go peer1.Run(ctx) + go peer2.Run(ctx) + + linkAllPeers(peers) + + successID := []byte("success") + peer2.ContentResolver.InsertContent(successID, []byte("content")) + + for i := 0; i < int(opts.MaxPendingSyncs); i++ { + i := i + go func() { + contentID := []byte(fmt.Sprintf("%v", i)) + syncCtx, syncCancel := context.WithTimeout(ctx, 100*time.Millisecond) + peer1.Sync(syncCtx, contentID, nil) + syncCancel() + }() + } + + time.Sleep(100 * time.Millisecond) + + syncCtx, syncCancel := context.WithTimeout(ctx, 100*time.Millisecond) + _, err := peer1.Sync(syncCtx, successID, nil) + syncCancel() + + Expect(err).ToNot(HaveOccurred()) + }) + Context("many peers and only one has the content", func() { It("should sync the content with no hint", func() { ctx, cancel := context.WithCancel(context.Background()) From 532e566e8a5913f485afe5ed90e7192e8cc0f814 Mon Sep 17 00:00:00 2001 From: ross Date: Wed, 11 May 2022 16:08:32 +1000 Subject: [PATCH 2/2] remove raciness from test --- aw_test.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/aw_test.go b/aw_test.go index 6e84f8e..99c1b00 100644 --- a/aw_test.go +++ b/aw_test.go @@ -372,18 +372,13 @@ var _ = Describe("Peer", func() { peer2.ContentResolver.InsertContent(successID, []byte("content")) for i := 0; i < int(opts.MaxPendingSyncs); i++ { - i := i - go func() { - contentID := []byte(fmt.Sprintf("%v", i)) - syncCtx, syncCancel := context.WithTimeout(ctx, 100*time.Millisecond) - peer1.Sync(syncCtx, contentID, nil) - syncCancel() - }() + contentID := []byte(fmt.Sprintf("%v", i)) + syncCtx, syncCancel := context.WithTimeout(ctx, 1*time.Millisecond) + peer1.Sync(syncCtx, contentID, nil) + syncCancel() } - time.Sleep(100 * time.Millisecond) - - syncCtx, syncCancel := context.WithTimeout(ctx, 100*time.Millisecond) + syncCtx, syncCancel := context.WithTimeout(ctx, 10*time.Millisecond) _, err := peer1.Sync(syncCtx, successID, nil) syncCancel()