Skip to content

Commit

Permalink
feat: non-local redundancy (#4491)
Browse files Browse the repository at this point in the history
Co-authored-by: Viktor Trón <[email protected]>
Co-authored-by: Anatol <[email protected]>
Co-authored-by: dysordys <[email protected]>
Co-authored-by: Gyorgy Barabas <[email protected]>
  • Loading branch information
5 people authored Feb 8, 2024
1 parent d302989 commit 0ece898
Show file tree
Hide file tree
Showing 57 changed files with 4,437 additions and 321 deletions.
2 changes: 1 addition & 1 deletion cmd/bee/cmd/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error)

func requestPipelineFn(s storage.Putter, encrypt bool) pipelineFunc {
return func(ctx context.Context, r io.Reader) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, s, encrypt)
pipe := builder.NewPipelineBuilder(ctx, s, encrypt, 0)
return builder.FeedPipeline(ctx, pipe, r)
}
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/ipfs/go-cid v0.4.1
github.com/kardianos/service v1.2.0
github.com/klauspost/reedsolomon v1.11.8
github.com/libp2p/go-libp2p v0.30.0
github.com/multiformats/go-multiaddr v0.11.0
github.com/multiformats/go-multiaddr-dns v0.3.1
Expand Down Expand Up @@ -105,7 +106,7 @@ require (
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -522,10 +522,12 @@ github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQs
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.6/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/reedsolomon v1.11.8 h1:s8RpUW5TK4hjr+djiOpbZJB4ksx+TdYbRH7vHQpwPOY=
github.com/klauspost/reedsolomon v1.11.8/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ5MGv0Qd8a47h6A=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/koron/go-ssdp v0.0.4 h1:1IDwrghSKYM7yLf7XCzbByg2sJ/JcNOZRXS2jczTwz0=
Expand Down
29 changes: 18 additions & 11 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
openapi: 3.0.3

info:
version: 5.1.1
version: 5.2.0
title: Bee API
description: "A list of the currently provided Interfaces to interact with the swarm, implementing file operations and sending messages"

Expand Down Expand Up @@ -120,6 +120,11 @@ paths:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmEncryptParameter"
name: swarm-encrypt
required: false
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter"
name: swarm-redundancy-level
required: false

requestBody:
content:
Expand Down Expand Up @@ -158,11 +163,10 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmReference"
required: true
description: Swarm address reference to content
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
name: swarm-cache
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
responses:
"200":
description: Retrieved content specified by reference
Expand Down Expand Up @@ -254,6 +258,7 @@ paths:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmErrorDocumentParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmDeferredUpload"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter"
requestBody:
content:
multipart/form-data:
Expand Down Expand Up @@ -305,11 +310,10 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmReference"
required: true
description: Swarm address of content
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
name: swarm-cache
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
responses:
"200":
description: Ok
Expand Down Expand Up @@ -347,6 +351,9 @@ paths:
type: string
required: true
description: Path to the file in the collection.
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
responses:
"200":
description: Ok
Expand Down
46 changes: 46 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,52 @@ components:
description: >
Represents the encrypting state of the file
SwarmRedundancyLevelParameter:
in: header
name: swarm-redundancy-level
schema:
type: integer
enum: [0, 1, 2, 3, 4]
required: false
description: >
Add redundancy to the data being uploaded so that downloaders can download it with better UX.
0 value is default and does not add any redundancy to the file.
SwarmRedundancyStrategyParameter:
in: header
name: swarm-redundancy-strategy
schema:
type: integer
enum: [0, 1, 2, 3]
required: false
description: >
Specify the retrieve strategy on redundant data.
The numbers stand for NONE, DATA, PROX and RACE, respectively.
Strategy NONE means no prefetching takes place.
Strategy DATA means only data chunks are prefetched.
Strategy PROX means only chunks that are close to the node are prefetched.
Strategy RACE means all chunks are prefetched: n data chunks and k parity chunks. The first n chunks to arrive are used to reconstruct the file.
Multiple strategies can be used in a fallback cascade if the swarm redundancy fallback mode is set to true.
The default strategy is NONE, DATA, falling back to PROX, falling back to RACE
SwarmRedundancyFallbackModeParameter:
in: header
name: swarm-redundancy-fallback-mode
schema:
type: boolean
required: false
description: >
Specify if the retrieve strategies (chunk prefetching on redundant data) are used in a fallback cascade. The default is true.
SwarmChunkRetrievalTimeoutParameter:
in: header
name: swarm-chunk-retrieval-timeout
schema:
$ref: "#/components/schemas/Duration"
required: false
description: >
Specify the timeout for chunk retrieval. The default is 30 seconds.
ContentTypePreserved:
in: header
name: Content-Type
Expand Down
56 changes: 21 additions & 35 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/p2p"
Expand Down Expand Up @@ -69,16 +70,21 @@ import (
const loggerName = "api"

const (
SwarmPinHeader = "Swarm-Pin"
SwarmTagHeader = "Swarm-Tag"
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmFeedIndexHeader = "Swarm-Feed-Index"
SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
SwarmCollectionHeader = "Swarm-Collection"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
SwarmDeferredUploadHeader = "Swarm-Deferred-Upload"
SwarmPinHeader = "Swarm-Pin"
SwarmTagHeader = "Swarm-Tag"
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmFeedIndexHeader = "Swarm-Feed-Index"
SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
SwarmCollectionHeader = "Swarm-Collection"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
SwarmDeferredUploadHeader = "Swarm-Deferred-Upload"
SwarmRedundancyLevelHeader = "Swarm-Redundancy-Level"
SwarmRedundancyStrategyHeader = "Swarm-Redundancy-Strategy"
SwarmRedundancyFallbackModeHeader = "Swarm-Redundancy-Fallback-Mode"
SwarmChunkRetrievalTimeoutHeader = "Swarm-Chunk-Retrieval-Timeout"
SwarmLookAheadBufferSizeHeader = "Swarm-Lookahead-Buffer-Size"

ImmutableHeader = "Immutable"
GasPriceHeader = "Gas-Price"
Expand All @@ -94,18 +100,6 @@ const (
OriginHeader = "Origin"
)

// The size of buffer used for prefetching content with Langos.
// Warning: This value influences the number of chunk requests and chunker join goroutines
// per file request.
// Recommended value is 8 or 16 times the io.Copy default buffer value which is 32kB, depending
// on the file size. Use lookaheadBufferSize() to get the correct buffer size for the request.
const (
smallFileBufferSize = 8 * 32 * 1024
largeFileBufferSize = 16 * 32 * 1024

largeBufferFilesizeThreshold = 10 * 1000000 // ten megs
)

const (
multiPartFormData = "multipart/form-data"
contentTypeTar = "application/x-tar"
Expand Down Expand Up @@ -610,20 +604,12 @@ func (s *Service) gasConfigMiddleware(handlerName string) func(h http.Handler) h
}
}

func lookaheadBufferSize(size int64) int {
if size <= largeBufferFilesizeThreshold {
return smallFileBufferSize
}
return largeFileBufferSize
}

// corsHandler sets CORS headers to HTTP response if allowed origins are configured.
func (s *Service) corsHandler(h http.Handler) http.Handler {
allowedHeaders := []string{
"User-Agent", "Accept", "X-Requested-With", "Access-Control-Request-Headers", "Access-Control-Request-Method", "Accept-Ranges", "Content-Encoding",
AuthorizationHeader, AcceptEncodingHeader, ContentTypeHeader, ContentDispositionHeader, RangeHeader, OriginHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmDeferredUploadHeader,
GasPriceHeader, GasLimitHeader, ImmutableHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, GasPriceHeader, GasLimitHeader, ImmutableHeader,
}
allowedHeadersStr := strings.Join(allowedHeaders, ", ")

Expand Down Expand Up @@ -848,16 +834,16 @@ func (s *Service) newStamperPutter(ctx context.Context, opts putterOptions) (sto

type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error)

func requestPipelineFn(s storage.Putter, encrypt bool) pipelineFunc {
func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc {
return func(ctx context.Context, r io.Reader) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, s, encrypt)
pipe := builder.NewPipelineBuilder(ctx, s, encrypt, rLevel)
return builder.FeedPipeline(ctx, pipe, r)
}
}

func requestPipelineFactory(ctx context.Context, s storage.Putter, encrypt bool) func() pipeline.Interface {
func requestPipelineFactory(ctx context.Context, s storage.Putter, encrypt bool, rLevel redundancy.Level) func() pipeline.Interface {
return func() pipeline.Interface {
return builder.NewPipelineBuilder(ctx, s, encrypt)
return builder.NewPipelineBuilder(ctx, s, encrypt, rLevel)
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/log"
p2pmock "github.com/ethersphere/bee/pkg/p2p/mock"
Expand Down Expand Up @@ -321,9 +322,9 @@ func request(t *testing.T, client *http.Client, method, resource string, body io
return resp
}

func pipelineFactory(s storage.Putter, encrypt bool) func() pipeline.Interface {
func pipelineFactory(s storage.Putter, encrypt bool, rLevel redundancy.Level) func() pipeline.Interface {
return func() pipeline.Interface {
return builder.NewPipelineBuilder(context.Background(), s, encrypt)
return builder.NewPipelineBuilder(context.Background(), s, encrypt, rLevel)
}
}

Expand Down
14 changes: 8 additions & 6 deletions pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/postage"
storage "github.com/ethersphere/bee/pkg/storage"
Expand All @@ -33,11 +34,12 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
defer span.Finish()

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
SwarmTag uint64 `map:"Swarm-Tag"`
Pin bool `map:"Swarm-Pin"`
Deferred *bool `map:"Swarm-Deferred-Upload"`
Encrypt bool `map:"Swarm-Encrypt"`
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
SwarmTag uint64 `map:"Swarm-Tag"`
Pin bool `map:"Swarm-Pin"`
Deferred *bool `map:"Swarm-Deferred-Upload"`
Encrypt bool `map:"Swarm-Encrypt"`
RLevel redundancy.Level `map:"Swarm-Redundancy-Level"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
Expand Down Expand Up @@ -98,7 +100,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
logger: logger,
}

p := requestPipelineFn(putter, headers.Encrypt)
p := requestPipelineFn(putter, headers.Encrypt, headers.RLevel)
address, err := p(ctx, r.Body)
if err != nil {
logger.Debug("split write all failed", "error", err)
Expand Down
Loading

0 comments on commit 0ece898

Please sign in to comment.