Skip to content

Commit

Permalink
feat: feeds check
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Dec 14, 2024
1 parent bd04a53 commit c0352ee
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 5 deletions.
7 changes: 7 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,13 @@ checks:
timeout: 10m
type: gsoc

feed:
options:
postage-amount: 100000
postage-depth: 20
postage-label: feed-label
type: feed

# simulations defines simulations Beekeeper can execute against the cluster
# type filed allows defining same simulation with different names and options
simulations:
Expand Down
7 changes: 7 additions & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,10 @@ checks:
postage-label: gsoc-label
timeout: 10m
type: gsoc
ci-feed:
options:
postage-amount: 100000
postage-depth: 20
postage-label: feed-label
type: feed

10 changes: 10 additions & 0 deletions config/public-testnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,13 @@ checks:
- bee-4
timeout: 12h
type: load
pt-feed:
options:
postage-amount: 140000000
postage-depth: 20
postage-label: feed-label
type: feed
pt-feed-availability:
options:
root-ref: "6c8ee8e33d1eb652cd595dc34ab212ca1970762ad02364c7b45e8c3ba7742666"
type: feed
27 changes: 22 additions & 5 deletions pkg/bee/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
const (
apiVersion = "v1"
contentType = "application/json, text/plain, */*; charset=utf-8"
etagHeader = "ETag"
postageStampBatchHeader = "Swarm-Postage-Batch-Id"
deferredUploadHeader = "Swarm-Deferred-Upload"
swarmAct = "Swarm-Act"
Expand All @@ -29,6 +30,10 @@ const (
swarmTagHeader = "Swarm-Tag"
swarmCacheDownloadHeader = "Swarm-Cache"
swarmRedundancyFallbackMode = "Swarm-Redundancy-Fallback-Mode"
swarmOnlyRootChunk = "Swarm-Only-Root-Chunk"
swarmSocSignatureHeader = "Swarm-Soc-Signature"
swarmFeedIndexHeader = "Swarm-Feed-Index"
swarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
)

var userAgent = "beekeeper/" + beekeeper.Version
Expand All @@ -53,6 +58,7 @@ type Client struct {
PingPong *PingPongService
Postage *PostageService
Stake *StakingService
Feed *FeedService
}

// ClientOptions holds optional parameters for the Client.
Expand Down Expand Up @@ -92,6 +98,7 @@ func newClient(httpClient *http.Client) (c *Client) {
c.PingPong = (*PingPongService)(&c.service)
c.Postage = (*PostageService)(&c.service)
c.Stake = (*StakingService)(&c.service)
c.Feed = (*FeedService)(&c.service)
return c
}

Expand Down Expand Up @@ -177,9 +184,15 @@ func encodeJSON(w io.Writer, v interface{}) (err error) {

// requestData handles the HTTP request response cycle.
func (c *Client) requestData(ctx context.Context, method, path string, body io.Reader, opts *DownloadOptions) (resp io.ReadCloser, err error) {
b, _, err := c.requestDataGetHeader(ctx, method, path, body, opts)
return b, err
}

// requestDataGetHeader handles the HTTP request response cycle and returns the response body and header.
func (c *Client) requestDataGetHeader(ctx context.Context, method, path string, body io.Reader, opts *DownloadOptions) (resp io.ReadCloser, h http.Header, err error) {
req, err := http.NewRequest(method, path, body)
if err != nil {
return nil, err
return nil, nil, err
}
req = req.WithContext(ctx)

Expand All @@ -203,6 +216,10 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
}
}

if opts != nil && opts.OnlyRootChunk != nil {
req.Header.Set(swarmOnlyRootChunk, strconv.FormatBool(*opts.OnlyRootChunk))
}

if opts != nil && opts.Cache != nil {
req.Header.Set(swarmCacheDownloadHeader, strconv.FormatBool(*opts.Cache))
}
Expand All @@ -211,14 +228,13 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
}
r, err := c.httpClient.Do(req)
if err != nil {
return nil, err
return nil, nil, err
}

if err = responseErrorHandler(r); err != nil {
return nil, err
return nil, nil, err
}

return r.Body, nil
return r.Body, r.Header, nil
}

// requestWithHeader handles the HTTP request response cycle.
Expand Down Expand Up @@ -333,4 +349,5 @@ type DownloadOptions struct {
ActTimestamp *uint64
Cache *bool
RedundancyFallbackMode *bool
OnlyRootChunk *bool
}
132 changes: 132 additions & 0 deletions pkg/bee/api/feed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package api

import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"io"
"net/http"
"strconv"
"time"

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

// FeedService represents Bee's Feed service
type FeedService service

// FeedUploadResponse represents a feed upload response
type FeedUploadResponse struct {
Reference swarm.Address `json:"reference"`
Owner string
Topic string
}

// FindFeedUpdateResponse represents a feed update response
type FindFeedUpdateResponse struct {
SocSignature string
Index uint64
NextIndex uint64
Data []byte
Reference swarm.Address
}

func ownerFromSigner(signer crypto.Signer) (string, error) {
publicKey, err := signer.PublicKey()
if err != nil {
return "", err
}
ownerBytes, err := crypto.NewEthereumAddress(*publicKey)
if err != nil {
return "", err
}
return hex.EncodeToString(ownerBytes), nil
}

// CreateRootManifest creates an initial feed root manifest
func (f *FeedService) CreateRootManifest(ctx context.Context, signer crypto.Signer, topic []byte, o UploadOptions) (*FeedUploadResponse, error) {
ownerHex, err := ownerFromSigner(signer)

Check failure on line 52 in pkg/bee/api/feed.go

View workflow job for this annotation

GitHub Actions / Build (ubuntu-latest)

ineffectual assignment to err (ineffassign)
topicHex := hex.EncodeToString(topic)
h := http.Header{}
if o.Pin {
h.Add(swarmPinHeader, "true")
}
h.Add(postageStampBatchHeader, o.BatchID)
var response FeedUploadResponse
err = f.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/feeds/"+ownerHex+"/"+topicHex, h, nil, &response)
if err != nil {
return nil, err
}

response.Owner = ownerHex
response.Topic = topicHex
return &response, nil
}

// UpdateWithReference updates a feed with a reference
func (f *FeedService) UpdateWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o UploadOptions) (*SocResponse, error) {
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, uint64(time.Now().Unix()))
ch, err := cac.New(append(append([]byte{}, ts...), addr.Bytes()...))
if err != nil {
return nil, err
}
ownerHex, err := ownerFromSigner(signer)

Check failure on line 78 in pkg/bee/api/feed.go

View workflow job for this annotation

GitHub Actions / Build (ubuntu-latest)

ineffectual assignment to err (ineffassign)
index := make([]byte, 8)
binary.BigEndian.PutUint64(index, i)
idBytes, err := crypto.LegacyKeccak256(append(append([]byte{}, topic...), index...))
if err != nil {
return nil, err
}
sch, err := soc.New(idBytes, ch).Sign(signer)
if err != nil {
return nil, err
}
chunkData := sch.Data()
signatureBytes := chunkData[swarm.HashSize : swarm.HashSize+swarm.SocSignatureSize]
id := hex.EncodeToString(idBytes)
sig := hex.EncodeToString(signatureBytes)
res, err := f.client.SOC.UploadSOC(ctx, ownerHex, id, sig, bytes.NewReader(ch.Data()), o.BatchID)
if err != nil {
return nil, err
}
return res, nil
}

// FindUpdate finds the latest update for a feed
func (f *FeedService) FindUpdate(ctx context.Context, signer crypto.Signer, topic []byte, o *DownloadOptions) (*FindFeedUpdateResponse, error) {
ownerHex, err := ownerFromSigner(signer)

Check failure on line 102 in pkg/bee/api/feed.go

View workflow job for this annotation

GitHub Actions / Build (ubuntu-latest)

ineffectual assignment to err (ineffassign)
topicHex := hex.EncodeToString(topic)
res, header, err := f.client.requestDataGetHeader(ctx, http.MethodGet, "/"+apiVersion+"/feeds/"+ownerHex+"/"+topicHex, nil, o)
if err != nil {
return nil, err
}
defer res.Close()
b, err := io.ReadAll(res)
if err != nil {
return nil, err
}
index, err := strconv.ParseUint(header.Get(swarmFeedIndexHeader), 10, 64)
if err != nil {
return nil, err
}
nextIndex, err := strconv.ParseUint(header.Get(swarmFeedIndexNextHeader), 10, 64)
if err != nil {
return nil, err
}
ref, err := swarm.ParseHexAddress(header.Get(etagHeader))
if err != nil {
return nil, err
}
return &FindFeedUpdateResponse{
SocSignature: header.Get(swarmSocSignatureHeader),
Index: index,
NextIndex: nextIndex,
Data: b,
Reference: ref,
}, nil
}
16 changes: 16 additions & 0 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/ethersphere/bee/v2/pkg/crypto"
"io"
"math/big"
"net/http"
Expand Down Expand Up @@ -940,3 +941,18 @@ func (c *Client) Withdraw(ctx context.Context, token, addr string, amount int64)

return nil
}

// CreateRootFeedManifest creates an initial root manifest
func (c *Client) CreateRootFeedManifest(ctx context.Context, signer crypto.Signer, topic []byte, o api.UploadOptions) (*api.FeedUploadResponse, error) {
return c.api.Feed.CreateRootManifest(ctx, signer, topic, o)
}

// UpdateFeedWithReference updates a feed with a reference
func (c *Client) UpdateFeedWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o api.UploadOptions) (*api.SocResponse, error) {
return c.api.Feed.UpdateWithReference(ctx, signer, topic, i, addr, o)
}

// FindFeedUpdate finds the latest update for a feed
func (c *Client) FindFeedUpdate(ctx context.Context, signer crypto.Signer, topic []byte, o *api.DownloadOptions) (*api.FindFeedUpdateResponse, error) {
return c.api.Feed.FindUpdate(ctx, signer, topic, o)
}
Loading

0 comments on commit c0352ee

Please sign in to comment.