Skip to content

Commit

Permalink
feat: feed wrapping (#4677)
Browse files Browse the repository at this point in the history
  • Loading branch information
nugaon authored Sep 19, 2024
1 parent 795bd22 commit 198d41f
Show file tree
Hide file tree
Showing 21 changed files with 456 additions and 162 deletions.
56 changes: 53 additions & 3 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -840,11 +840,12 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/HexString"
required: true
description: Signature
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
- in: header
name: swarm-postage-batch-id
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
required: true
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
Expand Down Expand Up @@ -877,6 +878,47 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response
get:
summary: Resolve Single Owner Chunk data
tags:
- Single owner chunk
parameters:
- in: path
name: owner
schema:
$ref: "SwarmCommon.yaml#/components/schemas/EthereumAddress"
required: true
description: Ethereum address of the Owner of the SOC
- in: path
name: id
schema:
$ref: "SwarmCommon.yaml#/components/schemas/HexString"
required: true
description: Arbitrary identifier of the related data
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
responses:
"200":
description: Related Single Owner Chunk data
headers:
"swarm-soc-signature":
$ref: "SwarmCommon.yaml#/components/headers/SwarmSocSignature"
content:
application/octet-stream:
schema:
type: string
format: binary
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"401":
$ref: "SwarmCommon.yaml#/components/responses/401"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response

"/feeds/{owner}/{topic}":
post:
Expand Down Expand Up @@ -961,18 +1003,26 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/FeedType"
required: false
description: "Feed indexing scheme (default: sequence)"
- $ref: "SwarmCommon.yaml#/components/headers/SwarmOnlyRootChunkParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
responses:
"200":
description: Latest feed update
headers:
"swarm-soc-signature":
$ref: "SwarmCommon.yaml#/components/headers/SwarmSocSignature"
"swarm-feed-index":
$ref: "SwarmCommon.yaml#/components/headers/SwarmFeedIndex"
"swarm-feed-index-next":
$ref: "SwarmCommon.yaml#/components/headers/SwarmFeedIndexNext"
content:
application/json:
application/octet-stream:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/ReferenceResponse"
type: string
format: binary
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"401":
Expand Down
13 changes: 13 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,11 @@ components:
schema:
$ref: "#/components/schemas/HexString"

SwarmSocSignature:
description: "Attached digital signature of the Single Owner Chunk"
schema:
$ref: "#/components/schemas/HexString"

SwarmActHistoryAddress:
description: "Swarm address reference to the new ACT history entry"
schema:
Expand Down Expand Up @@ -1136,6 +1141,14 @@ components:
description: >
Specify the timeout for chunk retrieval. The default is 30 seconds.
SwarmOnlyRootChunkParameter:
in: header
name: swarm-only-root-chunk
schema:
type: boolean
required: false
description: "Returns only the root chunk of the content"

ContentTypePreserved:
in: header
name: Content-Type
Expand Down
4 changes: 3 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ const (
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmSocSignatureHeader = "Swarm-Soc-Signature"
SwarmFeedIndexHeader = "Swarm-Feed-Index"
SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
SwarmOnlyRootChunk = "Swarm-Only-Root-Chunk"
SwarmCollectionHeader = "Swarm-Collection"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
SwarmPostageStampHeader = "Swarm-Postage-Stamp"
Expand Down Expand Up @@ -520,7 +522,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
allowedHeaders := []string{
"User-Agent", "Accept", "X-Requested-With", "Access-Control-Request-Headers", "Access-Control-Request-Method", "Accept-Ranges", "Content-Encoding",
AuthorizationHeader, AcceptEncodingHeader, ContentTypeHeader, ContentDispositionHeader, RangeHeader, OriginHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, GasPriceHeader, GasLimitHeader, ImmutableHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader,
}
allowedHeadersStr := strings.Join(allowedHeaders, ", ")

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (s *Service) bytesGetHandler(w http.ResponseWriter, r *http.Request) {
ContentTypeHeader: {"application/octet-stream"},
}

s.downloadHandler(logger, w, r, address, additionalHeaders, true, false)
s.downloadHandler(logger, w, r, address, additionalHeaders, true, false, nil)
}

func (s *Service) bytesHeadHandler(w http.ResponseWriter, r *http.Request) {
Expand Down
22 changes: 17 additions & 5 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/v2/pkg/accesscontrol"
"github.com/ethersphere/bee/v2/pkg/feeds"
"github.com/ethersphere/bee/v2/pkg/file"
"github.com/ethersphere/bee/v2/pkg/file/joiner"
"github.com/ethersphere/bee/v2/pkg/file/loadsave"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
Expand Down Expand Up @@ -421,14 +422,17 @@ FETCH:
jsonhttp.NotFound(w, "no update found")
return
}
ref, _, err := parseFeedUpdate(ch)
wc, err := feeds.GetWrappedChunk(ctx, s.storer.ChunkStore(), ch)
if err != nil {
logger.Debug("bzz download: mapStructure feed update failed", "error", err)
logger.Error(nil, "bzz download: mapStructure feed update failed")
jsonhttp.InternalServerError(w, "mapStructure feed update")
return
}
address = ref
address = wc.Address()
// modify ls and init with non-existing wrapped chunk
ls = loadsave.NewReadonlyWithRootCh(s.storer.Download(cache), wc)

feedDereferenced = true
curBytes, err := cur.MarshalBinary()
if err != nil {
Expand Down Expand Up @@ -550,11 +554,11 @@ func (s *Service) serveManifestEntry(
additionalHeaders[ContentTypeHeader] = []string{mimeType}
}

s.downloadHandler(logger, w, r, manifestEntry.Reference(), additionalHeaders, etag, headersOnly)
s.downloadHandler(logger, w, r, manifestEntry.Reference(), additionalHeaders, etag, headersOnly, nil)
}

// downloadHandler contains common logic for downloading Swarm file from API
func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag, headersOnly bool) {
func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag, headersOnly bool, rootCh swarm.Chunk) {
headers := struct {
Strategy *getter.Strategy `map:"Swarm-Redundancy-Strategy"`
FallbackMode *bool `map:"Swarm-Redundancy-Fallback-Mode"`
Expand All @@ -580,7 +584,15 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
return
}

reader, l, err := joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference)
var (
reader file.Joiner
l int64
)
if rootCh != nil {
reader, l, err = joiner.NewJoiner(ctx, s.storer.Download(cache), s.storer.Cache(), reference, rootCh)
} else {
reader, l, err = joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference)
}
if err != nil {
if errors.Is(err, storage.ErrNotFound) || errors.Is(err, topology.ErrNotFound) {
logger.Debug("api download: not found ", "address", reference, "error", err)
Expand Down
72 changes: 41 additions & 31 deletions pkg/api/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package api

import (
"encoding/binary"
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -34,8 +36,6 @@ const (
feedMetadataEntryType = "swarm-feed-type"
)

var errInvalidFeedUpdate = errors.New("invalid feed update")

type feedReferenceResponse struct {
Reference swarm.Address `json:"reference"`
}
Expand Down Expand Up @@ -64,6 +64,14 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
queries.At = time.Now().Unix()
}

headers := struct {
OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

f := feeds.New(paths.Topic, paths.Owner)
lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f)
if err != nil {
Expand Down Expand Up @@ -94,11 +102,10 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
return
}

ref, _, err := parseFeedUpdate(ch)
wc, err := feeds.GetWrappedChunk(r.Context(), s.storer.ChunkStore(), ch)
if err != nil {
logger.Debug("mapStructure feed update failed", "error", err)
logger.Error(nil, "mapStructure feed update failed")
jsonhttp.InternalServerError(w, "mapStructure feed update failed")
logger.Error(nil, "wrapped chunk cannot be retrieved")
jsonhttp.NotFound(w, "wrapped chunk cannot be retrieved")
return
}

Expand All @@ -118,11 +125,33 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
return
}

w.Header().Set(SwarmFeedIndexHeader, hex.EncodeToString(curBytes))
w.Header().Set(SwarmFeedIndexNextHeader, hex.EncodeToString(nextBytes))
w.Header().Set("Access-Control-Expose-Headers", fmt.Sprintf("%s, %s", SwarmFeedIndexHeader, SwarmFeedIndexNextHeader))
socCh, err := soc.FromChunk(ch)
if err != nil {
logger.Error(nil, "wrapped chunk cannot be retrieved")
jsonhttp.NotFound(w, "wrapped chunk cannot be retrieved")
return
}
sig := socCh.Signature()

additionalHeaders := http.Header{
ContentTypeHeader: {"application/octet-stream"},
SwarmFeedIndexHeader: {hex.EncodeToString(curBytes)},
SwarmFeedIndexNextHeader: {hex.EncodeToString(nextBytes)},
SwarmSocSignatureHeader: {hex.EncodeToString(sig)},
"Access-Control-Expose-Headers": {SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader},
}

if headers.OnlyRootChunk {
w.Header().Set(ContentLengthHeader, strconv.Itoa(len(wc.Data())))
// include additional headers
for name, values := range additionalHeaders {
w.Header().Set(name, strings.Join(values, ", "))
}
_, _ = io.Copy(w, bytes.NewReader(wc.Data()))
return
}

jsonhttp.OK(w, feedReferenceResponse{Reference: ref})
s.downloadHandler(logger, w, r, wc.Address(), additionalHeaders, true, false, wc)
}

func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -278,22 +307,3 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {

jsonhttp.Created(w, feedReferenceResponse{Reference: encryptedReference})
}

func parseFeedUpdate(ch swarm.Chunk) (swarm.Address, int64, error) {
s, err := soc.FromChunk(ch)
if err != nil {
return swarm.ZeroAddress, 0, fmt.Errorf("soc unmarshal: %w", err)
}

update := s.WrappedChunk().Data()
// split the timestamp and reference
// possible values right now:
// unencrypted ref: span+timestamp+ref => 8+8+32=48
// encrypted ref: span+timestamp+ref+decryptKey => 8+8+64=80
if len(update) != 48 && len(update) != 80 {
return swarm.ZeroAddress, 0, errInvalidFeedUpdate
}
ts := binary.BigEndian.Uint64(update[8:16])
ref := swarm.NewAddress(update[16:])
return ref, int64(ts), nil
}
Loading

0 comments on commit 198d41f

Please sign in to comment.