diff --git a/config/config.yaml b/config/config.yaml index 8f73f3cbf..8a489dd69 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -356,6 +356,14 @@ checks: concurrency: max-attempts: type: datadurability + redundancy: + options: + postage-amount: + postage-depth: + seed: + data-size: + type: redundancy + # simulations defines simulations Beekeeper can execute against the cluster # type filed allows defining same simulation with different names and options diff --git a/config/local.yaml b/config/local.yaml index 12e7c3f07..12caa42a3 100644 --- a/config/local.yaml +++ b/config/local.yaml @@ -395,3 +395,10 @@ checks: concurrency: max-attempts: type: datadurability + ci-redundancy: + options: + postage-amount: + postage-depth: + seed: + data-size: + type: redundancy diff --git a/go.mod b/go.mod index 1c572507d..a8cb54890 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ replace github.com/codahale/hdrhistogram => github.com/HdrHistogram/hdrhistogram require ( github.com/ethereum/go-ethereum v1.13.4 - github.com/ethersphere/bee v1.17.6 + github.com/ethersphere/bee v1.18.3-0.20240221015611-5de732c26957 github.com/ethersphere/bmt v0.1.4 github.com/ethersphere/ethproxy v0.0.5 github.com/ethersphere/node-funder v0.1.1 @@ -37,7 +37,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.7.0 // indirect github.com/btcsuite/btcd v0.22.3 // indirect - github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect + github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cloudflare/circl v1.3.3 // indirect github.com/consensys/bavard v0.1.13 // indirect @@ -49,7 +49,7 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/ethereum/c-kzg-4844 v0.3.1 // indirect - github.com/ethersphere/go-sw3-abi v0.4.0 // indirect + github.com/ethersphere/go-sw3-abi v0.6.5 // indirect github.com/evanphx/json-patch v4.11.0+incompatible // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-chi/chi v1.5.4 // indirect @@ -73,7 +73,8 @@ require ( github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/cpuid/v2 v2.2.5 // indirect + github.com/klauspost/cpuid/v2 v2.2.6 // indirect + github.com/klauspost/reedsolomon v1.11.8 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-libp2p v0.30.0 // indirect github.com/magiconair/properties v1.8.7 // indirect diff --git a/go.sum b/go.sum index 7f53544ae..a4939b1a8 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,8 @@ github.com/bits-and-blooms/bitset v1.7.0 h1:YjAGVd3XmtK9ktAbX8Zg2g2PwLIMjGREZJHl github.com/bits-and-blooms/bitset v1.7.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/btcsuite/btcd v0.22.3 h1:kYNaWFvOw6xvqP0vR20RP1Zq1DVMBxEO8QN5d1/EfNg= github.com/btcsuite/btcd v0.22.3/go.mod h1:wqgTSL29+50LRkmOVknEdmt8ZojIzhuWvgu/iptuN7Y= -github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= -github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= +github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U= +github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= @@ -144,14 +144,14 @@ github.com/ethereum/c-kzg-4844 v0.3.1 h1:sR65+68+WdnMKxseNWxSJuAv2tsUrihTpVBTfM/ github.com/ethereum/c-kzg-4844 v0.3.1/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= github.com/ethereum/go-ethereum v1.13.4 h1:25HJnaWVg3q1O7Z62LaaI6S9wVq8QCw3K88g8wEzrcM= github.com/ethereum/go-ethereum v1.13.4/go.mod h1:I0U5VewuuTzvBtVzKo7b3hJzDhXOUtn9mJW7SsIPB0Q= -github.com/ethersphere/bee v1.17.6 h1:LdlKWAMesuBqZ4nCuaS6aEKn6i7OJPb7QONEgWe/c1o= -github.com/ethersphere/bee v1.17.6/go.mod h1:dxpOp5CQsQvldQ6LQXeMaGGaFRXcLB8M1Yn/PI5I45M= +github.com/ethersphere/bee v1.18.3-0.20240221015611-5de732c26957 h1:VCgb+6EEHVhWU4v6zHonYaUf0HV61FYYZ6eZN9Ymm2g= +github.com/ethersphere/bee v1.18.3-0.20240221015611-5de732c26957/go.mod h1:PIgkGa0N++OQRPp9j/f1Jr8k0M6Xzv4tgcBiwKXtk9s= github.com/ethersphere/bmt v0.1.4 h1:+rkWYNtMgDx6bkNqGdWu+U9DgGI1rRZplpSW3YhBr1Q= github.com/ethersphere/bmt v0.1.4/go.mod h1:Yd8ft1U69WDuHevZc/rwPxUv1rzPSMpMnS6xbU53aY8= github.com/ethersphere/ethproxy v0.0.5 h1:j5Mkm45jqmkET6NwGaJtaxOSFbhoAfOKzHiwHl6DBT0= github.com/ethersphere/ethproxy v0.0.5/go.mod h1:7mkVRK3+Mte00jLxFAbUQ/cBepAzwTYpkE64ItCLZYw= -github.com/ethersphere/go-sw3-abi v0.4.0 h1:T3ANY+ktWrPAwe2U0tZi+DILpkHzto5ym/XwV/Bbz8g= -github.com/ethersphere/go-sw3-abi v0.4.0/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU= +github.com/ethersphere/go-sw3-abi v0.6.5 h1:M5dcIe1zQYvGpY2K07UNkNU9Obc4U+A1fz68Ho/Q+XE= +github.com/ethersphere/go-sw3-abi v0.6.5/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU= github.com/ethersphere/node-funder v0.1.1 h1:Y1gzbnmZV2jCecdZuUvAsFeXTWpEFofrIT4UIEuswW0= github.com/ethersphere/node-funder v0.1.1/go.mod h1:pHYJyxpyhuAoI5c4ReaLn1CnuCG5RmcLeU7pGDzlfrI= github.com/evanphx/json-patch v4.11.0+incompatible h1:glyUF9yIYtMHzn8xaKw5rMhdWcwsYV8dZHIq5567/xs= @@ -331,8 +331,10 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= -github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= +github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/reedsolomon v1.11.8 h1:s8RpUW5TK4hjr+djiOpbZJB4ksx+TdYbRH7vHQpwPOY= +github.com/klauspost/reedsolomon v1.11.8/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ5MGv0Qd8a47h6A= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= diff --git a/pkg/check/redundancy/redundancy.go b/pkg/check/redundancy/redundancy.go new file mode 100644 index 000000000..489a709ae --- /dev/null +++ b/pkg/check/redundancy/redundancy.go @@ -0,0 +1,182 @@ +package redundancy + +import ( + "bytes" + "context" + "fmt" + "io" + "time" + + "github.com/ethersphere/bee/pkg/file/pipeline/builder" + "github.com/ethersphere/bee/pkg/file/redundancy" + "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/bee/api" + "github.com/ethersphere/beekeeper/pkg/beekeeper" + "github.com/ethersphere/beekeeper/pkg/logging" + "github.com/ethersphere/beekeeper/pkg/orchestration" + "github.com/ethersphere/beekeeper/pkg/random" +) + +type Options struct { + PostageAmount int64 + PostageDepth uint64 + Seed int64 + DataSize int64 +} + +func NewDefaultOptions() Options { + return Options{ + PostageAmount: 1500000, + PostageDepth: 22, + Seed: time.Now().UnixNano(), + DataSize: 307200, + } +} + +var _ beekeeper.Action = (*Check)(nil) + +type pipelineFn func(ctx context.Context, r io.Reader) (swarm.Address, error) + +func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFn { + return func(ctx context.Context, r io.Reader) (swarm.Address, error) { + pipe := builder.NewPipelineBuilder(ctx, s, encrypt, rLevel) + return builder.FeedPipeline(ctx, pipe, r) + } +} + +type Check struct { + logger logging.Logger +} + +func NewCheck(logger logging.Logger) beekeeper.Action { + return &Check{ + logger: logger, + } +} + +func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, o interface{}) (err error) { + opts, ok := o.(Options) + if !ok { + return fmt.Errorf("invalid options type") + } + + time.Sleep(10 * time.Second) + + for i := 1; i < 5; i++ { // skip level 0 + c.logger.Infof("started rLevel %d", i) + uploadClient, downloadClient, err := getClients(ctx, cluster, opts.Seed) + if err != nil { + return fmt.Errorf("get clients: %w", err) + } + + root, data, chunks, err := c.generateChunks(ctx, opts.DataSize, redundancy.Level(i), opts.Seed) + if err != nil { + return fmt.Errorf("get chunks: %w", err) + } + c.logger.Infof("root hash: %s, chunks: %d", root.String(), len(chunks)) + + batchID, err := uploadClient.GetOrCreateBatch(ctx, opts.PostageAmount, opts.PostageDepth, "ci-redundancy") + if err != nil { + return fmt.Errorf("get or create batch: %w", err) + } + + err = c.uploadChunks(ctx, uploadClient, chunks, redundancy.Level(i), opts.Seed, batchID) + if err != nil { + return fmt.Errorf("upload chunks: %w", err) + } + c.logger.Infof("upload completed. Downloading %s", root.String()) + d, err := downloadClient.DownloadBytes(ctx, root) + if err != nil { + return fmt.Errorf("download bytes: %w", err) + } + + if !bytes.Equal(data, d) { + return fmt.Errorf("download and initial content dont match") + } + + c.logger.Infof("rLevel %d completed successfully", i) + } + return nil +} + +func (c *Check) generateChunks(ctx context.Context, size int64, rLevel redundancy.Level, seed int64) (swarm.Address, []byte, []swarm.Chunk, error) { + putter := &splitPutter{ + chunks: make([]swarm.Chunk, 0), + } + + buf := make([]byte, size) + rnd := random.PseudoGenerator(seed) + _, err := rnd.Read(buf) + if err != nil { + return swarm.ZeroAddress, nil, nil, err + } + + p := requestPipelineFn(putter, false, rLevel) + rootAddr, err := p(ctx, bytes.NewReader(buf)) + if err != nil { + return swarm.ZeroAddress, nil, nil, err + } + + return rootAddr, buf, putter.chunks, nil +} + +func (c *Check) uploadChunks(ctx context.Context, client *bee.Client, chunks []swarm.Chunk, rLevel redundancy.Level, seed int64, batchID string) error { + rate := 0.0 + switch rLevel { + case redundancy.MEDIUM: + rate = 0.01 + case redundancy.STRONG: + rate = 0.05 + case redundancy.INSANE: + rate = 0.1 + case redundancy.PARANOID: + rate = 0.35 + } + + rnd := random.PseudoGenerator(seed) + indices := rnd.Perm(len(chunks) - 1) + offset := int(rate * float64(len(chunks))) + indices = append(indices[offset:], len(chunks)-1) + + c.logger.Infof("uploading %d chunks out of %d", len(indices), len(chunks)) + for i, j := range indices { + _, err := client.UploadChunk(ctx, chunks[j].Data(), api.UploadOptions{ + BatchID: batchID, + Direct: true, + }) + if err != nil { + return fmt.Errorf("upload chunk %d of %d: %w", i+1, len(indices), err) + } + } + return nil +} + +func getClients(ctx context.Context, cluster orchestration.Cluster, seed int64) (*bee.Client, *bee.Client, error) { + nodeNames := cluster.FullNodeNames() + clients, err := cluster.NodesClients(ctx) + if err != nil { + return nil, nil, err + } + rnd := random.PseudoGenerator(seed) + var cUpload, cDownload *bee.Client + for { + perm := rnd.Perm(len(nodeNames)) + if perm[0] != perm[1] { + cUpload = clients[nodeNames[perm[0]]] + cDownload = clients[nodeNames[perm[1]]] + break + } + } + return cUpload, cDownload, nil +} + +type splitPutter struct { + chunks []swarm.Chunk +} + +func (s *splitPutter) Put(_ context.Context, chunk swarm.Chunk) error { + s.chunks = append(s.chunks, chunk) + return nil +} diff --git a/pkg/config/check.go b/pkg/config/check.go index 7ee743912..4b69a1507 100644 --- a/pkg/config/check.go +++ b/pkg/config/check.go @@ -26,6 +26,7 @@ import ( "github.com/ethersphere/beekeeper/pkg/check/pss" "github.com/ethersphere/beekeeper/pkg/check/pullsync" "github.com/ethersphere/beekeeper/pkg/check/pushsync" + "github.com/ethersphere/beekeeper/pkg/check/redundancy" "github.com/ethersphere/beekeeper/pkg/check/retrieval" "github.com/ethersphere/beekeeper/pkg/check/settlements" "github.com/ethersphere/beekeeper/pkg/check/smoke" @@ -552,6 +553,27 @@ var Checks = map[string]CheckType{ return nil, fmt.Errorf("applying options: %w", err) } + return opts, nil + }, + }, + "redundancy": { + NewAction: redundancy.NewCheck, + NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) { + checkOpts := new(struct { + PostageAmount *int `yaml:"postage-amount"` + PostageDepth *int `yaml:"postage-depth"` + Seed *int `yaml:"seed"` + DataSize *int `yaml:"data-size"` + }) + if err := check.Options.Decode(checkOpts); err != nil { + return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err) + } + opts := redundancy.NewDefaultOptions() + + if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil { + return nil, fmt.Errorf("applying options: %w", err) + } + return opts, nil }, },