Skip to content

Commit

Permalink
revert: feed wrapping (#4677) commit 198d41f
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Dec 11, 2024
1 parent fc11edd commit 066b875
Show file tree
Hide file tree
Showing 21 changed files with 163 additions and 455 deletions.
56 changes: 4 additions & 52 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -880,11 +880,12 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/HexString"
required: true
description: Signature
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
- in: header
name: swarm-postage-batch-id
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
required: true
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress"
Expand Down Expand Up @@ -916,47 +917,6 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response
get:
summary: Resolve Single Owner Chunk data
tags:
- Single owner chunk
parameters:
- in: path
name: owner
schema:
$ref: "SwarmCommon.yaml#/components/schemas/EthereumAddress"
required: true
description: Ethereum address of the Owner of the SOC
- in: path
name: id
schema:
$ref: "SwarmCommon.yaml#/components/schemas/HexString"
required: true
description: Arbitrary identifier of the related data
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
responses:
"200":
description: Related Single Owner Chunk data
headers:
"swarm-soc-signature":
$ref: "SwarmCommon.yaml#/components/headers/SwarmSocSignature"
content:
application/octet-stream:
schema:
type: string
format: binary
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"401":
$ref: "SwarmCommon.yaml#/components/responses/401"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response

"/feeds/{owner}/{topic}":
post:
Expand Down Expand Up @@ -1041,26 +1001,18 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/FeedType"
required: false
description: "Feed indexing scheme (default: sequence)"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
responses:
"200":
description: Latest feed update
headers:
"swarm-soc-signature":
$ref: "SwarmCommon.yaml#/components/headers/SwarmSocSignature"
"swarm-feed-index":
$ref: "SwarmCommon.yaml#/components/headers/SwarmFeedIndex"
"swarm-feed-index-next":
$ref: "SwarmCommon.yaml#/components/headers/SwarmFeedIndexNext"
content:
application/octet-stream:
application/json:
schema:
type: string
format: binary
$ref: "SwarmCommon.yaml#/components/schemas/ReferenceResponse"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"401":
Expand Down
13 changes: 0 additions & 13 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1062,11 +1062,6 @@ components:
schema:
$ref: "#/components/schemas/HexString"

SwarmSocSignature:
description: "Attached digital signature of the Single Owner Chunk"
schema:
$ref: "#/components/schemas/HexString"

SwarmActHistoryAddress:
description: "Swarm address reference to the new ACT history entry"
schema:
Expand Down Expand Up @@ -1172,14 +1167,6 @@ components:
description: >
Specify the timeout for chunk retrieval. The default is 30 seconds.
SwarmOnlyRootChunkParameter:
in: header
name: swarm-only-root-chunk
schema:
type: boolean
required: false
description: "Returns only the root chunk of the content"

ContentTypePreserved:
in: header
name: Content-Type
Expand Down
4 changes: 1 addition & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ const (
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmSocSignatureHeader = "Swarm-Soc-Signature"
SwarmFeedIndexHeader = "Swarm-Feed-Index"
SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
SwarmOnlyRootChunk = "Swarm-Only-Root-Chunk"
SwarmCollectionHeader = "Swarm-Collection"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
SwarmPostageStampHeader = "Swarm-Postage-Stamp"
Expand Down Expand Up @@ -528,7 +526,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
allowedHeaders := []string{
"User-Agent", "Accept", "X-Requested-With", "Access-Control-Request-Headers", "Access-Control-Request-Method", "Accept-Ranges", "Content-Encoding",
AuthorizationHeader, AcceptEncodingHeader, ContentTypeHeader, ContentDispositionHeader, RangeHeader, OriginHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, GasPriceHeader, GasLimitHeader, ImmutableHeader,
}
allowedHeadersStr := strings.Join(allowedHeaders, ", ")

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (s *Service) bytesGetHandler(w http.ResponseWriter, r *http.Request) {
ContentTypeHeader: {"application/octet-stream"},
}

s.downloadHandler(logger, w, r, address, additionalHeaders, true, false, nil)
s.downloadHandler(logger, w, r, address, additionalHeaders, true, false)
}

func (s *Service) bytesHeadHandler(w http.ResponseWriter, r *http.Request) {
Expand Down
22 changes: 5 additions & 17 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/v2/pkg/accesscontrol"
"github.com/ethersphere/bee/v2/pkg/feeds"
"github.com/ethersphere/bee/v2/pkg/file"
"github.com/ethersphere/bee/v2/pkg/file/joiner"
"github.com/ethersphere/bee/v2/pkg/file/loadsave"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
Expand Down Expand Up @@ -426,17 +425,14 @@ FETCH:
jsonhttp.NotFound(w, "no update found")
return
}
wc, err := feeds.GetWrappedChunk(ctx, s.storer.ChunkStore(), ch)
ref, _, err := parseFeedUpdate(ch)
if err != nil {
logger.Debug("bzz download: mapStructure feed update failed", "error", err)
logger.Error(nil, "bzz download: mapStructure feed update failed")
jsonhttp.InternalServerError(w, "mapStructure feed update")
return
}
address = wc.Address()
// modify ls and init with non-existing wrapped chunk
ls = loadsave.NewReadonlyWithRootCh(s.storer.Download(cache), wc)

address = ref
feedDereferenced = true
curBytes, err := cur.MarshalBinary()
if err != nil {
Expand Down Expand Up @@ -558,11 +554,11 @@ func (s *Service) serveManifestEntry(
additionalHeaders[ContentTypeHeader] = []string{mimeType}
}

s.downloadHandler(logger, w, r, manifestEntry.Reference(), additionalHeaders, etag, headersOnly, nil)
s.downloadHandler(logger, w, r, manifestEntry.Reference(), additionalHeaders, etag, headersOnly)
}

// downloadHandler contains common logic for downloading Swarm file from API
func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag, headersOnly bool, rootCh swarm.Chunk) {
func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag, headersOnly bool) {
headers := struct {
Strategy *getter.Strategy `map:"Swarm-Redundancy-Strategy"`
RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"`
Expand Down Expand Up @@ -592,15 +588,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
ctx = redundancy.SetLevelInContext(ctx, *headers.RLevel)
}

var (
reader file.Joiner
l int64
)
if rootCh != nil {
reader, l, err = joiner.NewJoiner(ctx, s.storer.Download(cache), s.storer.Cache(), reference, rootCh)
} else {
reader, l, err = joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference)
}
reader, l, err := joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference)
if err != nil {
if errors.Is(err, storage.ErrNotFound) || errors.Is(err, topology.ErrNotFound) {
logger.Debug("api download: not found ", "address", reference, "error", err)
Expand Down
72 changes: 31 additions & 41 deletions pkg/api/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
package api

import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"io"
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -36,6 +34,8 @@ const (
feedMetadataEntryType = "swarm-feed-type"
)

var errInvalidFeedUpdate = errors.New("invalid feed update")

type feedReferenceResponse struct {
Reference swarm.Address `json:"reference"`
}
Expand Down Expand Up @@ -64,14 +64,6 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
queries.At = time.Now().Unix()
}

headers := struct {
OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

f := feeds.New(paths.Topic, paths.Owner)
lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f)
if err != nil {
Expand Down Expand Up @@ -102,10 +94,11 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
return
}

wc, err := feeds.GetWrappedChunk(r.Context(), s.storer.ChunkStore(), ch)
ref, _, err := parseFeedUpdate(ch)
if err != nil {
logger.Error(nil, "wrapped chunk cannot be retrieved")
jsonhttp.NotFound(w, "wrapped chunk cannot be retrieved")
logger.Debug("mapStructure feed update failed", "error", err)
logger.Error(nil, "mapStructure feed update failed")
jsonhttp.InternalServerError(w, "mapStructure feed update failed")
return
}

Expand All @@ -125,33 +118,11 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
return
}

socCh, err := soc.FromChunk(ch)
if err != nil {
logger.Error(nil, "wrapped chunk cannot be retrieved")
jsonhttp.NotFound(w, "wrapped chunk cannot be retrieved")
return
}
sig := socCh.Signature()

additionalHeaders := http.Header{
ContentTypeHeader: {"application/octet-stream"},
SwarmFeedIndexHeader: {hex.EncodeToString(curBytes)},
SwarmFeedIndexNextHeader: {hex.EncodeToString(nextBytes)},
SwarmSocSignatureHeader: {hex.EncodeToString(sig)},
"Access-Control-Expose-Headers": {SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader},
}

if headers.OnlyRootChunk {
w.Header().Set(ContentLengthHeader, strconv.Itoa(len(wc.Data())))
// include additional headers
for name, values := range additionalHeaders {
w.Header().Set(name, strings.Join(values, ", "))
}
_, _ = io.Copy(w, bytes.NewReader(wc.Data()))
return
}
w.Header().Set(SwarmFeedIndexHeader, hex.EncodeToString(curBytes))
w.Header().Set(SwarmFeedIndexNextHeader, hex.EncodeToString(nextBytes))
w.Header().Set("Access-Control-Expose-Headers", fmt.Sprintf("%s, %s", SwarmFeedIndexHeader, SwarmFeedIndexNextHeader))

s.downloadHandler(logger, w, r, wc.Address(), additionalHeaders, true, false, wc)
jsonhttp.OK(w, feedReferenceResponse{Reference: ref})
}

func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -307,3 +278,22 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {

jsonhttp.Created(w, feedReferenceResponse{Reference: encryptedReference})
}

func parseFeedUpdate(ch swarm.Chunk) (swarm.Address, int64, error) {
s, err := soc.FromChunk(ch)
if err != nil {
return swarm.ZeroAddress, 0, fmt.Errorf("soc unmarshal: %w", err)
}

update := s.WrappedChunk().Data()
// split the timestamp and reference
// possible values right now:
// unencrypted ref: span+timestamp+ref => 8+8+32=48
// encrypted ref: span+timestamp+ref+decryptKey => 8+8+64=80
if len(update) != 48 && len(update) != 80 {
return swarm.ZeroAddress, 0, errInvalidFeedUpdate
}
ts := binary.BigEndian.Uint64(update[8:16])
ref := swarm.NewAddress(update[16:])
return ref, int64(ts), nil
}
Loading

0 comments on commit 066b875

Please sign in to comment.