diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 588d31b59d9..e9ef6b840a4 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -840,11 +840,12 @@ paths: $ref: "SwarmCommon.yaml#/components/schemas/HexString" required: true description: Signature - - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter" - in: header name: swarm-postage-batch-id schema: $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" + required: true + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter" required: false - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" @@ -877,6 +878,47 @@ paths: $ref: "SwarmCommon.yaml#/components/responses/500" default: description: Default response + get: + summary: Resolve Single Owner Chunk data + tags: + - Single owner chunk + parameters: + - in: path + name: owner + schema: + $ref: "SwarmCommon.yaml#/components/schemas/EthereumAddress" + required: true + description: Ethereum address of the Owner of the SOC + - in: path + name: id + schema: + $ref: "SwarmCommon.yaml#/components/schemas/HexString" + required: true + description: Arbitrary identifier of the related data + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter" + responses: + "200": + description: Related Single Owner Chunk data + headers: + "swarm-soc-signature": + $ref: "SwarmCommon.yaml#/components/headers/SwarmSocSignature" + content: + application/octet-stream: + schema: + type: string + format: binary + "400": + $ref: "SwarmCommon.yaml#/components/responses/400" + "401": + $ref: 
"SwarmCommon.yaml#/components/responses/401" + "500": + $ref: "SwarmCommon.yaml#/components/responses/500" + default: + description: Default response "/feeds/{owner}/{topic}": post: @@ -961,18 +1003,26 @@ paths: $ref: "SwarmCommon.yaml#/components/schemas/FeedType" required: false description: "Feed indexing scheme (default: sequence)" + - $ref: "SwarmCommon.yaml#/components/headers/SwarmOnlyRootChunkParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter" responses: "200": description: Latest feed update headers: + "swarm-soc-signature": + $ref: "SwarmCommon.yaml#/components/headers/SwarmSocSignature" "swarm-feed-index": $ref: "SwarmCommon.yaml#/components/headers/SwarmFeedIndex" "swarm-feed-index-next": $ref: "SwarmCommon.yaml#/components/headers/SwarmFeedIndexNext" content: - application/json: + application/octet-stream: schema: - $ref: "SwarmCommon.yaml#/components/schemas/ReferenceResponse" + type: string + format: binary "400": $ref: "SwarmCommon.yaml#/components/responses/400" "401": diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index 92027be2a34..d4b4feb8c5c 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -1031,6 +1031,11 @@ components: schema: $ref: "#/components/schemas/HexString" + SwarmSocSignature: + description: "Attached digital signature of the Single Owner Chunk" + schema: + $ref: "#/components/schemas/HexString" + SwarmActHistoryAddress: description: "Swarm address reference to the new ACT history entry" schema: @@ -1136,6 +1141,14 @@ components: description: > Specify the timeout for chunk retrieval. The default is 30 seconds. 
+ SwarmOnlyRootChunkParameter: + in: header + name: swarm-only-root-chunk + schema: + type: boolean + required: false + description: "Returns only the root chunk of the content" + ContentTypePreserved: in: header name: Content-Type diff --git a/pkg/api/api.go b/pkg/api/api.go index b4c7f7ad18b..4fc70fbfb17 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -74,8 +74,10 @@ const ( SwarmEncryptHeader = "Swarm-Encrypt" SwarmIndexDocumentHeader = "Swarm-Index-Document" SwarmErrorDocumentHeader = "Swarm-Error-Document" + SwarmSocSignatureHeader = "Swarm-Soc-Signature" SwarmFeedIndexHeader = "Swarm-Feed-Index" SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next" + SwarmOnlyRootChunk = "Swarm-Only-Root-Chunk" SwarmCollectionHeader = "Swarm-Collection" SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id" SwarmPostageStampHeader = "Swarm-Postage-Stamp" @@ -520,7 +522,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler { allowedHeaders := []string{ "User-Agent", "Accept", "X-Requested-With", "Access-Control-Request-Headers", "Access-Control-Request-Method", "Accept-Ranges", "Content-Encoding", AuthorizationHeader, AcceptEncodingHeader, ContentTypeHeader, ContentDispositionHeader, RangeHeader, OriginHeader, - SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, GasPriceHeader, GasLimitHeader, ImmutableHeader, + SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, 
SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader, } allowedHeadersStr := strings.Join(allowedHeaders, ", ") diff --git a/pkg/api/bytes.go b/pkg/api/bytes.go index 11a600f854b..5eabf41458b 100644 --- a/pkg/api/bytes.go +++ b/pkg/api/bytes.go @@ -182,7 +182,7 @@ func (s *Service) bytesGetHandler(w http.ResponseWriter, r *http.Request) { ContentTypeHeader: {"application/octet-stream"}, } - s.downloadHandler(logger, w, r, address, additionalHeaders, true, false) + s.downloadHandler(logger, w, r, address, additionalHeaders, true, false, nil) } func (s *Service) bytesHeadHandler(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index 539779cd2d6..559ce652c43 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/feeds" + "github.com/ethersphere/bee/v2/pkg/file" "github.com/ethersphere/bee/v2/pkg/file/joiner" "github.com/ethersphere/bee/v2/pkg/file/loadsave" "github.com/ethersphere/bee/v2/pkg/file/redundancy" @@ -421,14 +422,17 @@ FETCH: jsonhttp.NotFound(w, "no update found") return } - ref, _, err := parseFeedUpdate(ch) + wc, err := feeds.GetWrappedChunk(ctx, s.storer.ChunkStore(), ch) if err != nil { logger.Debug("bzz download: mapStructure feed update failed", "error", err) logger.Error(nil, "bzz download: mapStructure feed update failed") jsonhttp.InternalServerError(w, "mapStructure feed update") return } - address = ref + address = wc.Address() + // modify ls and init with non-existing wrapped chunk + ls = loadsave.NewReadonlyWithRootCh(s.storer.Download(cache), wc) + feedDereferenced = true curBytes, err := cur.MarshalBinary() if err != nil { @@ -550,11 +554,11 @@ func (s *Service) serveManifestEntry( 
additionalHeaders[ContentTypeHeader] = []string{mimeType} } - s.downloadHandler(logger, w, r, manifestEntry.Reference(), additionalHeaders, etag, headersOnly) + s.downloadHandler(logger, w, r, manifestEntry.Reference(), additionalHeaders, etag, headersOnly, nil) } // downloadHandler contains common logic for downloading Swarm file from API -func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag, headersOnly bool) { +func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag, headersOnly bool, rootCh swarm.Chunk) { headers := struct { Strategy *getter.Strategy `map:"Swarm-Redundancy-Strategy"` FallbackMode *bool `map:"Swarm-Redundancy-Fallback-Mode"` @@ -580,7 +584,15 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h return } - reader, l, err := joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference) + var ( + reader file.Joiner + l int64 + ) + if rootCh != nil { + reader, l, err = joiner.NewJoiner(ctx, s.storer.Download(cache), s.storer.Cache(), reference, rootCh) + } else { + reader, l, err = joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference) + } if err != nil { if errors.Is(err, storage.ErrNotFound) || errors.Is(err, topology.ErrNotFound) { logger.Debug("api download: not found ", "address", reference, "error", err) diff --git a/pkg/api/feed.go b/pkg/api/feed.go index 7750fd7605c..7c92f3fd855 100644 --- a/pkg/api/feed.go +++ b/pkg/api/feed.go @@ -5,11 +5,13 @@ package api import ( - "encoding/binary" + "bytes" "encoding/hex" "errors" - "fmt" + "io" "net/http" + "strconv" + "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -34,8 +36,6 @@ const ( feedMetadataEntryType = "swarm-feed-type" ) -var errInvalidFeedUpdate = errors.New("invalid feed update") - type feedReferenceResponse struct { 
Reference swarm.Address `json:"reference"` } @@ -64,6 +64,14 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) { queries.At = time.Now().Unix() } + headers := struct { + OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + f := feeds.New(paths.Topic, paths.Owner) lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f) if err != nil { @@ -94,11 +102,10 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) { return } - ref, _, err := parseFeedUpdate(ch) + wc, err := feeds.GetWrappedChunk(r.Context(), s.storer.ChunkStore(), ch) if err != nil { - logger.Debug("mapStructure feed update failed", "error", err) - logger.Error(nil, "mapStructure feed update failed") - jsonhttp.InternalServerError(w, "mapStructure feed update failed") + logger.Error(nil, "wrapped chunk cannot be retrieved") + jsonhttp.NotFound(w, "wrapped chunk cannot be retrieved") return } @@ -118,11 +125,33 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) { return } - w.Header().Set(SwarmFeedIndexHeader, hex.EncodeToString(curBytes)) - w.Header().Set(SwarmFeedIndexNextHeader, hex.EncodeToString(nextBytes)) - w.Header().Set("Access-Control-Expose-Headers", fmt.Sprintf("%s, %s", SwarmFeedIndexHeader, SwarmFeedIndexNextHeader)) + socCh, err := soc.FromChunk(ch) + if err != nil { + logger.Error(nil, "wrapped chunk cannot be retrieved") + jsonhttp.NotFound(w, "wrapped chunk cannot be retrieved") + return + } + sig := socCh.Signature() + + additionalHeaders := http.Header{ + ContentTypeHeader: {"application/octet-stream"}, + SwarmFeedIndexHeader: {hex.EncodeToString(curBytes)}, + SwarmFeedIndexNextHeader: {hex.EncodeToString(nextBytes)}, + SwarmSocSignatureHeader: {hex.EncodeToString(sig)}, + "Access-Control-Expose-Headers": {SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader}, + } 
+ + if headers.OnlyRootChunk { + w.Header().Set(ContentLengthHeader, strconv.Itoa(len(wc.Data()))) + // include additional headers + for name, values := range additionalHeaders { + w.Header().Set(name, strings.Join(values, ", ")) + } + _, _ = io.Copy(w, bytes.NewReader(wc.Data())) + return + } - jsonhttp.OK(w, feedReferenceResponse{Reference: ref}) + s.downloadHandler(logger, w, r, wc.Address(), additionalHeaders, true, false, wc) } func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { @@ -278,22 +307,3 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.Created(w, feedReferenceResponse{Reference: encryptedReference}) } - -func parseFeedUpdate(ch swarm.Chunk) (swarm.Address, int64, error) { - s, err := soc.FromChunk(ch) - if err != nil { - return swarm.ZeroAddress, 0, fmt.Errorf("soc unmarshal: %w", err) - } - - update := s.WrappedChunk().Data() - // split the timestamp and reference - // possible values right now: - // unencrypted ref: span+timestamp+ref => 8+8+32=48 - // encrypted ref: span+timestamp+ref+decryptKey => 8+8+64=80 - if len(update) != 48 && len(update) != 80 { - return swarm.ZeroAddress, 0, errInvalidFeedUpdate - } - ts := binary.BigEndian.Uint64(update[8:16]) - ref := swarm.NewAddress(update[16:]) - return ref, int64(ts), nil -} diff --git a/pkg/api/feed_test.go b/pkg/api/feed_test.go index a35b9ce3423..843756d7237 100644 --- a/pkg/api/feed_test.go +++ b/pkg/api/feed_test.go @@ -5,11 +5,13 @@ package api_test import ( + "bytes" "context" "encoding/binary" "encoding/hex" "errors" "fmt" + "io" "math/big" "net/http" "testing" @@ -17,6 +19,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/api" "github.com/ethersphere/bee/v2/pkg/feeds" "github.com/ethersphere/bee/v2/pkg/file/loadsave" + "github.com/ethersphere/bee/v2/pkg/file/splitter" "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/v2/pkg/log" @@ -24,8 +27,10 @@ 
import ( "github.com/ethersphere/bee/v2/pkg/postage" mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock" testingsoc "github.com/ethersphere/bee/v2/pkg/soc/testing" + testingc "github.com/ethersphere/bee/v2/pkg/storage/testing" mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/bee/v2/pkg/util/testutil" ) const ownerString = "8d3766440f0d7b949a5e32995d09619a7f86e632" @@ -44,13 +49,22 @@ func TestFeed_Get(t *testing.T) { } mockStorer = mockstorer.New() ) + putter, err := mockStorer.Upload(context.Background(), false, 0) + if err != nil { + t.Fatal(err) + } + mockWrappedCh := testingc.FixtureChunk("0033") + err = putter.Put(context.Background(), mockWrappedCh) + if err != nil { + t.Fatal(err) + } t.Run("with at", func(t *testing.T) { t.Parallel() var ( timestamp = int64(12121212) - ch = toChunk(t, uint64(timestamp), expReference.Bytes()) + ch = toChunk(t, uint64(timestamp), mockWrappedCh.Address().Bytes()) look = newMockLookup(12, 0, ch, nil, &id{}, &id{}) factory = newMockFactory(look) idBytes, _ = (&id{}).MarshalBinary() @@ -61,7 +75,7 @@ func TestFeed_Get(t *testing.T) { ) jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", "12"), http.StatusOK, - jsonhttptest.WithExpectedJSONResponse(api.FeedReferenceResponse{Reference: expReference}), + jsonhttptest.WithExpectedResponse(mockWrappedCh.Data()[swarm.SpanSize:]), jsonhttptest.WithExpectedResponseHeader(api.SwarmFeedIndexHeader, hex.EncodeToString(idBytes)), ) }) @@ -71,7 +85,7 @@ func TestFeed_Get(t *testing.T) { var ( timestamp = int64(12121212) - ch = toChunk(t, uint64(timestamp), expReference.Bytes()) + ch = toChunk(t, uint64(timestamp), mockWrappedCh.Address().Bytes()) look = newMockLookup(-1, 2, ch, nil, &id{}, &id{}) factory = newMockFactory(look) idBytes, _ = (&id{}).MarshalBinary() @@ -83,10 +97,102 @@ func TestFeed_Get(t *testing.T) { ) jsonhttptest.Request(t, client, http.MethodGet, 
feedResource(ownerString, "aabbcc", ""), http.StatusOK, - jsonhttptest.WithExpectedJSONResponse(api.FeedReferenceResponse{Reference: expReference}), + jsonhttptest.WithExpectedResponse(mockWrappedCh.Data()[swarm.SpanSize:]), + jsonhttptest.WithExpectedContentLength(len(mockWrappedCh.Data()[swarm.SpanSize:])), jsonhttptest.WithExpectedResponseHeader(api.SwarmFeedIndexHeader, hex.EncodeToString(idBytes)), ) }) + + t.Run("chunk wrapping", func(t *testing.T) { + t.Parallel() + + testData := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8} + + var ( + ch = testingsoc.GenerateMockSOC(t, testData).Chunk() + look = newMockLookup(-1, 2, ch, nil, &id{}, &id{}) + factory = newMockFactory(look) + idBytes, _ = (&id{}).MarshalBinary() + + client, _, _, _ = newTestServer(t, testServerOptions{ + Storer: mockStorer, + Feeds: factory, + }) + ) + + jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", ""), http.StatusOK, + jsonhttptest.WithExpectedResponse(testData), + jsonhttptest.WithExpectedContentLength(len(testData)), + jsonhttptest.WithExpectedResponseHeader(api.SwarmFeedIndexHeader, hex.EncodeToString(idBytes)), + ) + }) + + t.Run("legacy payload with non existing wrapped chunk", func(t *testing.T) { + t.Parallel() + + wrappedRef := make([]byte, swarm.HashSize) + _ = copy(wrappedRef, mockWrappedCh.Address().Bytes()) + wrappedRef[0]++ + + var ( + ch = toChunk(t, uint64(12121212), wrappedRef) + look = newMockLookup(-1, 2, ch, nil, &id{}, &id{}) + factory = newMockFactory(look) + + client, _, _, _ = newTestServer(t, testServerOptions{ + Storer: mockStorer, + Feeds: factory, + }) + ) + + jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", ""), http.StatusNotFound) + }) + + t.Run("bigger payload than one chunk", func(t *testing.T) { + t.Parallel() + + testDataLen := 5000 + testData := testutil.RandBytesWithSeed(t, testDataLen, 1) + s := splitter.NewSimpleSplitter(putter) + addr, err := s.Split(context.Background(), 
io.NopCloser(bytes.NewReader(testData)), int64(testDataLen), false) + if err != nil { + t.Fatal(err) + } + + // get root ch addr then add wrap it with soc + testRootCh, err := mockStorer.ChunkStore().Get(context.Background(), addr) + if err != nil { + t.Fatal(err) + } + var ( + ch = testingsoc.GenerateMockSOCWithSpan(t, testRootCh.Data()).Chunk() + look = newMockLookup(-1, 2, ch, nil, &id{}, &id{}) + factory = newMockFactory(look) + idBytes, _ = (&id{}).MarshalBinary() + + client, _, _, _ = newTestServer(t, testServerOptions{ + Storer: mockStorer, + Feeds: factory, + }) + ) + + t.Run("retrieve chunk tree", func(t *testing.T) { + jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", ""), http.StatusOK, + jsonhttptest.WithExpectedResponse(testData), + jsonhttptest.WithExpectedContentLength(testDataLen), + jsonhttptest.WithExpectedResponseHeader(api.SwarmFeedIndexHeader, hex.EncodeToString(idBytes)), + ) + }) + + t.Run("retrieve only wrapped chunk", func(t *testing.T) { + jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", ""), http.StatusOK, + jsonhttptest.WithRequestHeader(api.SwarmOnlyRootChunk, "true"), + jsonhttptest.WithExpectedResponse(testRootCh.Data()), + jsonhttptest.WithExpectedContentLength(len(testRootCh.Data())), + jsonhttptest.WithExpectedResponseHeader(api.SwarmFeedIndexHeader, hex.EncodeToString(idBytes)), + ) + }) + }) } // nolint:paralleltest diff --git a/pkg/api/router.go b/pkg/api/router.go index 000521aab67..f9a0716b84b 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -66,9 +66,13 @@ func (s *Service) MountAPI() { "/bzz", "/bytes", "/chunks", + "/feeds", + "/soc", rootPath + "/bzz", rootPath + "/bytes", rootPath + "/chunks", + rootPath + "/feeds", + rootPath + "/soc", } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -247,6 +251,7 @@ func (s *Service) mountAPI() { }) handle("/soc/{owner}/{id}", jsonhttp.MethodHandler{ + "GET": 
http.HandlerFunc(s.socGetHandler), "POST": web.ChainHandlers( jsonhttp.NewMaxBodyBytesHandler(swarm.ChunkWithSpanSize), web.FinalHandlerFunc(s.socUploadHandler), diff --git a/pkg/api/soc.go b/pkg/api/soc.go index 9eff4bd3c99..09d4f1d3ae2 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -5,9 +5,13 @@ package api import ( + "bytes" + "encoding/hex" "errors" "io" "net/http" + "strconv" + "strings" "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/cac" @@ -226,3 +230,66 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.Created(w, socPostResponse{Reference: reference}) } + +func (s *Service) socGetHandler(w http.ResponseWriter, r *http.Request) { + logger := s.logger.WithName("get_soc").Build() + + paths := struct { + Owner []byte `map:"owner" validate:"required"` + ID []byte `map:"id" validate:"required"` + }{} + if response := s.mapStructure(mux.Vars(r), &paths); response != nil { + response("invalid path params", logger, w) + return + } + + headers := struct { + OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + + address, err := soc.CreateAddress(paths.ID, paths.Owner) + if err != nil { + logger.Error(err, "soc address cannot be created") + jsonhttp.BadRequest(w, "soc address cannot be created") + return + } + + getter := s.storer.Download(true) + sch, err := getter.Get(r.Context(), address) + if err != nil { + logger.Error(err, "soc retrieval has been failed") + jsonhttp.NotFound(w, "requested chunk cannot be retrieved") + return + } + socCh, err := soc.FromChunk(sch) + if err != nil { + logger.Error(err, "chunk is not a signle owner chunk") + jsonhttp.InternalServerError(w, "chunk is not a single owner chunk") + return + } + + sig := socCh.Signature() + wc := socCh.WrappedChunk() + + additionalHeaders := http.Header{ + ContentTypeHeader: 
{"application/octet-stream"}, + SwarmSocSignatureHeader: {hex.EncodeToString(sig)}, + "Access-Control-Expose-Headers": {SwarmSocSignatureHeader}, + } + + if headers.OnlyRootChunk { + w.Header().Set(ContentLengthHeader, strconv.Itoa(len(wc.Data()))) + // include additional headers + for name, values := range additionalHeaders { + w.Header().Set(name, strings.Join(values, ", ")) + } + _, _ = io.Copy(w, bytes.NewReader(wc.Data())) + return + } + + s.downloadHandler(logger, w, r, wc.Address(), additionalHeaders, true, false, wc) +} diff --git a/pkg/api/soc_test.go b/pkg/api/soc_test.go index 94eeccf60dd..2407689ece1 100644 --- a/pkg/api/soc_test.go +++ b/pkg/api/soc_test.go @@ -89,16 +89,31 @@ func TestSOC(t *testing.T) { ) // try to fetch the same chunk - rsrc := fmt.Sprintf("/chunks/" + s.Address().String()) - resp := request(t, client, http.MethodGet, rsrc, nil, http.StatusOK) - data, err := io.ReadAll(resp.Body) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(s.Chunk().Data(), data) { - t.Fatal("data retrieved doesn't match uploaded content") - } + t.Run("chunks fetch", func(t *testing.T) { + rsrc := fmt.Sprintf("/chunks/" + s.Address().String()) + resp := request(t, client, http.MethodGet, rsrc, nil, http.StatusOK) + data, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(s.Chunk().Data(), data) { + t.Fatal("data retrieved doesn't match uploaded content") + } + }) + + t.Run("soc fetch", func(t *testing.T) { + rsrc := fmt.Sprintf("/soc/%s/%s", hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID)) + resp := request(t, client, http.MethodGet, rsrc, nil, http.StatusOK) + data, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(s.WrappedChunk.Data()[swarm.SpanSize:], data) { + t.Fatal("data retrieved doesn't match uploaded content") + } + }) }) t.Run("postage", func(t *testing.T) { diff --git a/pkg/feeds/epochs/finder.go b/pkg/feeds/epochs/finder.go index a85ab309e76..d5133fdaa57 100644 
--- a/pkg/feeds/epochs/finder.go +++ b/pkg/feeds/epochs/finder.go @@ -51,10 +51,7 @@ func (f *finder) common(ctx context.Context, at int64, after uint64) (*epoch, sw } return e, nil, err } - ts, err := feeds.UpdatedAt(ch) - if err != nil { - return e, nil, err - } + ts := e.length() * e.start if ts <= uint64(at) { return e, ch, nil } @@ -78,10 +75,7 @@ func (f *finder) at(ctx context.Context, at uint64, e *epoch, ch swarm.Chunk) (s } // epoch found // check if timestamp is later then target - ts, err := feeds.UpdatedAt(uch) - if err != nil { - return nil, err - } + ts := e.length() * e.start if ts > at { if e.isLeft() { return ch, nil @@ -131,10 +125,7 @@ func (f *asyncFinder) get(ctx context.Context, at int64, e *epoch) (swarm.Chunk, } return nil, nil } - ts, err := feeds.UpdatedAt(u) - if err != nil { - return nil, err - } + ts := e.length() * e.start diff := at - int64(ts) if diff < 0 { return nil, nil diff --git a/pkg/feeds/epochs/updater.go b/pkg/feeds/epochs/updater.go index b36d77e7d96..f3588b98f1b 100644 --- a/pkg/feeds/epochs/updater.go +++ b/pkg/feeds/epochs/updater.go @@ -34,7 +34,7 @@ func NewUpdater(putter storage.Putter, signer crypto.Signer, topic []byte) (feed // Update pushes an update to the feed through the chunk stores func (u *updater) Update(ctx context.Context, at int64, payload []byte) error { e := next(u.epoch, u.last, uint64(at)) - err := u.Put(ctx, e, at, payload) + err := u.Put(ctx, e, payload) if err != nil { return err } diff --git a/pkg/feeds/feed.go b/pkg/feeds/feed.go index 7c9495cdfac..ac8d232f5ce 100644 --- a/pkg/feeds/feed.go +++ b/pkg/feeds/feed.go @@ -107,7 +107,7 @@ func NewUpdate(f *Feed, idx Index, timestamp int64, payload, sig []byte) (swarm. 
if err != nil { return nil, fmt.Errorf("update: %w", err) } - cac, err := toChunk(uint64(timestamp), payload) + cac, err := toChunk(payload) if err != nil { return nil, fmt.Errorf("toChunk: %w", err) } diff --git a/pkg/feeds/getter.go b/pkg/feeds/getter.go index 77b7599dd9e..9fcb2ccae77 100644 --- a/pkg/feeds/getter.go +++ b/pkg/feeds/getter.go @@ -16,6 +16,8 @@ import ( "github.com/ethersphere/bee/v2/pkg/swarm" ) +var errNotLegacyPayload = errors.New("feed update is not in the legacy payload structure") + // Lookup is the interface for time based feed lookup type Lookup interface { At(ctx context.Context, at int64, after uint64) (chunk swarm.Chunk, currentIndex, nextIndex Index, err error) @@ -49,31 +51,47 @@ func (f *Getter) Get(ctx context.Context, i Index) (swarm.Chunk, error) { return f.getter.Get(ctx, addr) } -// FromChunk parses out the timestamp and the payload -func FromChunk(ch swarm.Chunk) (uint64, []byte, error) { - s, err := soc.FromChunk(ch) +func GetWrappedChunk(ctx context.Context, getter storage.Getter, ch swarm.Chunk) (swarm.Chunk, error) { + wc, err := FromChunk(ch) + if err != nil { + return nil, err + } + // try to split the timestamp and reference + // possible values right now: + // unencrypted ref: span+timestamp+ref => 8+8+32=48 + // encrypted ref: span+timestamp+ref+decryptKey => 8+8+64=80 + _, ref, err := LegacyPayload(wc) if err != nil { - return 0, nil, err + if errors.Is(err, errNotLegacyPayload) { + return wc, nil + } + return nil, err } - cac := s.WrappedChunk() - if len(cac.Data()) < 16 { - return 0, nil, errors.New("feed update payload too short") + wc, err = getter.Get(ctx, ref) + if err != nil { + return nil, err } - payload := cac.Data()[16:] - at := binary.BigEndian.Uint64(cac.Data()[8:16]) - return at, payload, nil + + return wc, nil } -// UpdatedAt extracts the time of feed other than update -func UpdatedAt(ch swarm.Chunk) (uint64, error) { - d := ch.Data() - if len(d) < 113 { - return 0, fmt.Errorf("too short: %d", len(d)) 
+// FromChunk parses out the wrapped chunk +func FromChunk(ch swarm.Chunk) (swarm.Chunk, error) { + s, err := soc.FromChunk(ch) + if err != nil { + return nil, fmt.Errorf("soc unmarshal: %w", err) } - // a soc chunk with time information in the wrapped content addressed chunk - // 0-32 index, - // 65-97 signature, - // 98-105 span of wrapped chunk - // 105-113 timestamp - return binary.BigEndian.Uint64(d[105:113]), nil + return s.WrappedChunk(), nil +} + +// LegacyPayload returns back the referenced chunk and datetime from the legacy feed payload +func LegacyPayload(wrappedChunk swarm.Chunk) (uint64, swarm.Address, error) { + cacData := wrappedChunk.Data() + if !(len(cacData) == 16+swarm.HashSize || len(cacData) == 16+swarm.HashSize*2) { + return 0, swarm.ZeroAddress, errNotLegacyPayload + } + address := swarm.NewAddress(cacData[16:]) + at := binary.BigEndian.Uint64(cacData[8:16]) + + return at, address, nil } diff --git a/pkg/feeds/putter.go b/pkg/feeds/putter.go index 633276f8f63..abe1972a0a5 100644 --- a/pkg/feeds/putter.go +++ b/pkg/feeds/putter.go @@ -6,7 +6,6 @@ package feeds import ( "context" - "encoding/binary" "github.com/ethersphere/bee/v2/pkg/cac" "github.com/ethersphere/bee/v2/pkg/crypto" @@ -39,12 +38,12 @@ func NewPutter(putter storage.Putter, signer crypto.Signer, topic []byte) (*Putt } // Put pushes an update to the feed through the chunk stores -func (u *Putter) Put(ctx context.Context, i Index, at int64, payload []byte) error { +func (u *Putter) Put(ctx context.Context, i Index, payload []byte) error { id, err := u.Feed.Update(i).Id() if err != nil { return err } - cac, err := toChunk(uint64(at), payload) + cac, err := toChunk(payload) if err != nil { return err } @@ -56,8 +55,6 @@ func (u *Putter) Put(ctx context.Context, i Index, at int64, payload []byte) err return u.putter.Put(ctx, ch) } -func toChunk(at uint64, payload []byte) (swarm.Chunk, error) { - ts := make([]byte, 8) - binary.BigEndian.PutUint64(ts, at) - return cac.New(append(ts, 
payload...)) +func toChunk(payload []byte) (swarm.Chunk, error) { + return cac.New(payload) } diff --git a/pkg/feeds/sequence/sequence.go b/pkg/feeds/sequence/sequence.go index 5361086de4b..5184885f1ab 100644 --- a/pkg/feeds/sequence/sequence.go +++ b/pkg/feeds/sequence/sequence.go @@ -79,14 +79,6 @@ func (f *finder) At(ctx context.Context, at int64, _ uint64) (ch swarm.Chunk, cu } return ch, current, &index{i}, nil } - ts, err := feeds.UpdatedAt(u) - if err != nil { - return nil, nil, nil, err - } - // if index is later than the `at` target index, then return previous chunk and index - if ts > uint64(at) { - return ch, &index{i - 1}, &index{i}, nil - } ch = u } } @@ -267,15 +259,6 @@ func (f *asyncFinder) get(ctx context.Context, at int64, idx uint64) (swarm.Chun // if 'not-found' error, then just silence and return nil chunk return nil, nil } - ts, err := feeds.UpdatedAt(u) - if err != nil { - return nil, err - } - // this means the update timestamp is later than the pivot time we are looking for - // handled as if the update was missing but with no uncertainty due to timeout - if at < int64(ts) { - return nil, nil - } return u, nil } @@ -297,7 +280,7 @@ func NewUpdater(putter storage.Putter, signer crypto.Signer, topic []byte) (feed // Update pushes an update to the feed through the chunk stores func (u *updater) Update(ctx context.Context, at int64, payload []byte) error { - err := u.Put(ctx, &index{u.next}, at, payload) + err := u.Put(ctx, &index{u.next}, payload) if err != nil { return err } diff --git a/pkg/feeds/testing/lookup.go b/pkg/feeds/testing/lookup.go index 8c71098f5a6..8fd852888c8 100644 --- a/pkg/feeds/testing/lookup.go +++ b/pkg/feeds/testing/lookup.go @@ -77,24 +77,22 @@ func TestFinderBasic(t *testing.T, finderf func(storage.Getter, *feeds.Feed) fee if err != nil { t.Fatal(err) } - ch, err := feeds.Latest(ctx, finder, 0) + soc, err := feeds.Latest(ctx, finder, 0) if err != nil { t.Fatal(err) } - if ch == nil { + if soc == nil { 
t.Fatalf("expected to find update, got none") } exp := payload - ts, payload, err := feeds.FromChunk(ch) + cac, err := feeds.FromChunk(soc) if err != nil { t.Fatal(err) } + payload = cac.Data()[swarm.SpanSize:] if !bytes.Equal(payload, exp) { t.Fatalf("result mismatch. want %8x... got %8x...", exp, payload) } - if ts != uint64(at) { - t.Fatalf("timestamp mismatch: expected %v, got %v", at, ts) - } }) } @@ -157,18 +155,6 @@ func TestFinderIntervals(t *testing.T, nextf func() (bool, int64), finderf func( if ch == nil { t.Fatalf("expected to find update, got none") } - ts, payload, err := feeds.FromChunk(ch) - if err != nil { - t.Fatal(err) - } - content := binary.BigEndian.Uint64(payload) - if content != uint64(at) { - t.Fatalf("payload mismatch: expected %v, got %v", at, content) - } - - if ts != uint64(at) { - t.Fatalf("timestamp mismatch: expected %v, got %v", at, ts) - } if current != nil { expectedId := ch.Data()[:32] diff --git a/pkg/file/joiner/joiner.go b/pkg/file/joiner/joiner.go index fcd7e790c10..c3a317854bc 100644 --- a/pkg/file/joiner/joiner.go +++ b/pkg/file/joiner/joiner.go @@ -116,6 +116,12 @@ func New(ctx context.Context, g storage.Getter, putter storage.Putter, address s return nil, 0, err } + return NewJoiner(ctx, g, putter, address, rootChunk) +} + +// NewJoiner creates a new Joiner with the already fetched root chunk. +// A Joiner provides Read, Seek and Size functionalities. 
+func NewJoiner(ctx context.Context, g storage.Getter, putter storage.Putter, address swarm.Address, rootChunk swarm.Chunk) (file.Joiner, int64, error) { chunkData := rootChunk.Data() rootData := chunkData[swarm.SpanSize:] refLength := len(address.Bytes()) diff --git a/pkg/file/loadsave/loadsave.go b/pkg/file/loadsave/loadsave.go index 6a2a0bbf782..2899d9ed6ab 100644 --- a/pkg/file/loadsave/loadsave.go +++ b/pkg/file/loadsave/loadsave.go @@ -29,6 +29,7 @@ type loadSave struct { getter storage.Getter putter storage.Putter pipelineFn func() pipeline.Interface + rootCh swarm.Chunk } // New returns a new read-write load-saver. @@ -48,14 +49,33 @@ func NewReadonly(getter storage.Getter) file.LoadSaver { } } +// NewReadonlyWithRootCh returns a new read-only load-saver +// which will error on write. +func NewReadonlyWithRootCh(getter storage.Getter, rootCh swarm.Chunk) file.LoadSaver { + return &loadSave{ + getter: getter, + rootCh: rootCh, + } +} + func (ls *loadSave) Load(ctx context.Context, ref []byte) ([]byte, error) { - j, _, err := joiner.New(ctx, ls.getter, ls.putter, swarm.NewAddress(ref)) - if err != nil { - return nil, err + var j file.Joiner + if ls.rootCh == nil || !bytes.Equal(ls.rootCh.Address().Bytes(), ref[:swarm.HashSize]) { + joiner, _, err := joiner.New(ctx, ls.getter, ls.putter, swarm.NewAddress(ref)) + if err != nil { + return nil, err + } + j = joiner + } else { + joiner, _, err := joiner.NewJoiner(ctx, ls.getter, ls.putter, swarm.NewAddress(ref), ls.rootCh) + if err != nil { + return nil, err + } + j = joiner } buf := bytes.NewBuffer(nil) - _, err = file.JoinReadAll(ctx, j, buf) + _, err := file.JoinReadAll(ctx, j, buf) if err != nil { return nil, err } diff --git a/pkg/node/bootstrap.go b/pkg/node/bootstrap.go index 41e0c0aae18..2ccc0d8972b 100644 --- a/pkg/node/bootstrap.go +++ b/pkg/node/bootstrap.go @@ -196,10 +196,10 @@ func bootstrapNode( logger.Info("bootstrap: trying to fetch stamps snapshot") var ( - snapshotReference swarm.Address - 
reader file.Joiner - l int64 - eventsJSON []byte + snapshotRootCh swarm.Chunk + reader file.Joiner + l int64 + eventsJSON []byte ) for i := 0; i < getSnapshotRetries; i++ { @@ -210,7 +210,7 @@ func bootstrapNode( ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - snapshotReference, err = getLatestSnapshot(ctx, localStore.Download(true), snapshotFeed) + snapshotRootCh, err = getLatestSnapshot(ctx, localStore.Download(true), snapshotFeed) if err != nil { logger.Warning("bootstrap: fetching snapshot failed", "error", err) continue @@ -229,7 +229,7 @@ func bootstrapNode( ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - reader, l, err = joiner.New(ctx, localStore.Download(true), localStore.Cache(), snapshotReference) + reader, l, err = joiner.NewJoiner(ctx, localStore.Download(true), localStore.Cache(), snapshotRootCh.Address(), snapshotRootCh) if err != nil { logger.Warning("bootstrap: file joiner failed", "error", err) continue @@ -278,7 +278,7 @@ func getLatestSnapshot( ctx context.Context, st storage.Getter, address swarm.Address, -) (swarm.Address, error) { +) (swarm.Chunk, error) { ls := loadsave.NewReadonly(st) feedFactory := factory.New(st) @@ -287,12 +287,12 @@ func getLatestSnapshot( ls, ) if err != nil { - return swarm.ZeroAddress, fmt.Errorf("not a manifest: %w", err) + return nil, fmt.Errorf("not a manifest: %w", err) } e, err := m.Lookup(ctx, "/") if err != nil { - return swarm.ZeroAddress, fmt.Errorf("node lookup: %w", err) + return nil, fmt.Errorf("node lookup: %w", err) } var ( @@ -303,42 +303,37 @@ func getLatestSnapshot( if e := meta["swarm-feed-owner"]; e != "" { owner, err = hex.DecodeString(e) if err != nil { - return swarm.ZeroAddress, err + return nil, err } } if e := meta["swarm-feed-topic"]; e != "" { topic, err = hex.DecodeString(e) if err != nil { - return swarm.ZeroAddress, err + return nil, err } } if e := meta["swarm-feed-type"]; e != "" { err := t.FromString(e) if err != nil { - return swarm.ZeroAddress, 
err
+			return nil, err
 		}
 	}
 
 	if len(owner) == 0 || len(topic) == 0 {
-		return swarm.ZeroAddress, fmt.Errorf("node lookup: %s", "feed metadata absent")
+		return nil, fmt.Errorf("node lookup: %s", "feed metadata absent")
 	}
 	f := feeds.New(topic, common.BytesToAddress(owner))
 	l, err := feedFactory.NewLookup(*t, f)
 	if err != nil {
-		return swarm.ZeroAddress, fmt.Errorf("feed lookup failed: %w", err)
+		return nil, fmt.Errorf("feed lookup failed: %w", err)
 	}
 	u, _, _, err := l.At(ctx, time.Now().Unix(), 0)
 	if err != nil {
-		return swarm.ZeroAddress, err
-	}
-
-	_, ref, err := feeds.FromChunk(u)
-	if err != nil {
-		return swarm.ZeroAddress, err
+		return nil, err
 	}
-	return swarm.NewAddress(ref), nil
+	return feeds.GetWrappedChunk(ctx, st, u)
 }
 
 func batchStoreExists(s storage.StateStorer) (bool, error) {
diff --git a/pkg/soc/testing/soc.go b/pkg/soc/testing/soc.go
index a622bf2d1dc..b6f7e2a7176 100644
--- a/pkg/soc/testing/soc.go
+++ b/pkg/soc/testing/soc.go
@@ -70,17 +70,35 @@ func GenerateMockSocWithSigner(t *testing.T, data []byte, signer crypto.Signer)
 func GenerateMockSOC(t *testing.T, data []byte) *MockSOC {
 	t.Helper()
 
-	privKey, err := crypto.GenerateSecp256k1Key()
+	ch, err := cac.New(data)
 	if err != nil {
 		t.Fatal(err)
 	}
-	signer := crypto.NewDefaultSigner(privKey)
-	owner, err := signer.EthereumAddress()
+
+	return generateMockSOC(t, ch)
+}
+
+// GenerateMockSOCWithSpan generates a valid mocked SOC from given chunk data (span + payload).
+func GenerateMockSOCWithSpan(t *testing.T, data []byte) *MockSOC {
+	t.Helper()
+
+	ch, err := cac.NewWithDataSpan(data)
 	if err != nil {
 		t.Fatal(err)
 	}
-	ch, err := cac.New(data)
+	return generateMockSOC(t, ch)
+}
+
+func generateMockSOC(t *testing.T, ch swarm.Chunk) *MockSOC {
+	t.Helper()
+
+	privKey, err := crypto.GenerateSecp256k1Key()
+	if err != nil {
+		t.Fatal(err)
+	}
+	signer := crypto.NewDefaultSigner(privKey)
+	owner, err := signer.EthereumAddress()
 	if err != nil {
 		t.Fatal(err)
 	}