Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(retrieval): use only full nodes except bootnodes #430

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ type Client struct {

// ClientOptions holds optional parameters for the Client.
type ClientOptions struct {
APIURL *url.URL
APIInsecureTLS bool
APIURL *url.URL
Name string
Retry int
}

Expand Down Expand Up @@ -67,6 +68,10 @@ type Addresses struct {
PSSPublicKey string
}

func (c *Client) Name() string {
return c.opts.Name
}

func (c *Client) Config() ClientOptions {
return c.opts
}
Expand Down
65 changes: 38 additions & 27 deletions pkg/check/retrieval/retrieval.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package retrieval

import (
"bytes"
"context"
"errors"
"fmt"
"time"

"github.com/ethersphere/beekeeper/pkg/bee"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
"github.com/ethersphere/beekeeper/pkg/random"
test "github.com/ethersphere/beekeeper/pkg/test"
)

// Options represents check options
type Options struct {
ChunksPerNode int // number of chunks to upload per node
GasPrice string
PostageAmount int64
PostageDepth uint64
PostageLabel string
Expand All @@ -28,7 +29,6 @@ type Options struct {
func NewDefaultOptions() Options {
return Options{
ChunksPerNode: 1,
GasPrice: "",
PostageAmount: 1,
PostageLabel: "test-label",
PostageDepth: 16,
Expand Down Expand Up @@ -63,67 +63,78 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
return fmt.Errorf("invalid options type")
}

caseOpts := test.CaseOptions{
GasPrice: o.GasPrice,
PostageAmount: o.PostageAmount,
PostageLabel: o.PostageLabel,
PostageDepth: o.PostageDepth,
Seed: o.Seed,
rnds := random.PseudoGenerators(o.Seed, o.UploadNodeCount)

overlays, err := cluster.FlattenOverlays(ctx)
if err != nil {
return err
}

checkCase, err := test.NewCheckCase(ctx, cluster, caseOpts, c.logger)
clients, err := cluster.NodesClients(ctx)
if err != nil {
return err
}

lastBee := checkCase.LastBee()
nodes := cluster.FullNodeNames()

for i := 0; i < o.UploadNodeCount; i++ {
uploader, err := checkCase.Bee(i).NewChunkUploader(ctx)
uploadNode := clients[nodes[i]]

downloadNodeIndex := (i + 1) % len(nodes) // download from the next node
downloadNode := clients[nodes[downloadNodeIndex]]

batchID, err := uploadNode.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return err
return fmt.Errorf("node %s: created batched id %w", uploadNode.Name(), err)
}
c.logger.Infof("node %s: created batched id %s", uploadNode.Name(), batchID)

for j := 0; j < o.ChunksPerNode; j++ {
// time upload
t0 := time.Now()

chunk, err := uploader.UploadRandomChunk()
chunk, err := bee.NewRandomChunk(rnds[i], c.logger)
if err != nil {
return fmt.Errorf("node %s: %w", uploadNode.Name(), err)
}

addr, err := uploadNode.UploadChunk(ctx, chunk.Data(), api.UploadOptions{BatchID: batchID})
if err != nil {
return err
return fmt.Errorf("node %s: %w", uploadNode.Name(), err)
}
c.logger.Infof("Uploaded chunk address: %s", addr.String())

d0 := time.Since(t0)

c.metrics.UploadedCounter.WithLabelValues(uploader.Overlay).Inc()
c.metrics.UploadTimeGauge.WithLabelValues(uploader.Overlay, chunk.AddrString()).Set(d0.Seconds())
c.metrics.UploadedCounter.WithLabelValues(overlays[uploadNode.Name()].String()).Inc()
c.metrics.UploadTimeGauge.WithLabelValues(overlays[uploadNode.Name()].String(), chunk.Address().String()).Set(d0.Seconds())
c.metrics.UploadTimeHistogram.Observe(d0.Seconds())

// time download
t1 := time.Now()

data, err := lastBee.DownloadChunk(ctx, chunk.Addr())
downloadData, err := downloadNode.DownloadChunk(ctx, chunk.Address(), "", nil)
if err != nil {
return fmt.Errorf("node %s: %w", lastBee.Name(), err)
return fmt.Errorf("node %s: %w", downloadNode.Name(), err)
}

d1 := time.Since(t1)

c.metrics.DownloadedCounter.WithLabelValues(uploader.Name()).Inc()
c.metrics.DownloadTimeGauge.WithLabelValues(uploader.Name(), chunk.AddrString()).Set(d1.Seconds())
c.metrics.DownloadedCounter.WithLabelValues(uploadNode.Name()).Inc()
c.metrics.DownloadTimeGauge.WithLabelValues(uploadNode.Name(), chunk.Address().String()).Set(d1.Seconds())
c.metrics.DownloadTimeHistogram.Observe(d1.Seconds())

if !chunk.Equals(data) {
c.metrics.NotRetrievedCounter.WithLabelValues(uploader.Name()).Inc()
c.logger.Infof("Node %s. Chunk %d not retrieved successfully. Uploaded size: %d Downloaded size: %d Node: %s Chunk: %s", lastBee.Name(), j, chunk.Size(), len(data), uploader.Name(), chunk.AddrString())
if chunk.Contains(data) {
if !bytes.Equal(chunk.Data(), downloadData) {
c.metrics.NotRetrievedCounter.WithLabelValues(uploadNode.Name()).Inc()
c.logger.Errorf("Chunk not retrieved successfully: DownloadNode=%s, ChunkIndex=%d, UploadedSize=%d, DownloadedSize=%d, UploadNode=%s, ChunkAddress=%s", downloadNode.Name(), j, chunk.Size(), len(downloadData), uploadNode.Name(), chunk.Address().String())
if bytes.Contains(chunk.Data(), downloadData) {
c.logger.Infof("Downloaded data is subset of the uploaded data")
}
return errRetrieval
}

c.metrics.RetrievedCounter.WithLabelValues(uploader.Name()).Inc()
c.logger.Infof("Node %s. Chunk %d retrieved successfully. Node: %s Chunk: %s", lastBee.Name(), j, uploader.Name(), chunk.AddrString())
c.metrics.RetrievedCounter.WithLabelValues(uploadNode.Name()).Inc()
c.logger.Infof("Chunk retrieved successfully: DownloadNode=%s, ChunkIndex=%d, DownloadedSize=%d, UploadNode=%s, ChunkAddress=%s", downloadNode.Name(), j, len(downloadData), uploadNode.Name(), chunk.Address().String())
}
}

Expand Down
1 change: 0 additions & 1 deletion pkg/config/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ var Checks = map[string]CheckType{
NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
checkOpts := new(struct {
ChunksPerNode *int `yaml:"chunks-per-node"`
GasPrice *string `yaml:"gas-price"`
PostageAmount *int64 `yaml:"postage-amount"`
PostageDepth *uint64 `yaml:"postage-depth"`
PostageLabel *string `yaml:"postage-label"`
Expand Down
1 change: 1 addition & 0 deletions pkg/orchestration/k8s/nodegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (g *NodeGroup) AddNode(ctx context.Context, name string, o orchestration.No
}

beeClientOpts := bee.ClientOptions{
Name: name,
APIURL: aURL,
APIInsecureTLS: g.clusterOpts.APIInsecureTLS,
Retry: 5,
Expand Down
2 changes: 1 addition & 1 deletion pkg/test/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ func NewCheckCase(ctx context.Context, cluster orchestration.Cluster, caseOpts C
}

rnds := random.PseudoGenerators(caseOpts.Seed, len(flatOverlays))
logger.Infof("Seed: %d", caseOpts.Seed)

var (
nodes []BeeV2
count int
)

for name, addr := range flatOverlays {
nodes = append(nodes, BeeV2{
name: name,
Expand Down