Skip to content

Commit

Permalink
feat: redundancy retrieve api (#4529)
Browse files Browse the repository at this point in the history
Co-authored-by: nugaon <[email protected]>
Co-authored-by: Anatol <[email protected]>
Co-authored-by: dysordys <[email protected]>
Co-authored-by: Gyorgy Barabas <[email protected]>
  • Loading branch information
5 people authored Feb 7, 2024
1 parent 5cc9eef commit e3d4d5a
Show file tree
Hide file tree
Showing 25 changed files with 1,193 additions and 255 deletions.
2 changes: 1 addition & 1 deletion openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ components:
required: false
description: >
Specify the retrieve strategy on redundant data.
The mumbers stand for NONE, DATA, PROX and RACE, respectively.
The numbers stand for NONE, DATA, PROX and RACE, respectively.
Strategy NONE means no prefetching takes place.
Strategy DATA means only data chunks are prefetched.
Strategy PROX means only chunks that are close to the node are prefetched.
Expand Down
22 changes: 2 additions & 20 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ const (
SwarmRedundancyLevelHeader = "Swarm-Redundancy-Level"
SwarmRedundancyStrategyHeader = "Swarm-Redundancy-Strategy"
SwarmRedundancyFallbackModeHeader = "Swarm-Redundancy-Fallback-Mode"
SwarmChunkRetrievalTimeoutHeader = "Swarm-Chunk-Retrieval-Timeout-Level"
SwarmChunkRetrievalTimeoutHeader = "Swarm-Chunk-Retrieval-Timeout"
SwarmLookAheadBufferSizeHeader = "Swarm-Lookahead-Buffer-Size"

ImmutableHeader = "Immutable"
GasPriceHeader = "Gas-Price"
Expand All @@ -99,18 +100,6 @@ const (
OriginHeader = "Origin"
)

// The size of buffer used for prefetching content with Langos.
// Warning: This value influences the number of chunk requests and chunker join goroutines
// per file request.
// Recommended value is 8 or 16 times the io.Copy default buffer value which is 32kB, depending
// on the file size. Use lookaheadBufferSize() to get the correct buffer size for the request.
const (
smallFileBufferSize = 8 * 32 * 1024
largeFileBufferSize = 16 * 32 * 1024

largeBufferFilesizeThreshold = 10 * 1000000 // ten megs
)

const (
multiPartFormData = "multipart/form-data"
contentTypeTar = "application/x-tar"
Expand Down Expand Up @@ -615,13 +604,6 @@ func (s *Service) gasConfigMiddleware(handlerName string) func(h http.Handler) h
}
}

func lookaheadBufferSize(size int64) int {
if size <= largeBufferFilesizeThreshold {
return smallFileBufferSize
}
return largeFileBufferSize
}

// corsHandler sets CORS headers to HTTP response if allowed origins are configured.
func (s *Service) corsHandler(h http.Handler) http.Handler {
allowedHeaders := []string{
Expand Down
51 changes: 44 additions & 7 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ import (
"github.com/gorilla/mux"
)

// The size of buffer used for prefetching content with Langos when not using erasure coding
// Warning: This value influences the number of chunk requests and chunker join goroutines
// per file request.
// Recommended value is 8 or 16 times the io.Copy default buffer value which is 32kB, depending
// on the file size. Use lookaheadBufferSize() to get the correct buffer size for the request.
const (
smallFileBufferSize = 8 * 32 * 1024
largeFileBufferSize = 16 * 32 * 1024

largeBufferFilesizeThreshold = 10 * 1000000 // ten megs
)

func lookaheadBufferSize(size int64) int {
if size <= largeBufferFilesizeThreshold {
return smallFileBufferSize
}
return largeFileBufferSize
}

func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.logger.WithName("post_bzz").Build())

Expand Down Expand Up @@ -272,8 +291,13 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
loggerV1 := logger.V(1).Build()

headers := struct {
Cache *bool `map:"Swarm-Cache"`
Cache *bool `map:"Swarm-Cache"`
Strategy getter.Strategy `map:"Swarm-Redundancy-Strategy"`
FallbackMode bool `map:"Swarm-Redundancy-Fallback-Mode"`
ChunkRetrievalTimeout string `map:"Swarm-Chunk-Retrieval-Timeout"`
LookaheadBufferSize *string `map:"Swarm-Lookahead-Buffer-Size"`
}{}

if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
Expand All @@ -282,10 +306,12 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
if headers.Cache != nil {
cache = *headers.Cache
}

ls := loadsave.NewReadonly(s.storer.Download(cache))
feedDereferenced := false

ctx := r.Context()
ctx = getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, getter.DefaultStrategyTimeout.String())

FETCH:
// read manifest entry
Expand Down Expand Up @@ -366,7 +392,6 @@ FETCH:
jsonhttp.NotFound(w, "address not found or incorrect")
return
}

me, err := m.Lookup(ctx, pathVar)
if err != nil {
loggerV1.Debug("bzz download: invalid path", "address", address, "path", pathVar, "error", err)
Expand Down Expand Up @@ -459,8 +484,10 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
Cache *bool `map:"Swarm-Cache"`
Strategy getter.Strategy `map:"Swarm-Redundancy-Strategy"`
FallbackMode bool `map:"Swarm-Redundancy-Fallback-Mode"`
ChunkRetrievalTimeout time.Duration `map:"Swarm-Chunk-Retrieval-Timeout"`
ChunkRetrievalTimeout string `map:"Swarm-Chunk-Retrieval-Timeout"`
LookaheadBufferSize *string `map:"Swarm-Lookahead-Buffer-Size"`
}{}

if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
Expand All @@ -471,9 +498,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
}

ctx := r.Context()
ctx = getter.SetStrategy(ctx, headers.Strategy)
ctx = getter.SetStrict(ctx, headers.FallbackMode)
ctx = getter.SetFetchTimeout(ctx, headers.ChunkRetrievalTimeout)
ctx = getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, getter.DefaultStrategyTimeout.String())
reader, l, err := joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference)
if err != nil {
if errors.Is(err, storage.ErrNotFound) || errors.Is(err, topology.ErrNotFound) {
Expand All @@ -497,7 +522,19 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
}
w.Header().Set(ContentLengthHeader, strconv.FormatInt(l, 10))
w.Header().Set("Access-Control-Expose-Headers", ContentDispositionHeader)
http.ServeContent(w, r, "", time.Now(), langos.NewBufferedLangos(reader, lookaheadBufferSize(l)))
bufSize := int64(lookaheadBufferSize(l))
if headers.LookaheadBufferSize != nil {
bufSize, err = strconv.ParseInt(*headers.LookaheadBufferSize, 10, 64)
if err != nil {
logger.Debug("parsing lookahead buffer size", "error", err)
bufSize = 0
}
}
if bufSize > 0 {
http.ServeContent(w, r, "", time.Now(), langos.NewBufferedLangos(reader, int(bufSize)))
return
}
http.ServeContent(w, r, "", time.Now(), reader)
}

// manifestMetadataLoad returns the value for a key stored in the metadata of
Expand Down
Loading

0 comments on commit e3d4d5a

Please sign in to comment.