Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/act #408

Merged
merged 4 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ checks:
options:
timeout: 5m
type: pingpong
act:
options:
file-name: act
file-size: 1024
postage-depth: 20
postage-amount: 420000000
postage-label: act-label
seed: 0
timeout: 5m
type: act
withdraw:
options:
target-address: 0xec44cb15b1b033e74d55ac5d0e24d861bde54532
Expand Down
9 changes: 9 additions & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,15 @@ bee-configs:

# CI checks
checks:
ci-act:
options:
file-size: 1024
postage-depth: 20
postage-amount: 420000000
postage-label: act-label
seed: 0
timeout: 5m
type: act
ci-full-connectivity:
timeout: 5m
type: full-connectivity
Expand Down
63 changes: 63 additions & 0 deletions pkg/bee/api/act.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package api

import (
"context"
"io"
"net/http"
"net/url"

"github.com/ethersphere/bee/pkg/swarm"
)

type ActService service

type ActUploadResponse struct {
Reference swarm.Address `json:"reference"`
HistoryAddress swarm.Address
}

type ActGranteesResponse struct {
Reference swarm.Address `json:"ref"`
HistoryAddress swarm.Address `json:"historyref"`
}

func (a *ActService) Download(ctx context.Context, addr swarm.Address, opts *DownloadOptions) (resp io.ReadCloser, err error) {
return a.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/bzz/"+addr.String()+"/", nil, opts)
}

func (a *ActService) Upload(ctx context.Context, name string, data io.Reader, o UploadOptions) (ActUploadResponse, error) {
var resp ActUploadResponse
h := http.Header{}
h.Add(postageStampBatchHeader, o.BatchID)
h.Add("swarm-deferred-upload", "true")
h.Add("content-type", "application/octet-stream")
h.Add("Swarm-Act", "true")
h.Add(swarmPinHeader, "true")
historyParser := func(h http.Header) {
resp.HistoryAddress, _ = swarm.ParseHexAddress(h.Get("Swarm-Act-History-Address"))
}
err := a.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/bzz?"+url.QueryEscape("name="+name), h, data, &resp, historyParser)
return resp, err
}

func (a *ActService) AddGrantees(ctx context.Context, data io.Reader, o UploadOptions) (ActGranteesResponse, error) {
var resp ActGranteesResponse
h := http.Header{}
h.Add(postageStampBatchHeader, o.BatchID)
h.Add(swarmActHistoryAddress, o.ActHistoryAddress.String())
err := a.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/grantee", h, data, &resp)
return resp, err
}

func (a *ActService) GetGrantees(ctx context.Context, addr swarm.Address) (resp io.ReadCloser, err error) {
return a.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/grantee/"+addr.String(), nil, nil)
}

func (a *ActService) PatchGrantees(ctx context.Context, data io.Reader, addr swarm.Address, haddr swarm.Address, batchID string) (ActGranteesResponse, error) {
var resp ActGranteesResponse
h := http.Header{}
h.Add("swarm-postage-batch-id", batchID)
h.Add("swarm-act-history-address", haddr.String())
err := a.client.requestWithHeader(ctx, http.MethodPatch, "/"+apiVersion+"/grantee/"+addr.String(), h, data, &resp)
return resp, err
}
48 changes: 37 additions & 11 deletions pkg/bee/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ import (
"strconv"
"strings"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper"
)

const (
apiVersion = "v1"
contentType = "application/json; charset=utf-8"
contentType = "application/json, text/plain, */*; charset=utf-8"
postageStampBatchHeader = "Swarm-Postage-Batch-Id"
deferredUploadHeader = "Swarm-Deferred-Upload"
swarmAct = "Swarm-Act"
swarmActHistoryAddress = "Swarm-Act-History-Address"
swarmActPublisher = "Swarm-Act-Publisher"
swarmActTimestamp = "Swarm-Act-Timestamp"
swarmPinHeader = "Swarm-Pin"
swarmTagHeader = "Swarm-Tag"
swarmCacheDownloadHeader = "Swarm-Cache"
Expand All @@ -33,6 +38,7 @@ type Client struct {
service service // Reuse a single struct instead of allocating one for each service on the heap.

// Services that API provides.
Act *ActService
Bytes *BytesService
Chunks *ChunksService
Files *FilesService
Expand Down Expand Up @@ -71,6 +77,7 @@ func NewClient(baseURL *url.URL, o *ClientOptions) (c *Client) {
func newClient(httpClient *http.Client) (c *Client) {
c = &Client{httpClient: httpClient}
c.service.client = c
c.Act = (*ActService)(&c.service)
c.Bytes = (*BytesService)(&c.service)
c.Chunks = (*ChunksService)(&c.service)
c.Files = (*FilesService)(&c.service)
Expand Down Expand Up @@ -179,14 +186,28 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
req.Header.Set("Content-Type", contentType)
}
req.Header.Set("Accept", contentType)
// ACT
if opts != nil {
if opts.Act != nil {
req.Header.Set(swarmAct, strconv.FormatBool(*opts.Act))
}
if opts.ActHistoryAddress != nil {
req.Header.Set(swarmActHistoryAddress, (*opts.ActHistoryAddress).String())
}
if opts.ActPublicKey != nil {
req.Header.Set(swarmActPublisher, (*opts.ActPublicKey).String())
}
if opts.ActTimestamp != nil {
req.Header.Set(swarmActTimestamp, strconv.FormatUint(*opts.ActTimestamp, 10))
}
}

if opts != nil && opts.Cache != nil {
req.Header.Set(swarmCacheDownloadHeader, strconv.FormatBool(*opts.Cache))
}
if opts != nil && opts.RedundancyFallbackMode != nil {
req.Header.Set(swarmRedundancyFallbackMode, strconv.FormatBool(*opts.RedundancyFallbackMode))
}

r, err := c.httpClient.Do(req)
if err != nil {
return nil, err
Expand All @@ -200,7 +221,7 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
}

// requestWithHeader handles the HTTP request response cycle.
func (c *Client) requestWithHeader(ctx context.Context, method, path string, header http.Header, body io.Reader, v interface{}) (err error) {
func (c *Client) requestWithHeader(ctx context.Context, method, path string, header http.Header, body io.Reader, v interface{}, headerParser ...func(http.Header)) (err error) {
req, err := http.NewRequest(method, path, body)
if err != nil {
return err
Expand All @@ -215,12 +236,11 @@ func (c *Client) requestWithHeader(ctx context.Context, method, path string, hea
return err
}

if err = responseErrorHandler(r); err != nil {
return err
}

if v != nil && strings.Contains(r.Header.Get("Content-Type"), "application/json") {
_ = json.NewDecoder(r.Body).Decode(&v)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use err = json.NewDecoder(r.Body).Decode(&v) and return if not nil , else we do the for and return nil , right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done in responseErrorHandler

for _, parser := range headerParser {
parser(r.Header)
}
return err
}

Expand Down Expand Up @@ -292,13 +312,19 @@ func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
}

type UploadOptions struct {
Pin bool
Tag uint64
BatchID string
Direct bool
Act bool
Pin bool
Tag uint64
BatchID string
Direct bool
ActHistoryAddress swarm.Address
}

type DownloadOptions struct {
Act *bool
ActHistoryAddress *swarm.Address
ActPublicKey *swarm.Address
ActTimestamp *uint64
Cache *bool
RedundancyFallbackMode *bool
}
65 changes: 65 additions & 0 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -223,6 +224,21 @@ func (c *Client) DownloadFile(ctx context.Context, a swarm.Address, opts *api.Do
return size, h.Sum(nil), nil
}

func (c *Client) DownloadActFile(ctx context.Context, a swarm.Address, opts *api.DownloadOptions) (size int64, hash []byte, err error) {
r, err := c.api.Act.Download(ctx, a, opts)
if err != nil {
return 0, nil, fmt.Errorf("download file %s: %w", a, err)
}

h := fileHasher()
size, err = io.Copy(h, r)
if err != nil {
return 0, nil, fmt.Errorf("download file %s, hashing copy: %w", a, err)
}

return size, h.Sum(nil), nil
}

// HasChunk returns true/false if node has a chunk
func (c *Client) HasChunk(ctx context.Context, a swarm.Address) (bool, error) {
return c.api.Node.HasChunk(ctx, a)
Expand Down Expand Up @@ -750,6 +766,55 @@ func (c *Client) UploadFile(ctx context.Context, f *File, o api.UploadOptions) (
return
}

func (c *Client) UploadActFile(ctx context.Context, f *File, o api.UploadOptions) (err error) {
h := fileHasher()
r, err := c.api.Act.Upload(ctx, f.Name(), io.TeeReader(f.DataReader(), h), o)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have also some defer r.Close()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

r is ActUploadResponse struct

if err != nil {
return fmt.Errorf("upload ACT file: %w", err)
}

f.SetAddress(r.Reference)
f.SetHistroryAddress(r.HistoryAddress)
f.SetHash(h.Sum(nil))

return nil
}

func (c *Client) AddActGrantees(ctx context.Context, f *File, o api.UploadOptions) (err error) {
h := fileHasher()
r, err := c.api.Act.AddGrantees(ctx, io.TeeReader(f.DataReader(), h), o)
if err != nil {
return fmt.Errorf("add ACT grantees: %w", err)
}

f.SetAddress(r.Reference)
f.SetHistroryAddress(r.HistoryAddress)
f.SetHash(h.Sum(nil))

return nil
}

func (c *Client) GetActGrantees(ctx context.Context, a swarm.Address) (addresses []string, err error) {
r, e := c.api.Act.GetGrantees(ctx, a)
if e != nil {
return nil, fmt.Errorf("get grantees: %s: %w", a, e)
}
defer r.Close()
err = json.NewDecoder(r).Decode(&addresses)
return addresses, err
}

func (c *Client) PatchActGrantees(ctx context.Context, pf *File, addr swarm.Address, haddr swarm.Address, batchID string) (err error) {
r, err := c.api.Act.PatchGrantees(ctx, pf.DataReader(), addr, haddr, batchID)
if err != nil {
return fmt.Errorf("add ACT grantees: %w", err)
}

pf.SetAddress(r.Reference)
pf.SetHistroryAddress(r.HistoryAddress)
return nil
}

// UploadCollection uploads TAR collection bytes to the node
func (c *Client) UploadCollection(ctx context.Context, f *File, o api.UploadOptions) (err error) {
h := fileHasher()
Expand Down
19 changes: 14 additions & 5 deletions pkg/bee/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import (

// File represents Bee file
type File struct {
address swarm.Address
name string
hash []byte
dataReader io.Reader
size int64
address swarm.Address
name string
hash []byte
dataReader io.Reader
size int64
historyAddress swarm.Address
}

// NewRandomFile returns new pseudorandom file
Expand Down Expand Up @@ -62,6 +63,10 @@ func (f *File) Address() swarm.Address {
return f.address
}

func (f *File) HistroryAddress() swarm.Address {
return f.historyAddress
}

// Name returns file's name
func (f *File) Name() string {
return f.name
Expand Down Expand Up @@ -109,6 +114,10 @@ func (f *File) SetAddress(a swarm.Address) {
f.address = a
}

func (f *File) SetHistroryAddress(a swarm.Address) {
f.historyAddress = a
}

func (f *File) SetHash(h []byte) {
f.hash = h
}
Expand Down
Loading