Skip to content

Commit

Permalink
feat: erasure encoder (#4429)
Browse files Browse the repository at this point in the history
  • Loading branch information
nugaon committed Dec 18, 2023
1 parent d889c21 commit 8c4a75e
Show file tree
Hide file tree
Showing 30 changed files with 1,008 additions and 129 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/ipfs/go-cid v0.4.1
github.com/kardianos/service v1.2.0
github.com/klauspost/reedsolomon v1.11.8
github.com/libp2p/go-libp2p v0.30.0
github.com/multiformats/go-multiaddr v0.11.0
github.com/multiformats/go-multiaddr-dns v0.3.1
Expand All @@ -46,7 +47,7 @@ require (
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/net v0.17.0
golang.org/x/sync v0.3.0
golang.org/x/sys v0.13.0
golang.org/x/sys v0.14.0
golang.org/x/term v0.13.0
golang.org/x/time v0.3.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -105,7 +106,7 @@ require (
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -524,10 +524,12 @@ github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQs
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.6/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/reedsolomon v1.11.8 h1:s8RpUW5TK4hjr+djiOpbZJB4ksx+TdYbRH7vHQpwPOY=
github.com/klauspost/reedsolomon v1.11.8/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ5MGv0Qd8a47h6A=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/koron/go-ssdp v0.0.4 h1:1IDwrghSKYM7yLf7XCzbByg2sJ/JcNOZRXS2jczTwz0=
Expand Down Expand Up @@ -1210,8 +1212,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
6 changes: 6 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ paths:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmEncryptParameter"
name: swarm-encrypt
required: false
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevel"
name: swarm-redundancy-level
required: false

requestBody:
content:
Expand Down Expand Up @@ -254,6 +259,7 @@ paths:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmErrorDocumentParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmDeferredUpload"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyParameter"
requestBody:
content:
multipart/form-data:
Expand Down
11 changes: 11 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,17 @@ components:
description: >
Represents the encrypting state of the file
SwarmRedundancyParameter:
in: header
name: swarm-redundancy-level
schema:
type: integer
enum: [0, 1, 2, 3, 4]
required: false
description: >
Add redundancy to the data being uploaded so that downloaders can download it with better UX.
0 value is default and does not add any redundancy to the file.
ContentTypePreserved:
in: header
name: Content-Type
Expand Down
12 changes: 7 additions & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/p2p"
Expand Down Expand Up @@ -79,6 +80,7 @@ const (
SwarmCollectionHeader = "Swarm-Collection"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
SwarmDeferredUploadHeader = "Swarm-Deferred-Upload"
SwarmRLevel = "Swarm-Redundancy-Level"

ImmutableHeader = "Immutable"
GasPriceHeader = "Gas-Price"
Expand Down Expand Up @@ -622,7 +624,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
allowedHeaders := []string{
"User-Agent", "Accept", "X-Requested-With", "Access-Control-Request-Headers", "Access-Control-Request-Method", "Accept-Ranges", "Content-Encoding",
AuthorizationHeader, AcceptEncodingHeader, ContentTypeHeader, ContentDispositionHeader, RangeHeader, OriginHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmDeferredUploadHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmDeferredUploadHeader, SwarmRLevel,
GasPriceHeader, GasLimitHeader, ImmutableHeader,
}
allowedHeadersStr := strings.Join(allowedHeaders, ", ")
Expand Down Expand Up @@ -848,16 +850,16 @@ func (s *Service) newStamperPutter(ctx context.Context, opts putterOptions) (sto

type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error)

func requestPipelineFn(s storage.Putter, encrypt bool) pipelineFunc {
func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc {
return func(ctx context.Context, r io.Reader) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, s, encrypt)
pipe := builder.NewPipelineBuilder(ctx, s, encrypt, rLevel)
return builder.FeedPipeline(ctx, pipe, r)
}
}

func requestPipelineFactory(ctx context.Context, s storage.Putter, encrypt bool) func() pipeline.Interface {
func requestPipelineFactory(ctx context.Context, s storage.Putter, encrypt bool, rLevel redundancy.Level) func() pipeline.Interface {
return func() pipeline.Interface {
return builder.NewPipelineBuilder(ctx, s, encrypt)
return builder.NewPipelineBuilder(ctx, s, encrypt, rLevel)
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/log"
p2pmock "github.com/ethersphere/bee/pkg/p2p/mock"
Expand Down Expand Up @@ -321,9 +322,9 @@ func request(t *testing.T, client *http.Client, method, resource string, body io
return resp
}

func pipelineFactory(s storage.Putter, encrypt bool) func() pipeline.Interface {
func pipelineFactory(s storage.Putter, encrypt bool, rLevel redundancy.Level) func() pipeline.Interface {
return func() pipeline.Interface {
return builder.NewPipelineBuilder(context.Background(), s, encrypt)
return builder.NewPipelineBuilder(context.Background(), s, encrypt, rLevel)
}
}

Expand Down
14 changes: 8 additions & 6 deletions pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/postage"
storage "github.com/ethersphere/bee/pkg/storage"
Expand All @@ -29,11 +30,12 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.logger.WithName("post_bytes").Build())

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
SwarmTag uint64 `map:"Swarm-Tag"`
Pin bool `map:"Swarm-Pin"`
Deferred *bool `map:"Swarm-Deferred-Upload"`
Encrypt bool `map:"Swarm-Encrypt"`
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
SwarmTag uint64 `map:"Swarm-Tag"`
Pin bool `map:"Swarm-Pin"`
Deferred *bool `map:"Swarm-Deferred-Upload"`
Encrypt bool `map:"Swarm-Encrypt"`
RLevel redundancy.Level `map:"Swarm-Redundancy-Level"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
Expand Down Expand Up @@ -91,7 +93,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
logger: logger,
}

p := requestPipelineFn(putter, headers.Encrypt)
p := requestPipelineFn(putter, headers.Encrypt, headers.RLevel)
address, err := p(r.Context(), r.Body)
if err != nil {
logger.Debug("split write all failed", "error", err)
Expand Down
23 changes: 13 additions & 10 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/manifest"
Expand All @@ -37,13 +38,14 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.logger.WithName("post_bzz").Build())

headers := struct {
ContentType string `map:"Content-Type,mimeMediaType" validate:"required"`
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
SwarmTag uint64 `map:"Swarm-Tag"`
Pin bool `map:"Swarm-Pin"`
Deferred *bool `map:"Swarm-Deferred-Upload"`
Encrypt bool `map:"Swarm-Encrypt"`
IsDir bool `map:"Swarm-Collection"`
ContentType string `map:"Content-Type,mimeMediaType" validate:"required"`
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
SwarmTag uint64 `map:"Swarm-Tag"`
Pin bool `map:"Swarm-Pin"`
Deferred *bool `map:"Swarm-Deferred-Upload"`
Encrypt bool `map:"Swarm-Encrypt"`
IsDir bool `map:"Swarm-Collection"`
RLevel redundancy.Level `map:"Swarm-Redundancy-Level"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
Expand Down Expand Up @@ -105,7 +107,7 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
s.dirUploadHandler(logger, ow, r, putter, r.Header.Get(ContentTypeHeader), headers.Encrypt, tag)
return
}
s.fileUploadHandler(logger, ow, r, putter, headers.Encrypt, tag)
s.fileUploadHandler(logger, ow, r, putter, headers.Encrypt, tag, headers.RLevel)
}

// fileUploadResponse is returned when an HTTP request to upload a file is successful
Expand All @@ -122,6 +124,7 @@ func (s *Service) fileUploadHandler(
putter storer.PutterSession,
encrypt bool,
tagID uint64,
rLevel redundancy.Level,
) {
queries := struct {
FileName string `map:"name" validate:"startsnotwith=/"`
Expand All @@ -131,7 +134,7 @@ func (s *Service) fileUploadHandler(
return
}

p := requestPipelineFn(putter, encrypt)
p := requestPipelineFn(putter, encrypt, rLevel)
ctx := r.Context()

// first store the file and get its reference
Expand Down Expand Up @@ -171,7 +174,7 @@ func (s *Service) fileUploadHandler(
}
}

factory := requestPipelineFactory(ctx, putter, encrypt)
factory := requestPipelineFactory(ctx, putter, encrypt, rLevel)
l := loadsave.New(s.storer.ChunkStore(), factory)

m, err := manifest.NewDefaultManifest(l, encrypt)
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/bzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func TestFeedIndirection(t *testing.T) {
t.Fatal(err)
}
m, err := manifest.NewDefaultManifest(
loadsave.New(storer.ChunkStore(), pipelineFactory(storer.Cache(), false)),
loadsave.New(storer.ChunkStore(), pipelineFactory(storer.Cache(), false, 0)),
false,
)
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions pkg/api/dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/manifest"
Expand Down Expand Up @@ -63,6 +64,12 @@ func (s *Service) dirUploadHandler(
}
defer r.Body.Close()

rsParity, err := strconv.ParseUint(r.Header.Get(SwarmRLevel), 10, 1)
if err != nil {
logger.Debug("store dir failed", "rsParity parsing error")
logger.Error(nil, "store dir failed")
}

reference, err := storeDir(
r.Context(),
encrypt,
Expand All @@ -72,6 +79,7 @@ func (s *Service) dirUploadHandler(
s.storer.ChunkStore(),
r.Header.Get(SwarmIndexDocumentHeader),
r.Header.Get(SwarmErrorDocumentHeader),
redundancy.Level(rsParity),
)
if err != nil {
logger.Debug("store dir failed", "error", err)
Expand Down Expand Up @@ -117,13 +125,14 @@ func storeDir(
getter storage.Getter,
indexFilename,
errorFilename string,
rLevel redundancy.Level,
) (swarm.Address, error) {

logger := tracing.NewLoggerWithTraceID(ctx, log)
loggerV1 := logger.V(1).Build()

p := requestPipelineFn(putter, encrypt)
ls := loadsave.New(getter, requestPipelineFactory(ctx, putter, encrypt))
p := requestPipelineFn(putter, encrypt, rLevel)
ls := loadsave.New(getter, requestPipelineFactory(ctx, putter, encrypt, rLevel))

dirManifest, err := manifest.NewDefaultManifest(ls, encrypt)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
logger: logger,
}

l := loadsave.New(s.storer.ChunkStore(), requestPipelineFactory(r.Context(), putter, false))
l := loadsave.New(s.storer.ChunkStore(), requestPipelineFactory(r.Context(), putter, false, 0))
feedManifest, err := manifest.NewDefaultManifest(l, false)
if err != nil {
logger.Debug("create manifest failed", "error", err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/encryption/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,10 @@ func GenerateRandomKey(l int) Key {
}
return key
}

func min(a, b int) int {
if a < b {
return a
}
return b
}
18 changes: 10 additions & 8 deletions pkg/encryption/store/decrypt_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"encoding/binary"

"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/redundancy"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/crypto/sha3"
Expand Down Expand Up @@ -37,7 +39,7 @@ func (s *decryptingStore) Get(ctx context.Context, addr swarm.Address) (ch swarm
return nil, err
}

d, err := decryptChunkData(ch.Data(), ref[swarm.HashSize:])
d, err := DecryptChunkData(ch.Data(), ref[swarm.HashSize:])
if err != nil {
return nil, err
}
Expand All @@ -48,19 +50,19 @@ func (s *decryptingStore) Get(ctx context.Context, addr swarm.Address) (ch swarm
}
}

func decryptChunkData(chunkData []byte, encryptionKey encryption.Key) ([]byte, error) {
func DecryptChunkData(chunkData []byte, encryptionKey encryption.Key) ([]byte, error) {
decryptedSpan, decryptedData, err := decrypt(chunkData, encryptionKey)
if err != nil {
return nil, err
}

// removing extra bytes which were just added for padding
length := binary.LittleEndian.Uint64(decryptedSpan)
refSize := int64(swarm.HashSize + encryption.KeyLength)
for length > swarm.ChunkSize {
length = length + (swarm.ChunkSize - 1)
length = length / swarm.ChunkSize
length *= uint64(refSize)
level, span := redundancy.DecodeSpan(decryptedSpan)
length := binary.LittleEndian.Uint64(span)
if length > swarm.ChunkSize {
dataRefSize := uint64(swarm.HashSize + encryption.KeyLength)
dataShards, parities := file.ReferenceCount(length, level, true)
length = dataRefSize*uint64(dataShards) + uint64(parities*swarm.HashSize)
}

c := make([]byte, length+8)
Expand Down
2 changes: 1 addition & 1 deletion pkg/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func testSplitThenJoin(t *testing.T) {
paramstring = strings.Split(t.Name(), "/")
dataIdx, _ = strconv.ParseInt(paramstring[1], 10, 0)
store = inmemchunkstore.New()
p = builder.NewPipelineBuilder(context.Background(), store, false)
p = builder.NewPipelineBuilder(context.Background(), store, false, 0)
data, _ = test.GetVector(t, int(dataIdx))
)

Expand Down
Loading

0 comments on commit 8c4a75e

Please sign in to comment.