Skip to content

Commit

Permalink
perf(pin_api): fetch chunks in parallel (#4428)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Oct 26, 2023
1 parent 4a424d5 commit d324646
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
49 changes: 41 additions & 8 deletions pkg/api/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ package api
import (
"errors"
"net/http"
"sync"

"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/mux"
"golang.org/x/sync/semaphore"
)

// pinRootHash pins root hash of given reference. This method is idempotent.
Expand Down Expand Up @@ -49,24 +52,54 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) {
getter := s.storer.Download(true)
traverser := traversal.New(getter)

sem := semaphore.NewWeighted(100)
var errTraverse error
var mtxErr sync.Mutex
var wg sync.WaitGroup

err = traverser.Traverse(
r.Context(),
paths.Reference,
func(address swarm.Address) error {
chunk, err := getter.Get(r.Context(), address)
if err != nil {
return err
mtxErr.Lock()
if errTraverse != nil {
mtxErr.Unlock()
return errTraverse
}
err = putter.Put(r.Context(), chunk)
if err != nil {
mtxErr.Unlock()
if err := sem.Acquire(r.Context(), 1); err != nil {
return err
}
wg.Add(1)
go func() {
var err error
defer func() {
sem.Release(1)
wg.Done()
if err != nil {
mtxErr.Lock()
errTraverse = errors.Join(errTraverse, err)
mtxErr.Unlock()
}
}()
chunk, err := getter.Get(r.Context(), address)
if err != nil {
return
}
err = putter.Put(r.Context(), chunk)
}()
return nil
},
)
if err != nil {
logger.Debug("pin collection failed", "error", errors.Join(err, putter.Cleanup()))
logger.Error(nil, "pin collection failed")

wg.Wait()

if err := errors.Join(err, errTraverse); err != nil {
logger.Error(errors.Join(err, putter.Cleanup()), "pin collection failed")
if errors.Is(err, storage.ErrNotFound) {
jsonhttp.NotFound(w, "pin collection failed")
return
}
jsonhttp.InternalServerError(w, "pin collection failed")
return
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func TestPinHandlers(t *testing.T) {
checkPinHandlers(t, client, rootHash, true)
})

t.Run("bytes missing", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodPost, "/pins/"+swarm.RandAddress(t).String(), http.StatusNotFound)
})

t.Run("bzz", func(t *testing.T) {
tarReader := tarFiles(t, []f{{
data: []byte("<h1>Swarm"),
Expand Down

0 comments on commit d324646

Please sign in to comment.