Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: feeds check #441

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,13 @@ checks:
timeout: 10m
type: gsoc

feed:
options:
postage-amount: 100000
postage-depth: 20
postage-label: feed-label
type: feed

# simulations defines simulations Beekeeper can execute against the cluster
# type filed allows defining same simulation with different names and options
simulations:
Expand Down
7 changes: 7 additions & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,10 @@ checks:
postage-label: gsoc-label
timeout: 10m
type: gsoc
ci-feed:
options:
postage-amount: 100000
postage-depth: 20
postage-label: feed-label
type: feed

10 changes: 10 additions & 0 deletions config/public-testnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,13 @@ checks:
- bee-4
timeout: 12h
type: load
pt-feed:
options:
postage-amount: 140000000
postage-depth: 20
postage-label: feed-label
type: feed
pt-feed-availability:
options:
root-ref: "6c8ee8e33d1eb652cd595dc34ab212ca1970762ad02364c7b45e8c3ba7742666"
type: feed
27 changes: 22 additions & 5 deletions pkg/bee/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
const (
apiVersion = "v1"
contentType = "application/json, text/plain, */*; charset=utf-8"
etagHeader = "ETag"
postageStampBatchHeader = "Swarm-Postage-Batch-Id"
deferredUploadHeader = "Swarm-Deferred-Upload"
swarmAct = "Swarm-Act"
Expand All @@ -29,6 +30,10 @@ const (
swarmTagHeader = "Swarm-Tag"
swarmCacheDownloadHeader = "Swarm-Cache"
swarmRedundancyFallbackMode = "Swarm-Redundancy-Fallback-Mode"
swarmOnlyRootChunk = "Swarm-Only-Root-Chunk"
swarmSocSignatureHeader = "Swarm-Soc-Signature"
swarmFeedIndexHeader = "Swarm-Feed-Index"
swarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
)

var userAgent = "beekeeper/" + beekeeper.Version
Expand All @@ -53,6 +58,7 @@ type Client struct {
PingPong *PingPongService
Postage *PostageService
Stake *StakingService
Feed *FeedService
}

// ClientOptions holds optional parameters for the Client.
Expand Down Expand Up @@ -92,6 +98,7 @@ func newClient(httpClient *http.Client) (c *Client) {
c.PingPong = (*PingPongService)(&c.service)
c.Postage = (*PostageService)(&c.service)
c.Stake = (*StakingService)(&c.service)
c.Feed = (*FeedService)(&c.service)
return c
}

Expand Down Expand Up @@ -177,9 +184,15 @@ func encodeJSON(w io.Writer, v interface{}) (err error) {

// requestData handles the HTTP request response cycle.
func (c *Client) requestData(ctx context.Context, method, path string, body io.Reader, opts *DownloadOptions) (resp io.ReadCloser, err error) {
b, _, err := c.requestDataGetHeader(ctx, method, path, body, opts)
return b, err
}

// requestDataGetHeader handles the HTTP request response cycle and returns the response body and header.
func (c *Client) requestDataGetHeader(ctx context.Context, method, path string, body io.Reader, opts *DownloadOptions) (resp io.ReadCloser, h http.Header, err error) {
req, err := http.NewRequest(method, path, body)
if err != nil {
return nil, err
return nil, nil, err
}
req = req.WithContext(ctx)

Expand All @@ -203,6 +216,10 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
}
}

if opts != nil && opts.OnlyRootChunk != nil {
req.Header.Set(swarmOnlyRootChunk, strconv.FormatBool(*opts.OnlyRootChunk))
}

if opts != nil && opts.Cache != nil {
req.Header.Set(swarmCacheDownloadHeader, strconv.FormatBool(*opts.Cache))
}
Expand All @@ -211,14 +228,13 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
}
r, err := c.httpClient.Do(req)
if err != nil {
return nil, err
return nil, nil, err
}

if err = responseErrorHandler(r); err != nil {
return nil, err
return nil, nil, err
}

return r.Body, nil
return r.Body, r.Header, nil
}

// requestWithHeader handles the HTTP request response cycle.
Expand Down Expand Up @@ -333,4 +349,5 @@ type DownloadOptions struct {
ActTimestamp *uint64
Cache *bool
RedundancyFallbackMode *bool
OnlyRootChunk *bool
}
141 changes: 141 additions & 0 deletions pkg/bee/api/feed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package api

import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"io"
"net/http"
"strconv"
"time"

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

// FeedService represents Bee's Feed service
type FeedService service

// FeedUploadResponse represents a feed upload response
type FeedUploadResponse struct {
Reference swarm.Address `json:"reference"`
Owner string
Topic string
}

// FindFeedUpdateResponse represents a feed update response
type FindFeedUpdateResponse struct {
SocSignature string
Index uint64
NextIndex uint64
Data []byte
Reference swarm.Address
}

func ownerFromSigner(signer crypto.Signer) (string, error) {
publicKey, err := signer.PublicKey()
if err != nil {
return "", err
}
ownerBytes, err := crypto.NewEthereumAddress(*publicKey)
if err != nil {
return "", err
}
return hex.EncodeToString(ownerBytes), nil
}

// CreateRootManifest creates an initial feed root manifest
func (f *FeedService) CreateRootManifest(ctx context.Context, signer crypto.Signer, topic []byte, o UploadOptions) (*FeedUploadResponse, error) {
ownerHex, err := ownerFromSigner(signer)
if err != nil {
return nil, err
}
topicHex := hex.EncodeToString(topic)
h := http.Header{}
if o.Pin {
h.Add(swarmPinHeader, "true")
}
h.Add(postageStampBatchHeader, o.BatchID)
var response FeedUploadResponse
err = f.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/feeds/"+ownerHex+"/"+topicHex, h, nil, &response)
if err != nil {
return nil, err
}

response.Owner = ownerHex
response.Topic = topicHex
return &response, nil
}

// UpdateWithReference updates a feed with a reference
func (f *FeedService) UpdateWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o UploadOptions) (*SocResponse, error) {
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, uint64(time.Now().Unix()))
ch, err := cac.New(append(append([]byte{}, ts...), addr.Bytes()...))
if err != nil {
return nil, err
}
ownerHex, err := ownerFromSigner(signer)
if err != nil {
return nil, err
}
index := make([]byte, 8)
binary.BigEndian.PutUint64(index, i)
idBytes, err := crypto.LegacyKeccak256(append(append([]byte{}, topic...), index...))
if err != nil {
return nil, err
}
sch, err := soc.New(idBytes, ch).Sign(signer)
if err != nil {
return nil, err
}
chunkData := sch.Data()
signatureBytes := chunkData[swarm.HashSize : swarm.HashSize+swarm.SocSignatureSize]
id := hex.EncodeToString(idBytes)
sig := hex.EncodeToString(signatureBytes)
res, err := f.client.SOC.UploadSOC(ctx, ownerHex, id, sig, bytes.NewReader(ch.Data()), o.BatchID)
if err != nil {
return nil, err
}
return res, nil
}

// FindUpdate finds the latest update for a feed
func (f *FeedService) FindUpdate(ctx context.Context, signer crypto.Signer, topic []byte, o *DownloadOptions) (*FindFeedUpdateResponse, error) {
ownerHex, err := ownerFromSigner(signer)
if err != nil {
return nil, err
}
topicHex := hex.EncodeToString(topic)
res, header, err := f.client.requestDataGetHeader(ctx, http.MethodGet, "/"+apiVersion+"/feeds/"+ownerHex+"/"+topicHex, nil, o)
if err != nil {
return nil, err
}
defer res.Close()
b, err := io.ReadAll(res)
if err != nil {
return nil, err
}
index, err := strconv.ParseUint(header.Get(swarmFeedIndexHeader), 10, 64)
if err != nil {
return nil, err
}
nextIndex, err := strconv.ParseUint(header.Get(swarmFeedIndexNextHeader), 10, 64)
if err != nil {
return nil, err
}
ref, err := swarm.ParseHexAddress(header.Get(etagHeader))
if err != nil {
return nil, err
}
return &FindFeedUpdateResponse{
SocSignature: header.Get(swarmSocSignatureHeader),
Index: index,
NextIndex: nextIndex,
Data: b,
Reference: ref,
}, nil
}
16 changes: 16 additions & 0 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/logging"
Expand Down Expand Up @@ -940,3 +941,18 @@ func (c *Client) Withdraw(ctx context.Context, token, addr string, amount int64)

return nil
}

// CreateRootFeedManifest creates an initial root manifest
func (c *Client) CreateRootFeedManifest(ctx context.Context, signer crypto.Signer, topic []byte, o api.UploadOptions) (*api.FeedUploadResponse, error) {
return c.api.Feed.CreateRootManifest(ctx, signer, topic, o)
}

// UpdateFeedWithReference updates a feed with a reference
func (c *Client) UpdateFeedWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o api.UploadOptions) (*api.SocResponse, error) {
return c.api.Feed.UpdateWithReference(ctx, signer, topic, i, addr, o)
}

// FindFeedUpdate finds the latest update for a feed
func (c *Client) FindFeedUpdate(ctx context.Context, signer crypto.Signer, topic []byte, o *api.DownloadOptions) (*api.FindFeedUpdateResponse, error) {
return c.api.Feed.FindUpdate(ctx, signer, topic, o)
}
Loading
Loading