feat: re-add livesync #4554

Merged (7 commits), Jan 25, 2024
165 changes: 92 additions & 73 deletions pkg/puller/puller.go
@@ -15,9 +15,9 @@ import (
"sync"
"time"

"github.com/ethersphere/bee/pkg/intervalstore"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/puller/intervalstore"
"github.com/ethersphere/bee/pkg/pullsync"
"github.com/ethersphere/bee/pkg/rate"
"github.com/ethersphere/bee/pkg/storage"
@@ -95,7 +95,7 @@ func New(
blockLister: blockLister,
rate: rate.New(DefaultHistRateWindow),
cancel: func() { /* Noop, since the context is initialized in the Start(). */ },
limiter: ratelimit.NewLimiter(ratelimit.Every(time.Second/2), int(swarm.MaxBins)), // allows for 2 syncs per second, max bins bursts
limiter: ratelimit.NewLimiter(ratelimit.Every(time.Second/4), int(swarm.MaxBins)), // allows for 4 syncs per second, max bins bursts
}

return p
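
For context on the limiter change above: ratelimit here appears to alias golang.org/x/time/rate, so Every(time.Second/4) refills one token every 250 ms (up to four waits per second) on top of an initial burst of swarm.MaxBins tokens. A minimal, self-contained sketch of that behaviour (maxBins is a stand-in constant, not the Bee value):

package main

import (
	"context"
	"fmt"
	"time"

	ratelimit "golang.org/x/time/rate"
)

func main() {
	const maxBins = 32 // stand-in for int(swarm.MaxBins)

	// One token every 250ms, with a burst bucket of maxBins tokens.
	limiter := ratelimit.NewLimiter(ratelimit.Every(time.Second/4), maxBins)

	start := time.Now()
	for i := 0; i < maxBins+4; i++ {
		_ = limiter.Wait(context.Background()) // the first maxBins waits return immediately
	}
	fmt.Println("elapsed:", time.Since(start)) // roughly one second for the four post-burst waits
}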
@@ -129,25 +129,28 @@ func (p *Puller) manage(ctx context.Context) {
p.syncPeersMtx.Lock()
defer p.syncPeersMtx.Unlock()

// peersDisconnected is used to mark and prune peers that are no longer connected.
peersDisconnected := make(map[string]*syncPeer)
for _, peer := range p.syncPeers {
peersDisconnected[peer.address.ByteString()] = peer
}

newRadius := p.radius.StorageRadius()

// reset all intervals below the new radius to resync:
// 1. previously evicted chunks
// 2. previously ignored chunks due to a higher radius
if newRadius < prevRadius {
for _, peer := range p.syncPeers {
p.disconnectPeer(peer.address)
}
err := p.resetIntervals(prevRadius)
if err != nil {
p.logger.Debug("reset lower sync radius failed", "error", err)
}
}
prevRadius = newRadius

// peersDisconnected is used to mark and prune peers that are no longer connected.
peersDisconnected := make(map[string]*syncPeer)
for _, peer := range p.syncPeers {
peersDisconnected[peer.address.ByteString()] = peer
}

_ = p.topology.EachConnectedPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) {
if _, ok := p.syncPeers[addr.ByteString()]; !ok {
p.syncPeers[addr.ByteString()] = newSyncPeer(addr, p.bins, po)
@@ -187,7 +190,7 @@ func (p *Puller) disconnectPeer(addr swarm.Address) {
loggerV2.Debug("disconnecting peer", "peer_address", addr)
if peer, ok := p.syncPeers[addr.ByteString()]; ok {
peer.mtx.Lock()
peer.gone()
peer.stop()
peer.mtx.Unlock()
}
delete(p.syncPeers, addr.ByteString())
@@ -229,7 +232,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin

if storedEpoch != epoch {
// cancel all bins
peer.gone()
peer.stop()

p.logger.Debug("peer epoch change detected, resetting past synced intervals", "stored_epoch", storedEpoch, "new_epoch", epoch, "peer_address", peer.address)

@@ -285,81 +288,97 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin

// syncPeerBin will start historical and live syncing for the peer for a particular bin.
// Must be called under syncPeer lock.
func (p *Puller) syncPeerBin(ctx context.Context, peer *syncPeer, bin uint8, cur uint64) {
binCtx, cancel := context.WithCancel(ctx)
peer.setBinCancel(cancel, bin)
peer.wg.Add(1)
p.wg.Add(1)
go p.syncWorker(binCtx, peer.address, bin, cur, peer.wg.Done)
}

func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64, done func()) {
func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint8, cursor uint64) {
loggerV2 := p.logger.V(2).Register()

p.metrics.SyncWorkerCounter.Inc()
defer p.wg.Done()
defer p.metrics.SyncWorkerDoneCounter.Inc()
defer done()
ctx, cancel := context.WithCancel(parentCtx)
peer.setBinCancel(cancel, bin)

loggerV2.Debug("syncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur)
sync := func(isHistorical bool, address swarm.Address, start uint64, bin uint8, done func()) {
p.metrics.SyncWorkerCounter.Inc()

for {
defer p.wg.Done()
defer p.metrics.SyncWorkerDoneCounter.Inc()
defer done()

s, _, _, err := p.nextPeerInterval(peer, bin)
if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting")
return
}

// rate limit historical syncing
if s <= cur {
_ = p.limiter.Wait(ctx)
}
var (
cursor = start
err error
)

select {
case <-ctx.Done():
loggerV2.Debug("syncWorker context cancelled", "peer_address", peer, "bin", bin)
return
default:
}
for {
if isHistorical {
start, err = p.nextPeerInterval(address, bin)
if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting")
return
}

// historical sync has caught up to the cursor, exit
if start > cursor {
return
}
// rate limit historical syncing
_ = p.limiter.Wait(ctx)
}

p.metrics.SyncWorkerIterCounter.Inc()
select {
case <-ctx.Done():
loggerV2.Debug("syncWorker context cancelled", "peer_address", address, "bin", bin)
return
default:
}

syncStart := time.Now()
top, count, err := p.syncer.Sync(ctx, peer, bin, s)
p.metrics.SyncWorkerIterCounter.Inc()

if top == math.MaxUint64 {
p.metrics.MaxUintErrCounter.Inc()
p.logger.Error(nil, "syncWorker max uint64 encountered, quitting", "peer_address", peer, "bin", bin, "from", s, "topmost", top)
return
}
syncStart := time.Now()
top, count, err := p.syncer.Sync(ctx, address, bin, start)

if top <= cur {
p.metrics.SyncedCounter.WithLabelValues("historical").Add(float64(count))
p.rate.Add(count)
} else {
p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count))
}
if top == math.MaxUint64 {
p.metrics.MaxUintErrCounter.Inc()
p.logger.Error(nil, "syncWorker max uint64 encountered, quitting", "peer_address", address, "bin", bin, "from", start, "topmost", top)
return
}

if top >= s {
if err := p.addPeerInterval(peer, bin, s, top); err != nil {
if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
p.logger.Error(err, "syncWorker could not persist interval for peer, quitting", "peer_address", peer)
return
if errors.Is(err, p2p.ErrPeerNotFound) {
p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top)
return
}
loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top)
}
loggerV2.Debug("syncWorker pulled", "bin", bin, "start", s, "topmost", top, "duration", time.Since(syncStart), "peer_address", peer)
}

if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
if errors.Is(err, p2p.ErrPeerNotFound) {
p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top)
return
if isHistorical {
p.metrics.SyncedCounter.WithLabelValues("historical").Add(float64(count))
p.rate.Add(count)
} else {
p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count))
}

// pulled at least one chunk
if top >= start {
if err := p.addPeerInterval(address, bin, start, top); err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
p.logger.Error(err, "syncWorker could not persist interval for peer, quitting", "peer_address", address)
return
}
loggerV2.Debug("syncWorker pulled", "bin", bin, "start", start, "topmost", top, "isHistorical", isHistorical, "duration", time.Since(syncStart), "peer_address", address)
start = top + 1
}
p.logger.Debug("syncWorker interval failed", "error", err, "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top)
}
}

if cursor > 0 {
peer.wg.Add(1)
p.wg.Add(1)
go sync(true, peer.address, cursor, bin, peer.wg.Done)
}

peer.wg.Add(1)
p.wg.Add(1)
go sync(false, peer.address, cursor+1, bin, peer.wg.Done)
}
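
Since the worker logic above is heavily restructured, here is a stripped-down sketch of the control flow the new syncPeerBin sets up (hypothetical helper names, not the Bee API): a historical goroutine drains offsets up to the cursor snapshot, a live goroutine follows from cursor+1 onward, and both stop once the bin's context is cancelled.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pullRange stands in for pullsync.Sync: it returns the topmost offset it reached.
func pullRange(ctx context.Context, from uint64) (uint64, error) {
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-time.After(10 * time.Millisecond):
		return from + 9, nil // pretend ten offsets were pulled
	}
}

// syncBin mirrors the shape of syncPeerBin: one historical worker bounded by the
// cursor snapshot, one live worker starting right after it.
func syncBin(ctx context.Context, cursor uint64, wg *sync.WaitGroup) {
	run := func(isHistorical bool, start uint64) {
		defer wg.Done()
		for {
			if isHistorical && start > cursor {
				return // historical sync has caught up to the cursor
			}
			top, err := pullRange(ctx, start)
			if err != nil {
				return // context cancelled
			}
			fmt.Printf("historical=%v pulled [%d, %d]\n", isHistorical, start, top)
			start = top + 1
		}
	}
	if cursor > 0 {
		wg.Add(1)
		go run(true, 0) // everything up to and including the cursor
	}
	wg.Add(1)
	go run(false, cursor+1) // live updates after the cursor
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	syncBin(ctx, 42, &wg)
	time.Sleep(120 * time.Millisecond)
	cancel() // plays the role of the per-bin cancel stored via setBinCancel
	wg.Wait()
}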

func (p *Puller) Close() error {
@@ -443,17 +462,17 @@ func (p *Puller) resetIntervals(upto uint8) (err error) {
return
}

func (p *Puller) nextPeerInterval(peer swarm.Address, bin uint8) (start, end uint64, empty bool, err error) {
func (p *Puller) nextPeerInterval(peer swarm.Address, bin uint8) (uint64, error) {
p.intervalMtx.Lock()
defer p.intervalMtx.Unlock()

i, err := p.getOrCreateInterval(peer, bin)
if err != nil {
return 0, 0, false, err
return 0, err
}

start, end, empty = i.Next(0)
return start, end, empty, nil
start, _, _ := i.Next(0)
return start, nil
}

// Must be called under lock.
@@ -506,7 +525,7 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {
}

// called when peer disconnects or on shutdown, cleans up ongoing sync operations
func (p *syncPeer) gone() {
func (p *syncPeer) stop() {
for bin, c := range p.binCancelFuncs {
c()
delete(p.binCancelFuncs, bin)
11 changes: 6 additions & 5 deletions pkg/puller/puller_test.go
@@ -11,9 +11,9 @@ import (
"testing"
"time"

"github.com/ethersphere/bee/pkg/intervalstore"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/puller"
"github.com/ethersphere/bee/pkg/puller/intervalstore"
mockps "github.com/ethersphere/bee/pkg/pullsync/mock"
"github.com/ethersphere/bee/pkg/spinlock"
"github.com/ethersphere/bee/pkg/statestore/mock"
@@ -438,11 +438,12 @@ func TestContinueSyncing(t *testing.T) {

time.Sleep(100 * time.Millisecond)
kad.Trigger()
time.Sleep(time.Second)

calls := len(pullsync.SyncCalls(addr))
if calls != 1 {
t.Fatalf("unexpected amount of calls, got %d", calls)
err := spinlock.Wait(time.Second, func() bool {
return len(pullsync.SyncCalls(addr)) == 1
})
if err != nil {
t.Fatal(err)
}
}
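
The test now polls for the expected call count instead of sleeping for a fixed second. A poll-until helper in the same spirit as spinlock.Wait might look like this (a sketch under assumed semantics, not Bee's implementation):

package spinwait

import (
	"errors"
	"time"
)

// ErrTimedOut is returned when the condition is not met before the deadline.
var ErrTimedOut = errors.New("spinwait: condition not met before deadline")

// Wait re-checks cond every 10ms and gives up once timeout has elapsed.
func Wait(timeout time.Duration, cond func() bool) error {
	deadline := time.Now().Add(timeout)
	for !cond() {
		if time.Now().After(deadline) {
			return ErrTimedOut
		}
		time.Sleep(10 * time.Millisecond)
	}
	return nil
}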
