From c0352ee461450342173cbe8286fec59deffea515 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Sat, 14 Dec 2024 04:22:02 -0500 Subject: [PATCH] feat: feeds check --- config/config.yaml | 7 ++ config/local.yaml | 7 ++ config/public-testnet.yaml | 10 +++ pkg/bee/api/api.go | 27 ++++-- pkg/bee/api/feed.go | 132 ++++++++++++++++++++++++++++ pkg/bee/client.go | 16 ++++ pkg/check/feed/feed.go | 170 +++++++++++++++++++++++++++++++++++++ pkg/config/check.go | 23 +++++ 8 files changed, 387 insertions(+), 5 deletions(-) create mode 100644 pkg/bee/api/feed.go create mode 100644 pkg/check/feed/feed.go diff --git a/config/config.yaml b/config/config.yaml index 6076a166..6eb6d4c7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -387,6 +387,13 @@ checks: timeout: 10m type: gsoc + feed: + options: + postage-amount: 100000 + postage-depth: 20 + postage-label: feed-label + type: feed + # simulations defines simulations Beekeeper can execute against the cluster # type filed allows defining same simulation with different names and options simulations: diff --git a/config/local.yaml b/config/local.yaml index 38fba968..ccdefcbd 100644 --- a/config/local.yaml +++ b/config/local.yaml @@ -341,3 +341,10 @@ checks: postage-label: gsoc-label timeout: 10m type: gsoc + ci-feed: + options: + postage-amount: 100000 + postage-depth: 20 + postage-label: feed-label + type: feed + diff --git a/config/public-testnet.yaml b/config/public-testnet.yaml index 36b30245..ea57e1ce 100644 --- a/config/public-testnet.yaml +++ b/config/public-testnet.yaml @@ -143,3 +143,13 @@ checks: - bee-4 timeout: 12h type: load + pt-feed: + options: + postage-amount: 140000000 + postage-depth: 20 + postage-label: feed-label + type: feed + pt-feed-availability: + options: + root-ref: "6c8ee8e33d1eb652cd595dc34ab212ca1970762ad02364c7b45e8c3ba7742666" + type: feed diff --git a/pkg/bee/api/api.go b/pkg/bee/api/api.go index ea44bf3f..d990bd49 100644 --- a/pkg/bee/api/api.go +++ b/pkg/bee/api/api.go @@ -19,6 +19,7 @@ import ( const ( apiVersion = "v1" contentType = "application/json, text/plain, */*; charset=utf-8" + etagHeader = "ETag" postageStampBatchHeader = "Swarm-Postage-Batch-Id" deferredUploadHeader = "Swarm-Deferred-Upload" swarmAct = "Swarm-Act" @@ -29,6 +30,10 @@ const ( swarmTagHeader = "Swarm-Tag" swarmCacheDownloadHeader = "Swarm-Cache" swarmRedundancyFallbackMode = "Swarm-Redundancy-Fallback-Mode" + swarmOnlyRootChunk = "Swarm-Only-Root-Chunk" + swarmSocSignatureHeader = "Swarm-Soc-Signature" + swarmFeedIndexHeader = "Swarm-Feed-Index" + swarmFeedIndexNextHeader = "Swarm-Feed-Index-Next" ) var userAgent = "beekeeper/" + beekeeper.Version @@ -53,6 +58,7 @@ type Client struct { PingPong *PingPongService Postage *PostageService Stake *StakingService + Feed *FeedService } // ClientOptions holds optional parameters for the Client. @@ -92,6 +98,7 @@ func newClient(httpClient *http.Client) (c *Client) { c.PingPong = (*PingPongService)(&c.service) c.Postage = (*PostageService)(&c.service) c.Stake = (*StakingService)(&c.service) + c.Feed = (*FeedService)(&c.service) return c } @@ -177,9 +184,15 @@ func encodeJSON(w io.Writer, v interface{}) (err error) { // requestData handles the HTTP request response cycle. func (c *Client) requestData(ctx context.Context, method, path string, body io.Reader, opts *DownloadOptions) (resp io.ReadCloser, err error) { + b, _, err := c.requestDataGetHeader(ctx, method, path, body, opts) + return b, err +} + +// requestDataGetHeader handles the HTTP request response cycle and returns the response body and header. +func (c *Client) requestDataGetHeader(ctx context.Context, method, path string, body io.Reader, opts *DownloadOptions) (resp io.ReadCloser, h http.Header, err error) { req, err := http.NewRequest(method, path, body) if err != nil { - return nil, err + return nil, nil, err } req = req.WithContext(ctx) @@ -203,6 +216,10 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R } } + if opts != nil && opts.OnlyRootChunk != nil { + req.Header.Set(swarmOnlyRootChunk, strconv.FormatBool(*opts.OnlyRootChunk)) + } + if opts != nil && opts.Cache != nil { req.Header.Set(swarmCacheDownloadHeader, strconv.FormatBool(*opts.Cache)) } @@ -211,14 +228,13 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R } r, err := c.httpClient.Do(req) if err != nil { - return nil, err + return nil, nil, err } if err = responseErrorHandler(r); err != nil { - return nil, err + return nil, nil, err } - - return r.Body, nil + return r.Body, r.Header, nil } // requestWithHeader handles the HTTP request response cycle. @@ -333,4 +349,5 @@ type DownloadOptions struct { ActTimestamp *uint64 Cache *bool RedundancyFallbackMode *bool + OnlyRootChunk *bool } diff --git a/pkg/bee/api/feed.go b/pkg/bee/api/feed.go new file mode 100644 index 00000000..e6478746 --- /dev/null +++ b/pkg/bee/api/feed.go @@ -0,0 +1,132 @@ +package api + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/hex" + "io" + "net/http" + "strconv" + "time" + + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/crypto" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// FeedService represents Bee's Feed service +type FeedService service + +// FeedUploadResponse represents a feed upload response +type FeedUploadResponse struct { + Reference swarm.Address `json:"reference"` + Owner string + Topic string +} + +// FindFeedUpdateResponse represents a feed update response +type FindFeedUpdateResponse struct { + SocSignature string + Index uint64 + NextIndex uint64 + Data []byte + Reference swarm.Address +} + +func ownerFromSigner(signer crypto.Signer) (string, error) { + publicKey, err := signer.PublicKey() + if err != nil { + return "", err + } + ownerBytes, err := crypto.NewEthereumAddress(*publicKey) + if err != nil { + return "", err + } + return hex.EncodeToString(ownerBytes), nil +} + +// CreateRootManifest creates an initial feed root manifest +func (f *FeedService) CreateRootManifest(ctx context.Context, signer crypto.Signer, topic []byte, o UploadOptions) (*FeedUploadResponse, error) { + ownerHex, err := ownerFromSigner(signer) + topicHex := hex.EncodeToString(topic) + h := http.Header{} + if o.Pin { + h.Add(swarmPinHeader, "true") + } + h.Add(postageStampBatchHeader, o.BatchID) + var response FeedUploadResponse + err = f.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/feeds/"+ownerHex+"/"+topicHex, h, nil, &response) + if err != nil { + return nil, err + } + + response.Owner = ownerHex + response.Topic = topicHex + return &response, nil +} + +// UpdateWithReference updates a feed with a reference +func (f *FeedService) UpdateWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o UploadOptions) (*SocResponse, error) { + ts := make([]byte, 8) + binary.BigEndian.PutUint64(ts, uint64(time.Now().Unix())) + ch, err := cac.New(append(append([]byte{}, ts...), addr.Bytes()...)) + if err != nil { + return nil, err + } + ownerHex, err := ownerFromSigner(signer) + index := make([]byte, 8) + binary.BigEndian.PutUint64(index, i) + idBytes, err := crypto.LegacyKeccak256(append(append([]byte{}, topic...), index...)) + if err != nil { + return nil, err + } + sch, err := soc.New(idBytes, ch).Sign(signer) + if err != nil { + return nil, err + } + chunkData := sch.Data() + signatureBytes := chunkData[swarm.HashSize : swarm.HashSize+swarm.SocSignatureSize] + id := hex.EncodeToString(idBytes) + sig := hex.EncodeToString(signatureBytes) + res, err := f.client.SOC.UploadSOC(ctx, ownerHex, id, sig, bytes.NewReader(ch.Data()), o.BatchID) + if err != nil { + return nil, err + } + return res, nil +} + +// FindUpdate finds the latest update for a feed +func (f *FeedService) FindUpdate(ctx context.Context, signer crypto.Signer, topic []byte, o *DownloadOptions) (*FindFeedUpdateResponse, error) { + ownerHex, err := ownerFromSigner(signer) + topicHex := hex.EncodeToString(topic) + res, header, err := f.client.requestDataGetHeader(ctx, http.MethodGet, "/"+apiVersion+"/feeds/"+ownerHex+"/"+topicHex, nil, o) + if err != nil { + return nil, err + } + defer res.Close() + b, err := io.ReadAll(res) + if err != nil { + return nil, err + } + index, err := strconv.ParseUint(header.Get(swarmFeedIndexHeader), 10, 64) + if err != nil { + return nil, err + } + nextIndex, err := strconv.ParseUint(header.Get(swarmFeedIndexNextHeader), 10, 64) + if err != nil { + return nil, err + } + ref, err := swarm.ParseHexAddress(header.Get(etagHeader)) + if err != nil { + return nil, err + } + return &FindFeedUpdateResponse{ + SocSignature: header.Get(swarmSocSignatureHeader), + Index: index, + NextIndex: nextIndex, + Data: b, + Reference: ref, + }, nil +} diff --git a/pkg/bee/client.go b/pkg/bee/client.go index 9ea528e6..45c65ecc 100644 --- a/pkg/bee/client.go +++ b/pkg/bee/client.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/ethersphere/bee/v2/pkg/crypto" "io" "math/big" "net/http" @@ -940,3 +941,18 @@ func (c *Client) Withdraw(ctx context.Context, token, addr string, amount int64) return nil } + +// CreateRootFeedManifest creates an initial root manifest +func (c *Client) CreateRootFeedManifest(ctx context.Context, signer crypto.Signer, topic []byte, o api.UploadOptions) (*api.FeedUploadResponse, error) { + return c.api.Feed.CreateRootManifest(ctx, signer, topic, o) +} + +// UpdateFeedWithReference updates a feed with a reference +func (c *Client) UpdateFeedWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o api.UploadOptions) (*api.SocResponse, error) { + return c.api.Feed.UpdateWithReference(ctx, signer, topic, i, addr, o) +} + +// FindFeedUpdate finds the latest update for a feed +func (c *Client) FindFeedUpdate(ctx context.Context, signer crypto.Signer, topic []byte, o *api.DownloadOptions) (*api.FindFeedUpdateResponse, error) { + return c.api.Feed.FindUpdate(ctx, signer, topic, o) +} diff --git a/pkg/check/feed/feed.go b/pkg/check/feed/feed.go new file mode 100644 index 00000000..6845ea85 --- /dev/null +++ b/pkg/check/feed/feed.go @@ -0,0 +1,170 @@ +package feed + +import ( + "bytes" + "context" + "fmt" + "github.com/ethersphere/bee/v2/pkg/swarm" + "time" + + "github.com/ethersphere/bee/v2/pkg/crypto" + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/bee/api" + "github.com/ethersphere/beekeeper/pkg/beekeeper" + "github.com/ethersphere/beekeeper/pkg/logging" + "github.com/ethersphere/beekeeper/pkg/orchestration" +) + +// Options represents check options +type Options struct { + PostageAmount int64 + PostageDepth uint64 + PostageLabel string + NUpdates int + RootRef string +} + +// NewDefaultOptions returns new default options +func NewDefaultOptions() Options { + return Options{ + PostageAmount: 1000, + PostageDepth: 17, + PostageLabel: "test-label", + NUpdates: 2, + } +} + +// compile check whether Check implements interface +var _ beekeeper.Action = (*Check)(nil) + +// Check instance. +type Check struct { + logger logging.Logger +} + +// NewCheck returns a new check instance. +func NewCheck(logger logging.Logger) beekeeper.Action { + return &Check{ + logger: logger, + } +} + +func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts interface{}) (err error) { + o, ok := opts.(Options) + if !ok { + return fmt.Errorf("invalid options type") + } + + if o.RootRef != "" { + c.logger.Infof("running availability check") + return c.availability(ctx, cluster, o) + } + return c.regular(ctx, cluster, o) +} + +func (c *Check) availability(ctx context.Context, cluster orchestration.Cluster, o Options) error { + ref, err := swarm.ParseHexAddress(o.RootRef) + if err != nil { + return fmt.Errorf("invalid root ref: %w", err) + } + + nodeNames := cluster.FullNodeNames() + nodeName := nodeNames[0] + clients, err := cluster.NodesClients(ctx) + if err != nil { + return err + } + + client := clients[nodeName] + _, _, err = client.DownloadFile(ctx, ref, nil) + if err != nil { + return err + } + return nil +} + +func (c *Check) regular(ctx context.Context, cluster orchestration.Cluster, o Options) error { + nodeNames := cluster.FullNodeNames() + nodeName := nodeNames[0] + clients, err := cluster.NodesClients(ctx) + if err != nil { + return err + } + client := clients[nodeName] + + batchID, err := client.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel) + if err != nil { + return err + } + + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + return err + } + + signer := crypto.NewDefaultSigner(privKey) + topic, err := crypto.LegacyKeccak256([]byte("my-topic")) + if err != nil { + return err + } + + // create root + createManifestRes, err := client.CreateRootFeedManifest(ctx, signer, topic, api.UploadOptions{BatchID: batchID}) + if err != nil { + return err + } + c.logger.Infof("node %s: manifest created", nodeName) + c.logger.Infof("reference: %s", createManifestRes.Reference) + c.logger.Infof("owner: %s", createManifestRes.Owner) + c.logger.Infof("topic: %s", createManifestRes.Topic) + + // make updates + for i := 0; i < o.NUpdates; i++ { + time.Sleep(2 * time.Second) + data := fmt.Sprintf("update-%d", i) + fName := fmt.Sprintf("file-%d", i) + file := bee.NewBufferFile(fName, bytes.NewBuffer([]byte(data))) + err = client.UploadFile(context.Background(), &file, api.UploadOptions{ + BatchID: batchID, + Direct: true, + }) + if err != nil { + return err + } + ref := file.Address() + socRes, err := client.UpdateFeedWithReference(ctx, signer, topic, uint64(i), ref, api.UploadOptions{BatchID: batchID}) + if err != nil { + return err + } + c.logger.Infof("node %s: feed updated", nodeName) + c.logger.Infof("soc reference: %s", socRes.Reference) + c.logger.Infof("wrapped reference: %s", file.Address()) + } + time.Sleep(2 * time.Second) + + // fetch update + update, err := client.FindFeedUpdate(ctx, signer, topic, nil) + if err != nil { + return err + } + + c.logger.Infof("node %s: feed update found", nodeName) + c.logger.Infof("reference: %d", update.Reference) + c.logger.Infof("index: %d", update.Index) + c.logger.Infof("next index: %d", update.NextIndex) + + if update.NextIndex == uint64(o.NUpdates) { + return fmt.Errorf("expected next index to be 2, got %d", update.NextIndex) + } + + // fetch feed via bzz + d, err := client.DownloadFileBytes(ctx, createManifestRes.Reference, nil) + if err != nil { + return fmt.Errorf("download root feed: %w", err) + } + lastUpdateData := fmt.Sprintf("update-%d", o.NUpdates-1) + if string(d) != lastUpdateData { + return fmt.Errorf("expected file content to be %s, got %s", lastUpdateData, string(d)) + } + return nil +} diff --git a/pkg/config/check.go b/pkg/config/check.go index 47d49028..1123de72 100644 --- a/pkg/config/check.go +++ b/pkg/config/check.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "github.com/ethersphere/beekeeper/pkg/check/feed" "math/big" "reflect" "time" @@ -618,6 +619,28 @@ var Checks = map[string]CheckType{ return nil, fmt.Errorf("applying options: %w", err) } + return opts, nil + }, + }, + "feed": { + NewAction: feed.NewCheck, + NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) { + checkOpts := new(struct { + PostageAmount *int64 `yaml:"postage-amount"` + PostageDepth *uint64 `yaml:"postage-depth"` + PostageLabel *string `yaml:"postage-label"` + NUpdates *int `yaml:"n-updates"` + RootRef *string `yaml:"root-ref"` + }) + if err := check.Options.Decode(checkOpts); err != nil { + return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err) + } + opts := feed.NewDefaultOptions() + + if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil { + return nil, fmt.Errorf("applying options: %w", err) + } + return opts, nil }, },