diff --git a/go.mod b/go.mod index 3e7bf2f4e3d..5f38b82e566 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/ethersphere/go-price-oracle-abi v0.1.0 github.com/ethersphere/go-storage-incentives-abi v0.6.0 github.com/ethersphere/go-sw3-abi v0.4.0 + github.com/ethersphere/langos v1.0.0 github.com/go-playground/validator/v10 v10.11.1 github.com/gogo/protobuf v1.3.2 github.com/google/go-cmp v0.5.9 diff --git a/go.sum b/go.sum index c08d1ef71bb..0afd81c3918 100644 --- a/go.sum +++ b/go.sum @@ -244,6 +244,8 @@ github.com/ethersphere/go-storage-incentives-abi v0.6.0 h1:lfGViU/wJg/CyXlntNvTQ github.com/ethersphere/go-storage-incentives-abi v0.6.0/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= github.com/ethersphere/go-sw3-abi v0.4.0 h1:T3ANY+ktWrPAwe2U0tZi+DILpkHzto5ym/XwV/Bbz8g= github.com/ethersphere/go-sw3-abi v0.4.0/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU= +github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc= +github.com/ethersphere/langos v1.0.0/go.mod h1:dlcN2j4O8sQ+BlCaxeBu43bgr4RQ+inJ+pHwLeZg5Tw= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlKLDt+S+6hbjVMEW6RGQ7aUf7c= github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= diff --git a/pkg/api/api.go b/pkg/api/api.go index 19b0e718e70..470174c68d4 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -84,6 +84,7 @@ const ( SwarmRedundancyStrategyHeader = "Swarm-Redundancy-Strategy" SwarmRedundancyFallbackModeHeader = "Swarm-Redundancy-Fallback-Mode" SwarmChunkRetrievalTimeoutHeader = "Swarm-Chunk-Retrieval-Timeout" + SwarmLookAheadBufferSizeHeader = "Swarm-Lookahead-Buffer-Size" ImmutableHeader = "Immutable" GasPriceHeader = "Gas-Price" diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index fc5b5ec56be..f94de54ae4b 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -31,9 +31,29 @@ import ( "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/tracing" + "github.com/ethersphere/langos" "github.com/gorilla/mux" ) +// The size of buffer used for prefetching content with Langos when not using erasure coding +// Warning: This value influences the number of chunk requests and chunker join goroutines +// per file request. +// Recommended value is 8 or 16 times the io.Copy default buffer value which is 32kB, depending +// on the file size. Use lookaheadBufferSize() to get the correct buffer size for the request. +const ( + smallFileBufferSize = 8 * 32 * 1024 + largeFileBufferSize = 16 * 32 * 1024 + + largeBufferFilesizeThreshold = 10 * 1000000 // ten megs +) + +func lookaheadBufferSize(size int64) int { + if size <= largeBufferFilesizeThreshold { + return smallFileBufferSize + } + return largeFileBufferSize +} + func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) { logger := tracing.NewLoggerWithTraceID(r.Context(), s.logger.WithName("post_bzz").Build()) @@ -275,6 +295,7 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV Strategy getter.Strategy `map:"Swarm-Redundancy-Strategy"` FallbackMode bool `map:"Swarm-Redundancy-Fallback-Mode"` ChunkRetrievalTimeout string `map:"Swarm-Chunk-Retrieval-Timeout"` + LookaheadBufferSize string `map:"Swarm-Lookahead-Buffer-Size"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { @@ -464,6 +485,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h Strategy getter.Strategy `map:"Swarm-Redundancy-Strategy"` FallbackMode bool `map:"Swarm-Redundancy-Fallback-Mode"` ChunkRetrievalTimeout string `map:"Swarm-Chunk-Retrieval-Timeout"` + LookaheadBufferSize *string `map:"Swarm-Lookahead-Buffer-Size"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { @@ -500,9 +522,18 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h } w.Header().Set(ContentLengthHeader, strconv.FormatInt(l, 10)) w.Header().Set("Access-Control-Expose-Headers", ContentDispositionHeader) + bufSize := int64(lookaheadBufferSize(l)) + if headers.LookaheadBufferSize != nil { + bufSize, err = strconv.ParseInt(*headers.LookaheadBufferSize, 10, 64) + if err != nil { + logger.Debug("parsing lookahead buffer size", "error", err) + bufSize = 0 + } + } + if bufSize > 0 { + http.ServeContent(w, r, "", time.Now(), langos.NewBufferedLangos(reader, int(bufSize))) + } http.ServeContent(w, r, "", time.Now(), reader) - // NOTE: temporary workaround for testing, watch this... - // http.ServeContent(w, r, "", time.Now(), langos.NewBufferedLangos(reader, lookaheadBufferSize(l))) } // manifestMetadataLoad returns the value for a key stored in the metadata of diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index edf90a78c48..634d4eb0166 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -126,6 +126,7 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) { fileDownloadResource(refResponse.Reference.String()), http.StatusPartialContent, jsonhttptest.WithRequestHeader(api.RangeHeader, rangeHeader), + jsonhttptest.WithRequestHeader(api.SwarmLookAheadBufferSizeHeader, "0"), // set for the replicas so that no replica gets deleted jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, "0"), jsonhttptest.WithRequestHeader(api.SwarmRedundancyStrategyHeader, "0"),