Skip to content

Commit

Permalink
delete expired pending syncs
Browse files Browse the repository at this point in the history
  • Loading branch information
ross-pure committed May 11, 2022
1 parent f2c79db commit ecdbbb0
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
11 changes: 10 additions & 1 deletion aw.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ func (peer *Peer) handleEvent(e event) {

peer.filter.allow(contentID)

if uint(len(peer.pendingSyncs)) >= peer.Opts.MaxPendingSyncs {
if numPendingSyncs(peer.pendingSyncs) >= peer.Opts.MaxPendingSyncs {
e.errorResponder <- ErrTooManyPendingSyncs
} else {
pending := pendingSync{
Expand Down Expand Up @@ -1697,3 +1697,12 @@ func write(
}
}
}

func numPendingSyncs(pendingSyncs map[string]pendingSync) uint {
for id, pSync := range pendingSyncs {
if pSync.ctx.Err() != nil {
delete(pendingSyncs, id)
}
}
return uint(len(pendingSyncs))
}
40 changes: 40 additions & 0 deletions aw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,46 @@ var _ = Describe("Peer", func() {
Expect(receivedData).To(Equal(rightContent))
})

It("should expire old pending syncs", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts := defaultOptions(logger)
opts.MaxPendingSyncs = 10

peer1 := newPeerAndListen(ctx, opts)
peer2 := newPeerAndListen(ctx, opts)
peers := []*aw.Peer{peer1, peer2}

connectAllPeers(peers)

go peer1.Run(ctx)
go peer2.Run(ctx)

linkAllPeers(peers)

successID := []byte("success")
peer2.ContentResolver.InsertContent(successID, []byte("content"))

for i := 0; i < int(opts.MaxPendingSyncs); i++ {
i := i
go func() {
contentID := []byte(fmt.Sprintf("%v", i))
syncCtx, syncCancel := context.WithTimeout(ctx, 100*time.Millisecond)
peer1.Sync(syncCtx, contentID, nil)
syncCancel()
}()
}

time.Sleep(100 * time.Millisecond)

syncCtx, syncCancel := context.WithTimeout(ctx, 100*time.Millisecond)
_, err := peer1.Sync(syncCtx, successID, nil)
syncCancel()

Expect(err).ToNot(HaveOccurred())
})

Context("many peers and only one has the content", func() {
It("should sync the content with no hint", func() {
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit ecdbbb0

Please sign in to comment.