Skip to content

Commit

Permalink
fix(puller): start syncing of peers in parallel (#4443)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Nov 6, 2023
1 parent ed4481f commit 48a603c
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 21 deletions.
11 changes: 8 additions & 3 deletions cmd/bee/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,6 @@ func dbNukeCmd(cmd *cobra.Command) {
logger.Error(err, "getting sleep value failed")
}

defer func() { time.Sleep(d) }()

dataDir, err := cmd.Flags().GetString(optionNameDataDir)
if err != nil {
return fmt.Errorf("get data-dir: %w", err)
Expand Down Expand Up @@ -684,6 +682,8 @@ func dbNukeCmd(cmd *cobra.Command) {
return nil
}

logger.Info("nuking statestore...")

forgetStamps, err := cmd.Flags().GetBool(optionNameForgetStamps)
if err != nil {
return fmt.Errorf("get forget stamps: %w", err)
Expand All @@ -709,6 +709,11 @@ func dbNukeCmd(cmd *cobra.Command) {
return fmt.Errorf("remove stamperstore: %w", err)
}
}

logger.Info("nuke finished")

time.Sleep(d)

return nil
}}
c.Flags().String(optionNameDataDir, "", "data directory")
Expand Down Expand Up @@ -740,7 +745,7 @@ func removeContent(path string) error {
return err
}
}
return os.Remove(path)
return nil
}

func MarshalChunkToBinary(c swarm.Chunk) ([]byte, error) {
Expand Down
25 changes: 16 additions & 9 deletions pkg/puller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
)

// metrics holds the prometheus collectors exported by the puller package.
type metrics struct {
	SyncWorkerIterCounter prometheus.Counter    // number of syncing iterations performed
	SyncWorkerCounter     prometheus.Counter    // number of syncing jobs started
	SyncedCounter         prometheus.CounterVec // synced chunks, labelled by type ("historical"/"live")
	SyncWorkerDoneCounter prometheus.Counter    // number of syncing jobs that finished
	SyncWorkerErrCounter  prometheus.Counter    // number of syncing errors
	MaxUintErrCounter     prometheus.Counter    // how many times maxuint was received as topmost
}

func newMetrics() metrics {
Expand All @@ -25,25 +26,31 @@ func newMetrics() metrics {
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "worker_iterations",
Help: "Total history worker iterations.",
Help: "Total worker iterations.",
}),
SyncWorkerCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "worker",
Help: "Total history active worker jobs.",
Help: "Total active worker jobs.",
}),
SyncedCounter: *prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "synced_chunks",
Help: "Total synced chunks.",
}, []string{"type"}),
SyncWorkerDoneCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "worker_done",
Help: "Total history worker jobs done.",
Help: "Total worker jobs done.",
}),
SyncWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "worker_errors",
Help: "Total history worker errors.",
Help: "Total worker errors.",
}),
MaxUintErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Expand Down
27 changes: 18 additions & 9 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,24 @@ func (p *Puller) disconnectPeer(addr swarm.Address) {
// recalcPeers starts or stops the syncing process for each known peer, per
// bin, depending on the current sync radius. Every peer is synced in its own
// goroutine so that a slow peer does not delay the others; the call blocks
// until all per-peer syncs have returned.
// Must be called under lock.
func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) {
	var inflight sync.WaitGroup
	for _, peer := range p.syncPeers {
		peer := peer // capture a per-iteration value for the goroutine
		inflight.Add(1)
		// Also tracked by the puller-wide group so shutdown waits for us.
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			defer inflight.Done()
			if err := p.syncPeer(ctx, peer, storageRadius); err != nil {
				p.logger.Debug("sync peer failed", "peer_address", peer.address, "error", err)
			}
		}()
	}
	inflight.Wait()
}

// Must be called under syncPeer lock.
func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error {
peer.mtx.Lock()
defer peer.mtx.Unlock()

if peer.cursors == nil {
cursors, epoch, err := p.syncer.GetCursors(ctx, peer.address)
Expand Down Expand Up @@ -322,7 +327,10 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8,
}

if top <= cur {
p.metrics.SyncedCounter.WithLabelValues("historical").Add(float64(count))
p.rate.Add(count)
} else {
p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count))
}

if top >= s {
Expand Down Expand Up @@ -490,8 +498,9 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {

// called when peer disconnects or on shutdown, cleans up ongoing sync operations
func (p *syncPeer) gone() {
for _, c := range p.binCancelFuncs {
for bin, c := range p.binCancelFuncs {
c()
delete(p.binCancelFuncs, bin)
}
p.wg.Wait()
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (db *DB) startReserveWorkers(
db.logger.Error(err, "reserve set radius")
}

db.metrics.StorageRadius.Set(float64(db.reserve.Size()))

// syncing can now begin now that the reserver worker is running
db.syncer.Start(ctx)

Expand Down

0 comments on commit 48a603c

Please sign in to comment.