From 5ed519a9083d60c48383884a3f31e0f8343b7dd7 Mon Sep 17 00:00:00 2001 From: Acha Bill <57879913+acha-bill@users.noreply.github.com> Date: Fri, 15 Nov 2024 14:05:34 -0500 Subject: [PATCH] feat: gsoc check (#433) * feat: gsoc test * fix: lint * fix: review comments * fix: review * fix: bump version --- config/config.yaml | 8 + config/local.yaml | 7 + pkg/check/gsoc/gsoc.go | 346 +++++++++++++++++++++++++++++++++++++++++ pkg/config/check.go | 21 +++ version.go | 2 +- 5 files changed, 383 insertions(+), 1 deletion(-) create mode 100644 pkg/check/gsoc/gsoc.go diff --git a/config/config.yaml b/config/config.yaml index 58cace0d..93007dcd 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -380,6 +380,14 @@ checks: timeout: 12h type: load + gsoc: + options: + postage-amount: 100000 + postage-depth: 20 + postage-label: gsoc-label + timeout: 10m + type: gsoc + # simulations defines simulations Beekeeper can execute against the cluster # type filed allows defining same simulation with different names and options simulations: diff --git a/config/local.yaml b/config/local.yaml index f7b7b6e0..a735a476 100644 --- a/config/local.yaml +++ b/config/local.yaml @@ -335,3 +335,10 @@ checks: seed: data-size: type: redundancy + ci-gsoc: + options: + postage-amount: 100000 + postage-depth: 20 + postage-label: gsoc-label + timeout: 10m + type: gsoc diff --git a/pkg/check/gsoc/gsoc.go b/pkg/check/gsoc/gsoc.go new file mode 100644 index 00000000..9fb7ace6 --- /dev/null +++ b/pkg/check/gsoc/gsoc.go @@ -0,0 +1,346 @@ +package gsoc + +import ( + "context" + "crypto/ecdsa" + "encoding/binary" + "encoding/hex" + "fmt" + "net/http" + "strconv" + "sync" + "time" + + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/crypto" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/beekeeper" + "github.com/ethersphere/beekeeper/pkg/logging" + "github.com/ethersphere/beekeeper/pkg/orchestration" + "github.com/gorilla/websocket" + "golang.org/x/sync/errgroup" +) + +// Options represents check options +type Options struct { + PostageAmount int64 + PostageDepth uint64 + PostageLabel string +} + +// NewDefaultOptions returns new default options +func NewDefaultOptions() Options { + return Options{ + PostageAmount: 1000, + PostageDepth: 17, + PostageLabel: "test-label", + } +} + +// compile check whether Check implements interface +var _ beekeeper.Action = (*Check)(nil) + +// Check instance. +type Check struct { + logger logging.Logger +} + +type socData struct { + Owner string + Sig string + Data []byte +} + +// NewCheck returns a new check instance. +func NewCheck(logger logging.Logger) beekeeper.Action { + return &Check{ + logger: logger, + } +} + +func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts interface{}) (err error) { + o, ok := opts.(Options) + if !ok { + return fmt.Errorf("invalid options type") + } + + fullNodeNames := cluster.FullNodeNames() + clients, err := cluster.NodesClients(ctx) + if err != nil { + return err + } + + if len(fullNodeNames) < 2 { + return fmt.Errorf("gsoc test require at least 2 full nodes") + } + + uploadClient := clients[fullNodeNames[0]] + listenClient := clients[fullNodeNames[1]] + + batches := make([]string, 2) + for i := 0; i < 2; i++ { + c.logger.Infof("gsoc: creating postage batch. amount=%d, depth=%d, label=%s", o.PostageAmount, o.PostageDepth, o.PostageLabel) + batchID, err := uploadClient.CreatePostageBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel, false) + if err != nil { + return err + } + c.logger.Infof("gsoc: postage batch created: %s", batchID) + batches[i] = batchID + } + + c.logger.Infof("send messages with different postage batches sequentially...") + err = run(ctx, uploadClient, listenClient, batches, c.logger, false) + if err != nil { + return err + } + c.logger.Infof("done") + + c.logger.Infof("send messages with different postage batches parallel...") + err = run(ctx, uploadClient, listenClient, batches, c.logger, true) + if err != nil { + return err + } + c.logger.Infof("done") + + return nil +} + +func run(ctx context.Context, uploadClient *bee.Client, listenClient *bee.Client, batches []string, logger logging.Logger, parallel bool) error { + const numChunks = 10 + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + return err + } + + addresses, err := listenClient.Addresses(ctx) + if err != nil { + return err + } + resourceId, socAddress, err := mineResourceId(ctx, addresses.Overlay, privKey, 1) + if err != nil { + return err + } + logger.Infof("gsoc: socAddress=%s, listner node address=%s", socAddress, addresses.Overlay) + + listener := &socListener{} + ch, err := listener.Listen(ctx, listenClient.Config().APIURL.Host, socAddress, logger) + if err != nil { + return fmt.Errorf("listen websocket: %w", err) + } + defer listener.Close() + + received := make(map[string]bool, numChunks) + receivedMtx := new(sync.Mutex) + + go func() { + for p := range ch { + receivedMtx.Lock() + received[p] = true + receivedMtx.Unlock() + } + }() + + if parallel { + err = runInParallel(ctx, uploadClient, numChunks, batches, resourceId, privKey, logger) + } else { + err = runInSequence(ctx, uploadClient, numChunks, batches, resourceId, privKey, logger) + } + if err != nil { + return err + } + + // wait for listener to receive all messages + time.Sleep(5 * time.Second) + listener.Close() + + receivedMtx.Lock() + defer receivedMtx.Unlock() + if len(received) != numChunks { + return fmt.Errorf("expected %d messages, got %d", numChunks, len(received)) + } + + for i := 0; i < numChunks; i++ { + want := fmt.Sprintf("data %d", i) + if !received[want] { + return fmt.Errorf("message '%s' not received", want) + } + } + return nil +} + +func uploadSoc(ctx context.Context, client *bee.Client, payload string, resourceId []byte, batchID string, privKey *ecdsa.PrivateKey) error { + d, err := makeSoc(payload, resourceId, privKey) + if err != nil { + return fmt.Errorf("make soc: %w", err) + } + _, err = client.UploadSOC(ctx, d.Owner, hex.EncodeToString(resourceId), d.Sig, d.Data, batchID) + if err != nil { + return fmt.Errorf("upload soc: %w", err) + } + return nil +} + +func runInSequence(ctx context.Context, client *bee.Client, numChunks int, batches []string, resourceId []byte, privKey *ecdsa.PrivateKey, logger logging.Logger) error { + for i := 0; i < numChunks; i++ { + payload := fmt.Sprintf("data %d", i) + logger.Infof("gsoc: submitting soc to node=%s, payload=%s", client.Name(), payload) + err := uploadSoc(ctx, client, payload, resourceId, batches[i%2], privKey) + if err != nil { + return err + } + } + return nil +} + +func runInParallel(ctx context.Context, client *bee.Client, numChunks int, batches []string, resourceId []byte, privKey *ecdsa.PrivateKey, logger logging.Logger) error { + var errG errgroup.Group + for i := 0; i < numChunks; i++ { + errG.Go(func() error { + payload := fmt.Sprintf("data %d", i) + logger.Infof("gsoc: submitting soc to node=%s, payload=%s", client.Name(), payload) + return uploadSoc(ctx, client, payload, resourceId, batches[i%2], privKey) + }) + } + return errG.Wait() +} + +func getTargetNeighborhood(address swarm.Address, depth int) (string, error) { + var targetNeighborhood string + for i := 0; i < depth; i++ { + hexChar := address.String()[i : i+1] + value, err := strconv.ParseUint(hexChar, 16, 4) + if err != nil { + return "", err + } + targetNeighborhood += fmt.Sprintf("%04b", value) + } + return targetNeighborhood, nil +} + +func mineResourceId(ctx context.Context, overlay swarm.Address, privKey *ecdsa.PrivateKey, depth int) ([]byte, swarm.Address, error) { + targetNeighborhood, err := getTargetNeighborhood(overlay, depth) + if err != nil { + return nil, swarm.ZeroAddress, err + } + + neighborhood, err := swarm.ParseBitStrAddress(targetNeighborhood) + if err != nil { + return nil, swarm.ZeroAddress, err + } + nonce := make([]byte, 32) + prox := len(targetNeighborhood) + owner, err := crypto.NewEthereumAddress(privKey.PublicKey) + if err != nil { + return nil, swarm.ZeroAddress, err + } + + i := uint64(0) + for { + select { + case <-ctx.Done(): + return nil, swarm.ZeroAddress, ctx.Err() + default: + } + + binary.LittleEndian.PutUint64(nonce, i) + address, err := soc.CreateAddress(nonce, owner) + if err != nil { + return nil, swarm.ZeroAddress, err + } + + if swarm.Proximity(address.Bytes(), neighborhood.Bytes()) >= uint8(prox) { + return nonce, address, nil + } + i++ + } +} + +func makeSoc(msg string, id []byte, privKey *ecdsa.PrivateKey) (*socData, error) { + signer := crypto.NewDefaultSigner(privKey) + + ch, err := cac.New([]byte(msg)) + if err != nil { + return nil, err + } + + sch, err := soc.New(id, ch).Sign(signer) + if err != nil { + return nil, err + } + + chunkData := sch.Data() + signatureBytes := chunkData[swarm.HashSize : swarm.HashSize+swarm.SocSignatureSize] + + publicKey, err := signer.PublicKey() + if err != nil { + return nil, err + } + + ownerBytes, err := crypto.NewEthereumAddress(*publicKey) + if err != nil { + return nil, err + } + + return &socData{ + Owner: hex.EncodeToString(ownerBytes), + Sig: hex.EncodeToString(signatureBytes), + Data: ch.Data(), + }, nil +} + +type socListener struct { + listening bool + ws *websocket.Conn + ch chan string +} + +func (s *socListener) Close() { + if !s.listening { + return + } + + s.listening = false + s.ws.Close() + close(s.ch) +} + +func (s *socListener) Listen(ctx context.Context, host string, addr swarm.Address, logger logging.Logger) (<-chan string, error) { + dialer := &websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: 45 * time.Second, + } + + ws, _, err := dialer.DialContext(ctx, fmt.Sprintf("ws://%s/gsoc/subscribe/%s", host, addr), http.Header{}) + if err != nil { + return nil, err + } + s.ws = ws + s.ch = make(chan string) + s.listening = true + + go func() { + for { + select { + case <-ctx.Done(): + return + default: + msgType, data, err := ws.ReadMessage() + if err != nil { + logger.Infof("gsoc: websocket error %v", err) + return + } + if msgType != websocket.BinaryMessage { + logger.Info("gsoc: websocket received non-binary message") + continue + } + + logger.Infof("gsoc: websocket received message %s", string(data)) + s.ch <- string(data) + } + } + }() + + return s.ch, nil +} diff --git a/pkg/config/check.go b/pkg/config/check.go index aefe9f27..47d49028 100644 --- a/pkg/config/check.go +++ b/pkg/config/check.go @@ -17,6 +17,7 @@ import ( "github.com/ethersphere/beekeeper/pkg/check/fileretrieval" "github.com/ethersphere/beekeeper/pkg/check/fullconnectivity" "github.com/ethersphere/beekeeper/pkg/check/gc" + "github.com/ethersphere/beekeeper/pkg/check/gsoc" "github.com/ethersphere/beekeeper/pkg/check/kademlia" "github.com/ethersphere/beekeeper/pkg/check/longavailability" "github.com/ethersphere/beekeeper/pkg/check/manifest" @@ -597,6 +598,26 @@ var Checks = map[string]CheckType{ return nil, fmt.Errorf("applying options: %w", err) } + return opts, nil + }, + }, + "gsoc": { + NewAction: gsoc.NewCheck, + NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) { + checkOpts := new(struct { + PostageAmount *int64 `yaml:"postage-amount"` + PostageDepth *uint64 `yaml:"postage-depth"` + PostageLabel *string `yaml:"postage-label"` + }) + if err := check.Options.Decode(checkOpts); err != nil { + return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err) + } + opts := gsoc.NewDefaultOptions() + + if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil { + return nil, fmt.Errorf("applying options: %w", err) + } + return opts, nil }, }, diff --git a/version.go b/version.go index beff9452..eb146c25 100644 --- a/version.go +++ b/version.go @@ -1,7 +1,7 @@ package beekeeper var ( - version = "0.20.0" // manually set semantic version number + version = "0.21.0" // manually set semantic version number commit string // automatically set git commit hash // Version TODO