Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/ethersphere/beekeeper into golang-v1.22
Browse files Browse the repository at this point in the history
  • Loading branch information
gacevicljubisa committed Jun 19, 2024
2 parents 6a9a59e + ea0e5a5 commit 3343605
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 59 deletions.
7 changes: 5 additions & 2 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,12 @@ checks:
nodes-sync-wait: 1m
duration: 12h
downloader-count: 3
upload-group:
uploader-count: 3
max-storage-radius: 2
storage-radius-check-wait: 5m
upload-groups:
- gateway
download-group:
download-groups:
- bee
- light
timeout: 12h
Expand Down
2 changes: 1 addition & 1 deletion pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (c *Client) CreatePostageBatch(ctx context.Context, amount int64, depth uin
if err != nil {
return "", fmt.Errorf("print reserve state (after): %w", err)
}
c.log.Infof("reserve state (after buying the batch):\n%s", rs.String())
c.log.Infof("reserve state (after buying the batch): %s", rs.String())
c.log.Infof("created batch id %s with depth %d and amount %d", id, depth, amount)
}
return id, nil
Expand Down
86 changes: 60 additions & 26 deletions pkg/check/smoke/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
Expand All @@ -25,14 +26,14 @@ var _ beekeeper.Action = (*LoadCheck)(nil)
// Check instance
type LoadCheck struct {
metrics metrics
logger logging.Logger
log logging.Logger
}

// NewCheck returns new check
func NewLoadCheck(logger logging.Logger) beekeeper.Action {
func NewLoadCheck(log logging.Logger) beekeeper.Action {
return &LoadCheck{
metrics: newMetrics("check_load"),
logger: logger,
log: log,
}
}

Expand All @@ -47,9 +48,15 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
return errors.New("no uploaders requested, quiting")
}

c.logger.Info("random seed: ", o.RndSeed)
c.logger.Info("content size: ", o.ContentSize)
c.logger.Info("max batch lifespan: ", o.MaxUseBatch)
if o.MaxStorageRadius == 0 {
return errors.New("max storage radius is not set")
}

c.log.Infof("random seed: %v", o.RndSeed)
c.log.Infof("content size: %v", o.ContentSize)
c.log.Infof("max batch lifespan: %v", o.MaxUseBatch)
c.log.Infof("max storage radius: %v", o.MaxStorageRadius)
c.log.Infof("storage radius check wait time: %v", o.StorageRadiusCheckWait)

clients, err := cluster.NodesClients(ctx)
if err != nil {
Expand All @@ -59,7 +66,7 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
ctx, cancel := context.WithTimeout(ctx, o.Duration)
defer cancel()

test := &test{clients: clients, logger: c.logger}
test := &test{clients: clients, logger: c.log}

uploaders := selectNames(cluster, o.UploadGroups...)
downloaders := selectNames(cluster, o.DownloadGroups...)
Expand All @@ -69,10 +76,10 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
for i := 0; true; i++ {
select {
case <-ctx.Done():
c.logger.Info("we are done")
c.log.Info("we are done")
return nil
default:
c.logger.Infof("starting iteration: #%d", i)
c.log.Infof("starting iteration: #%d", i)
}

var (
Expand All @@ -83,13 +90,13 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts

txData = make([]byte, o.ContentSize)
if _, err := crand.Read(txData); err != nil {
c.logger.Infof("unable to create random content: %v", err)
c.log.Infof("unable to create random content: %v", err)
continue
}

txNames := pickRandom(o.UploaderCount, uploaders)

c.logger.Infof("uploader: %s", txNames)
c.log.Infof("uploader: %s", txNames)

var (
upload sync.WaitGroup
Expand All @@ -102,24 +109,30 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
txName := txName

go func() {
defer once.Do(func() { upload.Done() }) // don't wait for all uploads
defer once.Do(func() {
upload.Done()
}) // don't wait for all uploads
for retries := 10; txDuration == 0 && retries > 0; retries-- {
select {
case <-ctx.Done():
c.logger.Info("we are done")
c.log.Info("we are done")
return
default:
}

if !c.checkStorageRadius(ctx, test.clients[txName], o.MaxStorageRadius, o.StorageRadiusCheckWait) {
return
}

c.metrics.UploadAttempts.Inc()
var duration time.Duration
c.logger.Infof("uploading to: %s", txName)
c.log.Infof("uploading to: %s", txName)

batchID := batches.Get(txName)
if batchID == "" {
batchID, err = clients[txName].CreatePostageBatch(ctx, o.PostageAmount, o.PostageDepth, "load-test", true)
if err != nil {
c.logger.Errorf("create new batch: %v", err)
c.log.Errorf("create new batch: %v", err)
return
}
batches.Store(txName, batchID)
Expand All @@ -128,8 +141,8 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
address, duration, err = test.upload(ctx, txName, txData, batchID)
if err != nil {
c.metrics.UploadErrors.Inc()
c.logger.Infof("upload failed: %v", err)
c.logger.Infof("retrying in: %v", o.TxOnErrWait)
c.log.Infof("upload failed: %v", err)
c.log.Infof("retrying in: %v", o.TxOnErrWait)
time.Sleep(o.TxOnErrWait)
return
}
Expand All @@ -144,12 +157,12 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
continue
}

c.logger.Infof("sleeping for: %v seconds", o.NodesSyncWait.Seconds())
c.log.Infof("sleeping for: %v seconds", o.NodesSyncWait.Seconds())
time.Sleep(o.NodesSyncWait) // Wait for nodes to sync.

// pick a batch of downloaders
rxNames := pickRandom(o.DownloaderCount, downloaders)
c.logger.Infof("downloaders: %s", rxNames)
c.log.Infof("downloaders: %s", rxNames)

var wg sync.WaitGroup

Expand All @@ -167,7 +180,7 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
for retries := 10; rxDuration == 0 && retries > 0; retries-- {
select {
case <-ctx.Done():
c.logger.Infof("context done in retry: %v", retries)
c.log.Infof("context done in retry: %v", retries)
return
default:
}
Expand All @@ -177,8 +190,8 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
rxData, rxDuration, err = test.download(ctx, rxName, address)
if err != nil {
c.metrics.DownloadErrors.Inc()
c.logger.Infof("download failed: %v", err)
c.logger.Infof("retrying in: %v", o.RxOnErrWait)
c.log.Infof("download failed: %v", err)
c.log.Infof("retrying in: %v", o.RxOnErrWait)
time.Sleep(o.RxOnErrWait)
}
}
Expand All @@ -189,15 +202,15 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
}

if !bytes.Equal(rxData, txData) {
c.logger.Info("uploaded data does not match downloaded data")
c.log.Info("uploaded data does not match downloaded data")

c.metrics.DownloadMismatch.Inc()

rxLen, txLen := len(rxData), len(txData)
if rxLen != txLen {
c.logger.Infof("length mismatch: download length %d; upload length %d", rxLen, txLen)
c.log.Infof("length mismatch: download length %d; upload length %d", rxLen, txLen)
if txLen < rxLen {
c.logger.Info("length mismatch: rx length is bigger then tx length")
c.log.Info("length mismatch: rx length is bigger then tx length")
}
return
}
Expand All @@ -208,7 +221,7 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
diff++
}
}
c.logger.Infof("data mismatch: found %d different bytes, ~%.2f%%", diff, float64(diff)/float64(txLen)*100)
c.log.Infof("data mismatch: found %d different bytes, ~%.2f%%", diff, float64(diff)/float64(txLen)*100)
return
}

Expand All @@ -225,6 +238,27 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
return nil
}

// checkStorageRadius polls the node's reserve state until its storage
// radius drops below maxRadius, sleeping wait between polls. It reports
// true once the radius is low enough, and false when the reserve state
// cannot be fetched or the context is cancelled while waiting.
func (c *LoadCheck) checkStorageRadius(ctx context.Context, client *bee.Client, maxRadius uint8, wait time.Duration) bool {
	for {
		state, err := client.ReserveState(ctx)
		if err != nil {
			c.log.Infof("error getting state: %v", err)
			return false
		}
		if radius := state.StorageRadius; radius >= maxRadius {
			c.log.Infof("waiting %v for StorageRadius to decrease. Current: %d, Max: %d", wait, radius, maxRadius)
			select {
			case <-ctx.Done():
				c.log.Infof("context done in StorageRadius check: %v", ctx.Err())
				return false
			case <-time.After(wait):
			}
			continue
		}
		return true
	}
}

func pickRandom(count int, peers []string) (names []string) {
seq := randomIntSeq(count, len(peers))
for _, i := range seq {
Expand Down
36 changes: 20 additions & 16 deletions pkg/check/smoke/smoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,31 @@ type Options struct {
UploadTimeout time.Duration
DownloadTimeout time.Duration
// load test params
UploaderCount int
UploadGroups []string
DownloaderCount int
DownloadGroups []string
MaxUseBatch time.Duration
UploaderCount int
UploadGroups []string
DownloaderCount int
DownloadGroups []string
MaxUseBatch time.Duration
MaxStorageRadius uint8
StorageRadiusCheckWait time.Duration
}

// NewDefaultOptions returns the default options for the smoke/load checks.
// The garbled merge view duplicated the pre-merge field list next to the
// post-merge one (duplicate struct keys — invalid Go); only the post-merge
// set, including the new storage-radius settings, is kept here.
func NewDefaultOptions() Options {
	return Options{
		ContentSize:            5000000,
		RndSeed:                time.Now().UnixNano(),
		PostageAmount:          50_000_000,
		PostageDepth:           24,
		TxOnErrWait:            10 * time.Second,
		RxOnErrWait:            10 * time.Second,
		NodesSyncWait:          time.Minute,
		Duration:               12 * time.Hour,
		UploadTimeout:          60 * time.Minute,
		DownloadTimeout:        60 * time.Minute,
		MaxUseBatch:            12 * time.Hour,
		MaxStorageRadius:       2,
		StorageRadiusCheckWait: 5 * time.Minute,
	}
}

Expand Down
30 changes: 16 additions & 14 deletions pkg/config/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,20 +383,22 @@ var Checks = map[string]CheckType{
NewAction: smoke.NewLoadCheck,
NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
checkOpts := new(struct {
ContentSize *int64 `yaml:"content-size"`
RndSeed *int64 `yaml:"rnd-seed"`
PostageAmount *int64 `yaml:"postage-amount"`
PostageDepth *uint64 `yaml:"postage-depth"`
GasPrice *string `yaml:"gas-price"`
TxOnErrWait *time.Duration `yaml:"tx-on-err-wait"`
RxOnErrWait *time.Duration `yaml:"rx-on-err-wait"`
NodesSyncWait *time.Duration `yaml:"nodes-sync-wait"`
Duration *time.Duration `yaml:"duration"`
UploaderCount *int `yaml:"uploader-count"`
UploadGroups *[]string `yaml:"upload-groups"`
DownloaderCount *int `yaml:"downloader-count"`
DownloadGroups *[]string `yaml:"download-groups"`
MaxUseBatch *time.Duration `yaml:"max-use-batch"`
ContentSize *int64 `yaml:"content-size"`
RndSeed *int64 `yaml:"rnd-seed"`
PostageAmount *int64 `yaml:"postage-amount"`
PostageDepth *uint64 `yaml:"postage-depth"`
GasPrice *string `yaml:"gas-price"`
TxOnErrWait *time.Duration `yaml:"tx-on-err-wait"`
RxOnErrWait *time.Duration `yaml:"rx-on-err-wait"`
NodesSyncWait *time.Duration `yaml:"nodes-sync-wait"`
Duration *time.Duration `yaml:"duration"`
UploaderCount *int `yaml:"uploader-count"`
UploadGroups *[]string `yaml:"upload-groups"`
DownloaderCount *int `yaml:"downloader-count"`
DownloadGroups *[]string `yaml:"download-groups"`
MaxUseBatch *time.Duration `yaml:"max-use-batch"`
MaxStorageRadius *uint8 `yaml:"max-storage-radius"`
StorageRadiusCheckWait *time.Duration `yaml:"storage-radius-check-wait"`
})
if err := check.Options.Decode(checkOpts); err != nil {
return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err)
Expand Down

0 comments on commit 3343605

Please sign in to comment.