From 48a603c50db6fd3e87f1d95e7e99433be2feaba3 Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Mon, 6 Nov 2023 18:15:07 +0300
Subject: [PATCH] fix(puller): start syncing of peers in parallel (#4443)

---
 cmd/bee/cmd/db.go     | 11 ++++++++---
 pkg/puller/metrics.go | 25 ++++++++++++++++---------
 pkg/puller/puller.go  | 27 ++++++++++++++++++---------
 pkg/storer/reserve.go |  2 ++
 4 files changed, 44 insertions(+), 21 deletions(-)

diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go
index 3e6e910105f..e2887090664 100644
--- a/cmd/bee/cmd/db.go
+++ b/cmd/bee/cmd/db.go
@@ -642,8 +642,6 @@ func dbNukeCmd(cmd *cobra.Command) {
 				logger.Error(err, "getting sleep value failed")
 			}
-			defer func() { time.Sleep(d) }()
-
 			dataDir, err := cmd.Flags().GetString(optionNameDataDir)
 			if err != nil {
 				return fmt.Errorf("get data-dir: %w", err)
 			}
@@ -684,6 +682,8 @@ func dbNukeCmd(cmd *cobra.Command) {
 				return nil
 			}
+			logger.Info("nuking statestore...")
+
 			forgetStamps, err := cmd.Flags().GetBool(optionNameForgetStamps)
 			if err != nil {
 				return fmt.Errorf("get forget stamps: %w", err)
 			}
@@ -709,6 +709,11 @@ func dbNukeCmd(cmd *cobra.Command) {
 					return fmt.Errorf("remove stamperstore: %w", err)
 				}
 			}
+
+			logger.Info("nuke finished")
+
+			time.Sleep(d)
+
 			return nil
 		}}
 	c.Flags().String(optionNameDataDir, "", "data directory")
@@ -740,7 +745,7 @@ func removeContent(path string) error {
 			return err
 		}
 	}
-	return os.Remove(path)
+	return nil
 }
 
 func MarshalChunkToBinary(c swarm.Chunk) ([]byte, error) {
diff --git a/pkg/puller/metrics.go b/pkg/puller/metrics.go
index 7f81271feda..715a285be63 100644
--- a/pkg/puller/metrics.go
+++ b/pkg/puller/metrics.go
@@ -10,11 +10,12 @@ import (
 )
 
 type metrics struct {
-	SyncWorkerIterCounter prometheus.Counter // counts the number of syncing iterations
-	SyncWorkerCounter     prometheus.Counter // count number of syncing jobs
-	SyncWorkerDoneCounter prometheus.Counter // count number of finished syncing jobs
-	SyncWorkerErrCounter  prometheus.Counter // count number of errors
-	MaxUintErrCounter     prometheus.Counter // how many times we got maxuint as topmost
+	SyncWorkerIterCounter prometheus.Counter    // counts the number of syncing iterations
+	SyncWorkerCounter     prometheus.Counter    // count number of syncing jobs
+	SyncedCounter         prometheus.CounterVec // number of synced chunks
+	SyncWorkerDoneCounter prometheus.Counter    // count number of finished syncing jobs
+	SyncWorkerErrCounter  prometheus.Counter    // count number of errors
+	MaxUintErrCounter     prometheus.Counter    // how many times we got maxuint as topmost
 }
 
 func newMetrics() metrics {
@@ -25,25 +26,31 @@
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
 			Name:      "worker_iterations",
-			Help:      "Total history worker iterations.",
+			Help:      "Total worker iterations.",
 		}),
 		SyncWorkerCounter: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
 			Name:      "worker",
-			Help:      "Total history active worker jobs.",
+			Help:      "Total active worker jobs.",
 		}),
+		SyncedCounter: *prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "synced_chunks",
+			Help:      "Total synced chunks.",
+		}, []string{"type"}),
 		SyncWorkerDoneCounter: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
 			Name:      "worker_done",
-			Help:      "Total history worker jobs done.",
+			Help:      "Total worker jobs done.",
 		}),
 		SyncWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
 			Name:      "worker_errors",
-			Help:      "Total history worker errors.",
+			Help:      "Total worker errors.",
 		}),
 		MaxUintErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go
index c4f6933fa97..dc0965aa868 100644
--- a/pkg/puller/puller.go
+++ b/pkg/puller/puller.go
@@ -192,19 +192,24 @@ func (p *Puller) disconnectPeer(addr swarm.Address) {
 // recalcPeers starts or stops syncing process for peers per bin depending on the current sync radius.
 // Must be called under lock.
 func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) {
-	var errs error
+	var wg sync.WaitGroup
 	for _, peer := range p.syncPeers {
-		peer.mtx.Lock()
-		errs = errors.Join(p.syncPeer(ctx, peer, storageRadius))
-		peer.mtx.Unlock()
-	}
-	if errs != nil {
-		p.logger.Debug("recalculation failed", "error", errs)
+		wg.Add(1)
+		p.wg.Add(1)
+		go func(peer *syncPeer) {
+			defer p.wg.Done()
+			defer wg.Done()
+			if err := p.syncPeer(ctx, peer, storageRadius); err != nil {
+				p.logger.Debug("sync peer failed", "peer_address", peer.address, "error", err)
+			}
+		}(peer)
 	}
+	wg.Wait()
 }
 
-// Must be called under syncPeer lock.
 func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error {
+	peer.mtx.Lock()
+	defer peer.mtx.Unlock()
 
 	if peer.cursors == nil {
 		cursors, epoch, err := p.syncer.GetCursors(ctx, peer.address)
@@ -322,7 +327,10 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8,
 		}
 
 		if top <= cur {
+			p.metrics.SyncedCounter.WithLabelValues("historical").Add(float64(count))
 			p.rate.Add(count)
+		} else {
+			p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count))
 		}
 
 		if top >= s {
@@ -490,8 +498,9 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {
 
 // called when peer disconnects or on shutdown, cleans up ongoing sync operations
 func (p *syncPeer) gone() {
-	for _, c := range p.binCancelFuncs {
+	for bin, c := range p.binCancelFuncs {
 		c()
+		delete(p.binCancelFuncs, bin)
 	}
 	p.wg.Wait()
 }
diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go
index 01762ae8bd2..f02e93e8c21 100644
--- a/pkg/storer/reserve.go
+++ b/pkg/storer/reserve.go
@@ -84,6 +84,8 @@ func (db *DB) startReserveWorkers(
 		db.logger.Error(err, "reserve set radius")
 	}
 
+	db.metrics.StorageRadius.Set(float64(db.reserve.Size()))
+
 	// syncing can now begin now that the reserver worker is running
 	db.syncer.Start(ctx)