From d324646c8ea38101bfb02b0e8dc67e27b01913d7 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 26 Oct 2023 16:32:07 +0300 Subject: [PATCH] perf(pin_api): fetch chunks in parallel (#4428) --- pkg/api/pin.go | 49 +++++++++++++++++++++++++++++++++++++-------- pkg/api/pin_test.go | 4 ++++ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/pkg/api/pin.go b/pkg/api/pin.go index 5d82cbaea47..a98af277583 100644 --- a/pkg/api/pin.go +++ b/pkg/api/pin.go @@ -7,11 +7,14 @@ package api import ( "errors" "net/http" + "sync" "github.com/ethersphere/bee/pkg/jsonhttp" + "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/traversal" "github.com/gorilla/mux" + "golang.org/x/sync/semaphore" ) // pinRootHash pins root hash of given reference. This method is idempotent. @@ -49,24 +52,54 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { getter := s.storer.Download(true) traverser := traversal.New(getter) + sem := semaphore.NewWeighted(100) + var errTraverse error + var mtxErr sync.Mutex + var wg sync.WaitGroup + err = traverser.Traverse( r.Context(), paths.Reference, func(address swarm.Address) error { - chunk, err := getter.Get(r.Context(), address) - if err != nil { - return err + mtxErr.Lock() + if errTraverse != nil { + mtxErr.Unlock() + return errTraverse } - err = putter.Put(r.Context(), chunk) - if err != nil { + mtxErr.Unlock() + if err := sem.Acquire(r.Context(), 1); err != nil { return err } + wg.Add(1) + go func() { + var err error + defer func() { + sem.Release(1) + wg.Done() + if err != nil { + mtxErr.Lock() + errTraverse = errors.Join(errTraverse, err) + mtxErr.Unlock() + } + }() + chunk, err := getter.Get(r.Context(), address) + if err != nil { + return + } + err = putter.Put(r.Context(), chunk) + }() return nil }, ) - if err != nil { - logger.Debug("pin collection failed", "error", errors.Join(err, putter.Cleanup())) - logger.Error(nil, "pin collection failed") + + wg.Wait() + + if err := errors.Join(err, errTraverse); err != nil { + logger.Error(errors.Join(err, putter.Cleanup()), "pin collection failed") + if errors.Is(err, storage.ErrNotFound) { + jsonhttp.NotFound(w, "pin collection failed") + return + } jsonhttp.InternalServerError(w, "pin collection failed") return } diff --git a/pkg/api/pin_test.go b/pkg/api/pin_test.go index 74ceafdcc5e..751b93b1d65 100644 --- a/pkg/api/pin_test.go +++ b/pkg/api/pin_test.go @@ -95,6 +95,10 @@ func TestPinHandlers(t *testing.T) { checkPinHandlers(t, client, rootHash, true) }) + t.Run("bytes missing", func(t *testing.T) { + jsonhttptest.Request(t, client, http.MethodPost, "/pins/"+swarm.RandAddress(t).String(), http.StatusNotFound) + }) + t.Run("bzz", func(t *testing.T) { tarReader := tarFiles(t, []f{{ data: []byte("