Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: redundancy integration test #383

Merged
merged 18 commits into from
Feb 27, 2024
8 changes: 8 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,14 @@ checks:
concurrency:
max-attempts:
type: datadurability
redundancy:
options:
postage-amount:
postage-depth:
seed:
data-size:
type: redundancy


# simulations defines simulations Beekeeper can execute against the cluster
# type filed allows defining same simulation with different names and options
Expand Down
7 changes: 7 additions & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,10 @@ checks:
concurrency:
max-attempts:
type: datadurability
ci-redundancy:
options:
postage-amount:
postage-depth:
seed:
data-size:
type: redundancy
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ replace github.com/codahale/hdrhistogram => github.com/HdrHistogram/hdrhistogram

require (
github.com/ethereum/go-ethereum v1.13.4
github.com/ethersphere/bee v1.17.6
github.com/ethersphere/bee v1.18.3-0.20240221015611-5de732c26957
github.com/ethersphere/bmt v0.1.4
github.com/ethersphere/ethproxy v0.0.5
github.com/ethersphere/node-funder v0.1.1
Expand Down Expand Up @@ -37,7 +37,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.7.0 // indirect
github.com/btcsuite/btcd v0.22.3 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudflare/circl v1.3.3 // indirect
github.com/consensys/bavard v0.1.13 // indirect
Expand All @@ -49,7 +49,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/ethereum/c-kzg-4844 v0.3.1 // indirect
github.com/ethersphere/go-sw3-abi v0.4.0 // indirect
github.com/ethersphere/go-sw3-abi v0.6.5 // indirect
github.com/evanphx/json-patch v4.11.0+incompatible // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-chi/chi v1.5.4 // indirect
Expand All @@ -73,7 +73,8 @@ require (
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/klauspost/reedsolomon v1.11.8 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.30.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand Down
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ github.com/bits-and-blooms/bitset v1.7.0 h1:YjAGVd3XmtK9ktAbX8Zg2g2PwLIMjGREZJHl
github.com/bits-and-blooms/bitset v1.7.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/btcsuite/btcd v0.22.3 h1:kYNaWFvOw6xvqP0vR20RP1Zq1DVMBxEO8QN5d1/EfNg=
github.com/btcsuite/btcd v0.22.3/go.mod h1:wqgTSL29+50LRkmOVknEdmt8ZojIzhuWvgu/iptuN7Y=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U=
github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
Expand Down Expand Up @@ -144,14 +144,14 @@ github.com/ethereum/c-kzg-4844 v0.3.1 h1:sR65+68+WdnMKxseNWxSJuAv2tsUrihTpVBTfM/
github.com/ethereum/c-kzg-4844 v0.3.1/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/go-ethereum v1.13.4 h1:25HJnaWVg3q1O7Z62LaaI6S9wVq8QCw3K88g8wEzrcM=
github.com/ethereum/go-ethereum v1.13.4/go.mod h1:I0U5VewuuTzvBtVzKo7b3hJzDhXOUtn9mJW7SsIPB0Q=
github.com/ethersphere/bee v1.17.6 h1:LdlKWAMesuBqZ4nCuaS6aEKn6i7OJPb7QONEgWe/c1o=
github.com/ethersphere/bee v1.17.6/go.mod h1:dxpOp5CQsQvldQ6LQXeMaGGaFRXcLB8M1Yn/PI5I45M=
github.com/ethersphere/bee v1.18.3-0.20240221015611-5de732c26957 h1:VCgb+6EEHVhWU4v6zHonYaUf0HV61FYYZ6eZN9Ymm2g=
github.com/ethersphere/bee v1.18.3-0.20240221015611-5de732c26957/go.mod h1:PIgkGa0N++OQRPp9j/f1Jr8k0M6Xzv4tgcBiwKXtk9s=
github.com/ethersphere/bmt v0.1.4 h1:+rkWYNtMgDx6bkNqGdWu+U9DgGI1rRZplpSW3YhBr1Q=
github.com/ethersphere/bmt v0.1.4/go.mod h1:Yd8ft1U69WDuHevZc/rwPxUv1rzPSMpMnS6xbU53aY8=
github.com/ethersphere/ethproxy v0.0.5 h1:j5Mkm45jqmkET6NwGaJtaxOSFbhoAfOKzHiwHl6DBT0=
github.com/ethersphere/ethproxy v0.0.5/go.mod h1:7mkVRK3+Mte00jLxFAbUQ/cBepAzwTYpkE64ItCLZYw=
github.com/ethersphere/go-sw3-abi v0.4.0 h1:T3ANY+ktWrPAwe2U0tZi+DILpkHzto5ym/XwV/Bbz8g=
github.com/ethersphere/go-sw3-abi v0.4.0/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU=
github.com/ethersphere/go-sw3-abi v0.6.5 h1:M5dcIe1zQYvGpY2K07UNkNU9Obc4U+A1fz68Ho/Q+XE=
github.com/ethersphere/go-sw3-abi v0.6.5/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU=
github.com/ethersphere/node-funder v0.1.1 h1:Y1gzbnmZV2jCecdZuUvAsFeXTWpEFofrIT4UIEuswW0=
github.com/ethersphere/node-funder v0.1.1/go.mod h1:pHYJyxpyhuAoI5c4ReaLn1CnuCG5RmcLeU7pGDzlfrI=
github.com/evanphx/json-patch v4.11.0+incompatible h1:glyUF9yIYtMHzn8xaKw5rMhdWcwsYV8dZHIq5567/xs=
Expand Down Expand Up @@ -331,8 +331,10 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/reedsolomon v1.11.8 h1:s8RpUW5TK4hjr+djiOpbZJB4ksx+TdYbRH7vHQpwPOY=
github.com/klauspost/reedsolomon v1.11.8/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ5MGv0Qd8a47h6A=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
Expand Down
182 changes: 182 additions & 0 deletions pkg/check/redundancy/redundancy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package redundancy

import (
"bytes"
"context"
"fmt"
"io"
"time"

"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
"github.com/ethersphere/beekeeper/pkg/random"
)

type Options struct {
PostageAmount int64
PostageDepth uint64
Seed int64
DataSize int64
}

func NewDefaultOptions() Options {
return Options{
PostageAmount: 1500000,
PostageDepth: 22,
Seed: time.Now().UnixNano(),
DataSize: 307200,
}
}

var _ beekeeper.Action = (*Check)(nil)

type pipelineFn func(ctx context.Context, r io.Reader) (swarm.Address, error)

func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFn {
return func(ctx context.Context, r io.Reader) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, s, encrypt, rLevel)
return builder.FeedPipeline(ctx, pipe, r)
}
}

type Check struct {
logger logging.Logger
}

func NewCheck(logger logging.Logger) beekeeper.Action {
return &Check{
logger: logger,
}
}

func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, o interface{}) (err error) {
opts, ok := o.(Options)
if !ok {
return fmt.Errorf("invalid options type")
}

time.Sleep(10 * time.Second)

for i := 1; i < 5; i++ { // skip level 0
c.logger.Infof("started rLevel %d", i)
uploadClient, downloadClient, err := getClients(ctx, cluster, opts.Seed)
if err != nil {
return fmt.Errorf("get clients: %w", err)
}

root, data, chunks, err := c.generateChunks(ctx, opts.DataSize, redundancy.Level(i), opts.Seed)
if err != nil {
return fmt.Errorf("get chunks: %w", err)
}
c.logger.Infof("root hash: %s, chunks: %d", root.String(), len(chunks))

batchID, err := uploadClient.GetOrCreateBatch(ctx, opts.PostageAmount, opts.PostageDepth, "ci-redundancy")
if err != nil {
return fmt.Errorf("get or create batch: %w", err)
}

err = c.uploadChunks(ctx, uploadClient, chunks, redundancy.Level(i), opts.Seed, batchID)
if err != nil {
return fmt.Errorf("upload chunks: %w", err)
}
c.logger.Infof("upload completed. Downloading %s", root.String())
d, err := downloadClient.DownloadBytes(ctx, root)
if err != nil {
return fmt.Errorf("download bytes: %w", err)
}

if !bytes.Equal(data, d) {
return fmt.Errorf("download and initial content dont match")
}

c.logger.Infof("rLevel %d completed successfully", i)
}
return nil
}

func (c *Check) generateChunks(ctx context.Context, size int64, rLevel redundancy.Level, seed int64) (swarm.Address, []byte, []swarm.Chunk, error) {
putter := &splitPutter{
chunks: make([]swarm.Chunk, 0),
}

buf := make([]byte, size)
rnd := random.PseudoGenerator(seed)
_, err := rnd.Read(buf)
if err != nil {
return swarm.ZeroAddress, nil, nil, err
}

p := requestPipelineFn(putter, false, rLevel)
rootAddr, err := p(ctx, bytes.NewReader(buf))
if err != nil {
return swarm.ZeroAddress, nil, nil, err
}

return rootAddr, buf, putter.chunks, nil
}

func (c *Check) uploadChunks(ctx context.Context, client *bee.Client, chunks []swarm.Chunk, rLevel redundancy.Level, seed int64, batchID string) error {
rate := 0.0
switch rLevel {
case redundancy.MEDIUM:
rate = 0.01
case redundancy.STRONG:
rate = 0.05
case redundancy.INSANE:
rate = 0.1
case redundancy.PARANOID:
rate = 0.35
}

rnd := random.PseudoGenerator(seed)
indices := rnd.Perm(len(chunks) - 1)
offset := int(rate * float64(len(chunks)))
indices = append(indices[offset:], len(chunks)-1)

c.logger.Infof("uploading %d chunks out of %d", len(indices), len(chunks))
for i, j := range indices {
_, err := client.UploadChunk(ctx, chunks[j].Data(), api.UploadOptions{
BatchID: batchID,
Direct: true,
})
if err != nil {
return fmt.Errorf("upload chunk %d of %d: %w", i+1, len(indices), err)
}
}
return nil
}

func getClients(ctx context.Context, cluster orchestration.Cluster, seed int64) (*bee.Client, *bee.Client, error) {
nodeNames := cluster.FullNodeNames()
clients, err := cluster.NodesClients(ctx)
if err != nil {
return nil, nil, err
}
rnd := random.PseudoGenerator(seed)
var cUpload, cDownload *bee.Client
for {
perm := rnd.Perm(len(nodeNames))
if perm[0] != perm[1] {
cUpload = clients[nodeNames[perm[0]]]
cDownload = clients[nodeNames[perm[1]]]
break
}
}
return cUpload, cDownload, nil
}

type splitPutter struct {
chunks []swarm.Chunk
}

func (s *splitPutter) Put(_ context.Context, chunk swarm.Chunk) error {
s.chunks = append(s.chunks, chunk)
return nil
}
22 changes: 22 additions & 0 deletions pkg/config/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethersphere/beekeeper/pkg/check/pss"
"github.com/ethersphere/beekeeper/pkg/check/pullsync"
"github.com/ethersphere/beekeeper/pkg/check/pushsync"
"github.com/ethersphere/beekeeper/pkg/check/redundancy"
"github.com/ethersphere/beekeeper/pkg/check/retrieval"
"github.com/ethersphere/beekeeper/pkg/check/settlements"
"github.com/ethersphere/beekeeper/pkg/check/smoke"
Expand Down Expand Up @@ -552,6 +553,27 @@ var Checks = map[string]CheckType{
return nil, fmt.Errorf("applying options: %w", err)
}

return opts, nil
},
},
"redundancy": {
NewAction: redundancy.NewCheck,
NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
checkOpts := new(struct {
PostageAmount *int `yaml:"postage-amount"`
PostageDepth *int `yaml:"postage-depth"`
Seed *int `yaml:"seed"`
DataSize *int `yaml:"data-size"`
})
if err := check.Options.Decode(checkOpts); err != nil {
return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err)
}
opts := redundancy.NewDefaultOptions()

if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil {
return nil, fmt.Errorf("applying options: %w", err)
}

return opts, nil
},
},
Expand Down
Loading