Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: feed wrapping #4677

Merged
merged 44 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
20c8878
chore: add todo for bzz
nugaon May 15, 2024
6332838
feat: getWrappedChunk with legacy payload handling
nugaon May 15, 2024
8c14c90
feat: add content length header for feeds path
nugaon May 15, 2024
8515af0
feat: remove payload structure for feeds
nugaon May 15, 2024
33f7b8b
chore: indicating bootstrap fixme
nugaon May 15, 2024
7edec9e
chore: update call arguments
nugaon May 15, 2024
d93cece
test: adjust feed testing
nugaon May 15, 2024
e0923f2
feat: return chunk payload on api
nugaon May 15, 2024
b6d382e
test: feed api
nugaon May 15, 2024
df01783
test: correct expected hashes
nugaon May 22, 2024
abe6896
feat: joiner and loadsaver with rootch init
nugaon May 22, 2024
ef05e2e
test: chunk wrapping and wrong legacy payload resolution
nugaon May 22, 2024
eccb5e4
test: changed feed response
nugaon May 22, 2024
a7a6e1e
chore: remove invalidFeedUpdate error
nugaon May 22, 2024
c768d88
test: make and copy wrappedRef
nugaon May 23, 2024
62acd85
fix: remove payload time related logic
nugaon May 23, 2024
c7ddac9
feat: give back whole data
nugaon May 24, 2024
dae4346
chore: resolve linter issues
nugaon May 25, 2024
27372fb
test: deactivate epoch based finder test
nugaon May 27, 2024
652a89e
feat: generate mock soc with span
nugaon May 27, 2024
7989e4d
test: resolve content that takes more than one chunk
nugaon May 27, 2024
d114125
chore: remove generated file
nugaon May 28, 2024
b8d6ee2
fix: epoch finder
nugaon May 28, 2024
1a11337
feat: only wrapped chunk
nugaon May 29, 2024
381f7bc
test: only wrapped chunk
nugaon May 29, 2024
c06cc24
refactor: only wrapped chunk to only root chunk
nugaon May 29, 2024
5215af9
docs: only root chunk
nugaon May 29, 2024
e8c2ff6
docs: add remaining redundancy related header options
nugaon May 29, 2024
0126076
fix: epoch ts def
nugaon May 29, 2024
4e1aea7
test: parallel
nugaon May 29, 2024
061bbca
fix: typeo on additional header set
nugaon Jun 6, 2024
ea0dfb3
refactor: remove joiner wrapper
nugaon Jun 6, 2024
9d99af1
feat: soc get api
nugaon Jun 6, 2024
df16c0c
fix: return on error and chunk get
nugaon Jun 7, 2024
7b15219
feat: swarm soc signature header in feed endpoint
nugaon Jun 27, 2024
f2cc8d8
refactor: swarm signature header
nugaon Jun 27, 2024
f4c117f
docs: update openapi
nugaon Jul 1, 2024
3d7de7c
fix: whitespace check
nugaon Jul 1, 2024
a6611df
docs: fix method namings
nugaon Sep 5, 2024
85a924f
Merge branch 'master' into feat/feed-wrapping
nugaon Sep 16, 2024
caed975
fix: params after merge
nugaon Sep 16, 2024
c347904
docs: fix duplicate keys
nugaon Sep 16, 2024
994bc2c
feat: add new headers to allowedHeaders
nugaon Sep 19, 2024
ba593c0
docs: typeo
nugaon Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 53 additions & 3 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -840,11 +840,12 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/HexString"
required: true
description: Signature
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
- in: header
name: swarm-postage-batch-id
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
required: true
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
Expand Down Expand Up @@ -877,6 +878,47 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response
get:
summary: Resolve Single Owner Chunk data
tags:
- Single owner chunk
parameters:
- in: path
name: owner
schema:
$ref: "SwarmCommon.yaml#/components/schemas/EthereumAddress"
required: true
description: Ethereum address of the Owner of the SOC
- in: path
name: id
schema:
$ref: "SwarmCommon.yaml#/components/schemas/HexString"
required: true
description: Arbitrary identifier of the related data
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
responses:
"200":
description: Related Single Owner Chunk data
headers:
"swarm-soc-signature":
$ref: "SwarmCommon.yaml#/components/headers/SwarmSocSignature"
content:
application/octet-stream:
schema:
type: string
format: binary
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"401":
$ref: "SwarmCommon.yaml#/components/responses/401"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response

"/feeds/{owner}/{topic}":
post:
Expand Down Expand Up @@ -961,18 +1003,26 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/FeedType"
required: false
description: "Feed indexing scheme (default: sequence)"
- $ref: "SwarmCommon.yaml#/components/headers/SwarmOnlyRootChunkParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
responses:
"200":
description: Latest feed update
headers:
"swarm-soc-signature":
$ref: "SwarmCommon.yaml#/components/headers/SwarmSocSignature"
"swarm-feed-index":
$ref: "SwarmCommon.yaml#/components/headers/SwarmFeedIndex"
"swarm-feed-index-next":
$ref: "SwarmCommon.yaml#/components/headers/SwarmFeedIndexNext"
content:
application/json:
application/octet-stream:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/ReferenceResponse"
type: string
format: binary
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"401":
Expand Down
13 changes: 13 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,11 @@ components:
schema:
$ref: "#/components/schemas/HexString"

SwarmSocSignature:
description: "Attached digital signature of the Single Owner Chunk"
schema:
$ref: "#/components/schemas/HexString"

SwarmActHistoryAddress:
description: "Swarm address reference to the new ACT history entry"
schema:
Expand Down Expand Up @@ -1136,6 +1141,14 @@ components:
description: >
Specify the timeout for chunk retrieval. The default is 30 seconds.

SwarmOnlyRootChunkParameter:
in: header
name: swarm-only-root-chunk
schema:
type: boolean
required: false
description: "Returns only the root chunk of the content"

ContentTypePreserved:
in: header
name: Content-Type
Expand Down
4 changes: 3 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ const (
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmSocSignatureHeader = "Swarm-Soc-Signature"
SwarmFeedIndexHeader = "Swarm-Feed-Index"
SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
SwarmOnlyRootChunk = "Swarm-Only-Root-Chunk"
SwarmCollectionHeader = "Swarm-Collection"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
SwarmPostageStampHeader = "Swarm-Postage-Stamp"
Expand Down Expand Up @@ -520,7 +522,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
allowedHeaders := []string{
"User-Agent", "Accept", "X-Requested-With", "Access-Control-Request-Headers", "Access-Control-Request-Method", "Accept-Ranges", "Content-Encoding",
AuthorizationHeader, AcceptEncodingHeader, ContentTypeHeader, ContentDispositionHeader, RangeHeader, OriginHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, GasPriceHeader, GasLimitHeader, ImmutableHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader,
}
allowedHeadersStr := strings.Join(allowedHeaders, ", ")

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (s *Service) bytesGetHandler(w http.ResponseWriter, r *http.Request) {
ContentTypeHeader: {"application/octet-stream"},
}

s.downloadHandler(logger, w, r, address, additionalHeaders, true, false)
s.downloadHandler(logger, w, r, address, additionalHeaders, true, false, nil)
}

func (s *Service) bytesHeadHandler(w http.ResponseWriter, r *http.Request) {
Expand Down
22 changes: 17 additions & 5 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/v2/pkg/accesscontrol"
"github.com/ethersphere/bee/v2/pkg/feeds"
"github.com/ethersphere/bee/v2/pkg/file"
"github.com/ethersphere/bee/v2/pkg/file/joiner"
"github.com/ethersphere/bee/v2/pkg/file/loadsave"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
Expand Down Expand Up @@ -421,14 +422,17 @@ FETCH:
jsonhttp.NotFound(w, "no update found")
return
}
ref, _, err := parseFeedUpdate(ch)
wc, err := feeds.GetWrappedChunk(ctx, s.storer.ChunkStore(), ch)
if err != nil {
logger.Debug("bzz download: mapStructure feed update failed", "error", err)
logger.Error(nil, "bzz download: mapStructure feed update failed")
jsonhttp.InternalServerError(w, "mapStructure feed update")
return
}
address = ref
address = wc.Address()
// modify ls and init with non-existing wrapped chunk
ls = loadsave.NewReadonlyWithRootCh(s.storer.Download(cache), wc)

feedDereferenced = true
curBytes, err := cur.MarshalBinary()
if err != nil {
Expand Down Expand Up @@ -550,11 +554,11 @@ func (s *Service) serveManifestEntry(
additionalHeaders[ContentTypeHeader] = []string{mimeType}
}

s.downloadHandler(logger, w, r, manifestEntry.Reference(), additionalHeaders, etag, headersOnly)
s.downloadHandler(logger, w, r, manifestEntry.Reference(), additionalHeaders, etag, headersOnly, nil)
}

// downloadHandler contains common logic for downloading Swarm file from API
func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag, headersOnly bool) {
func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag, headersOnly bool, rootCh swarm.Chunk) {
headers := struct {
Strategy *getter.Strategy `map:"Swarm-Redundancy-Strategy"`
FallbackMode *bool `map:"Swarm-Redundancy-Fallback-Mode"`
Expand All @@ -580,7 +584,15 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
return
}

reader, l, err := joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference)
var (
reader file.Joiner
l int64
)
if rootCh != nil {
reader, l, err = joiner.NewJoiner(ctx, s.storer.Download(cache), s.storer.Cache(), reference, rootCh)
} else {
reader, l, err = joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference)
}
if err != nil {
if errors.Is(err, storage.ErrNotFound) || errors.Is(err, topology.ErrNotFound) {
logger.Debug("api download: not found ", "address", reference, "error", err)
Expand Down
72 changes: 41 additions & 31 deletions pkg/api/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package api

import (
"encoding/binary"
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -34,8 +36,6 @@ const (
feedMetadataEntryType = "swarm-feed-type"
)

var errInvalidFeedUpdate = errors.New("invalid feed update")

type feedReferenceResponse struct {
Reference swarm.Address `json:"reference"`
}
Expand Down Expand Up @@ -64,6 +64,14 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
queries.At = time.Now().Unix()
}

headers := struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep the queries above with At and After ? Also bellow there is a lookup using this queries.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may be needed for epoch based feeds.
I created an issue about that #4830

OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

f := feeds.New(paths.Topic, paths.Owner)
lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f)
if err != nil {
Expand Down Expand Up @@ -94,11 +102,10 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
return
}

ref, _, err := parseFeedUpdate(ch)
wc, err := feeds.GetWrappedChunk(r.Context(), s.storer.ChunkStore(), ch)
if err != nil {
logger.Debug("mapStructure feed update failed", "error", err)
logger.Error(nil, "mapStructure feed update failed")
jsonhttp.InternalServerError(w, "mapStructure feed update failed")
logger.Error(nil, "wrapped chunk cannot be retrieved")
jsonhttp.NotFound(w, "wrapped chunk cannot be retrieved")
return
}

Expand All @@ -118,11 +125,33 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
return
}

w.Header().Set(SwarmFeedIndexHeader, hex.EncodeToString(curBytes))
w.Header().Set(SwarmFeedIndexNextHeader, hex.EncodeToString(nextBytes))
w.Header().Set("Access-Control-Expose-Headers", fmt.Sprintf("%s, %s", SwarmFeedIndexHeader, SwarmFeedIndexNextHeader))
socCh, err := soc.FromChunk(ch)
if err != nil {
logger.Error(nil, "wrapped chunk cannot be retrieved")
jsonhttp.NotFound(w, "wrapped chunk cannot be retrieved")
return
}
sig := socCh.Signature()

additionalHeaders := http.Header{
ContentTypeHeader: {"application/octet-stream"},
SwarmFeedIndexHeader: {hex.EncodeToString(curBytes)},
SwarmFeedIndexNextHeader: {hex.EncodeToString(nextBytes)},
SwarmSocSignatureHeader: {hex.EncodeToString(sig)},
"Access-Control-Expose-Headers": {SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader},
}

if headers.OnlyRootChunk {
w.Header().Set(ContentLengthHeader, strconv.Itoa(len(wc.Data())))
// include additional headers
for name, values := range additionalHeaders {
w.Header().Set(name, strings.Join(values, ", "))
}
_, _ = io.Copy(w, bytes.NewReader(wc.Data()))
return
}

jsonhttp.OK(w, feedReferenceResponse{Reference: ref})
s.downloadHandler(logger, w, r, wc.Address(), additionalHeaders, true, false, wc)
}

func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -278,22 +307,3 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {

jsonhttp.Created(w, feedReferenceResponse{Reference: encryptedReference})
}

func parseFeedUpdate(ch swarm.Chunk) (swarm.Address, int64, error) {
s, err := soc.FromChunk(ch)
if err != nil {
return swarm.ZeroAddress, 0, fmt.Errorf("soc unmarshal: %w", err)
}

update := s.WrappedChunk().Data()
// split the timestamp and reference
// possible values right now:
// unencrypted ref: span+timestamp+ref => 8+8+32=48
// encrypted ref: span+timestamp+ref+decryptKey => 8+8+64=80
if len(update) != 48 && len(update) != 80 {
return swarm.ZeroAddress, 0, errInvalidFeedUpdate
}
ts := binary.BigEndian.Uint64(update[8:16])
ref := swarm.NewAddress(update[16:])
return ref, int64(ts), nil
}
Loading
Loading