diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml
index 0eb2e44c727..c85425a0b28 100644
--- a/openapi/Swarm.yaml
+++ b/openapi/Swarm.yaml
@@ -880,11 +880,12 @@ paths:
             $ref: "SwarmCommon.yaml#/components/schemas/HexString"
           required: true
           description: Signature
+        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
         - in: header
           name: swarm-postage-batch-id
           schema:
             $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
-          required: true
+          required: false
         - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
         - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
         - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress"
@@ -916,47 +917,6 @@ paths:
           $ref: "SwarmCommon.yaml#/components/responses/500"
         default:
           description: Default response
-    get:
-      summary: Resolve Single Owner Chunk data
-      tags:
-        - Single owner chunk
-      parameters:
-        - in: path
-          name: owner
-          schema:
-            $ref: "SwarmCommon.yaml#/components/schemas/EthereumAddress"
-          required: true
-          description: Ethereum address of the Owner of the SOC
-        - in: path
-          name: id
-          schema:
-            $ref: "SwarmCommon.yaml#/components/schemas/HexString"
-          required: true
-          description: Arbitrary identifier of the related data
-        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter"
-        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
-        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
-        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
-        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
-      responses:
-        "200":
-          description: Related Single Owner Chunk data
-          headers:
-            "swarm-soc-signature":
-              $ref: "SwarmCommon.yaml#/components/headers/SwarmSocSignature"
-          content:
-            application/octet-stream:
-              schema:
-                type: string
-                format: binary
-        "400":
-          $ref: "SwarmCommon.yaml#/components/responses/400"
-        "401":
-          $ref: "SwarmCommon.yaml#/components/responses/401"
-        "500":
-          $ref: "SwarmCommon.yaml#/components/responses/500"
-        default:
-          description: Default response
 
   "/feeds/{owner}/{topic}":
     post:
@@ -1041,26 +1001,18 @@ paths:
             $ref: "SwarmCommon.yaml#/components/schemas/FeedType"
           required: false
           description: "Feed indexing scheme (default: sequence)"
-        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter"
-        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
-        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
-        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
-        - $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter"
       responses:
         "200":
           description: Latest feed update
           headers:
-            "swarm-soc-signature":
-              $ref: "SwarmCommon.yaml#/components/headers/SwarmSocSignature"
            "swarm-feed-index":
              $ref: "SwarmCommon.yaml#/components/headers/SwarmFeedIndex"
            "swarm-feed-index-next":
              $ref: "SwarmCommon.yaml#/components/headers/SwarmFeedIndexNext"
           content:
-            application/octet-stream:
+            application/json:
              schema:
-                type: string
-                format: binary
+                $ref: "SwarmCommon.yaml#/components/schemas/ReferenceResponse"
         "400":
           $ref: "SwarmCommon.yaml#/components/responses/400"
         "401":
diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml
index dcd54c3853a..563b52288ff 100644
--- a/openapi/SwarmCommon.yaml
+++ b/openapi/SwarmCommon.yaml
@@ -1062,11 +1062,6 @@ components:
       schema:
         $ref: "#/components/schemas/HexString"
 
-    SwarmSocSignature:
-      description: "Attached digital signature of the Single Owner Chunk"
-      schema:
-        $ref: "#/components/schemas/HexString"
-
     SwarmActHistoryAddress:
       description: "Swarm address reference to the new ACT history entry"
       schema:
@@ -1172,14 +1167,6 @@ components:
       description: >
         Specify the timeout for chunk retrieval. The default is 30 seconds.
 
-    SwarmOnlyRootChunkParameter:
-      in: header
-      name: swarm-only-root-chunk
-      schema:
-        type: boolean
-      required: false
-      description: "Returns only the root chunk of the content"
-
     ContentTypePreserved:
       in: header
       name: Content-Type
diff --git a/pkg/api/api.go b/pkg/api/api.go
index b226a5cff24..50bc5bb5713 100644
--- a/pkg/api/api.go
+++ b/pkg/api/api.go
@@ -75,10 +75,8 @@ const (
 	SwarmEncryptHeader                = "Swarm-Encrypt"
 	SwarmIndexDocumentHeader          = "Swarm-Index-Document"
 	SwarmErrorDocumentHeader          = "Swarm-Error-Document"
-	SwarmSocSignatureHeader           = "Swarm-Soc-Signature"
 	SwarmFeedIndexHeader              = "Swarm-Feed-Index"
 	SwarmFeedIndexNextHeader          = "Swarm-Feed-Index-Next"
-	SwarmOnlyRootChunk                = "Swarm-Only-Root-Chunk"
 	SwarmCollectionHeader             = "Swarm-Collection"
 	SwarmPostageBatchIdHeader         = "Swarm-Postage-Batch-Id"
 	SwarmPostageStampHeader           = "Swarm-Postage-Stamp"
@@ -528,7 +526,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
 	allowedHeaders := []string{
 		"User-Agent", "Accept", "X-Requested-With", "Access-Control-Request-Headers", "Access-Control-Request-Method", "Accept-Ranges", "Content-Encoding",
 		AuthorizationHeader, AcceptEncodingHeader, ContentTypeHeader, ContentDispositionHeader, RangeHeader, OriginHeader,
-		SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader,
+		SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, GasPriceHeader, GasLimitHeader, ImmutableHeader,
 	}
 
 	allowedHeadersStr := strings.Join(allowedHeaders, ", ")
diff --git a/pkg/api/bytes.go b/pkg/api/bytes.go
index d0d2886683b..5a65c5c2fa2 100644
--- a/pkg/api/bytes.go
+++ b/pkg/api/bytes.go
@@ -182,7 +182,7 @@ func (s *Service) bytesGetHandler(w http.ResponseWriter, r *http.Request) {
 		ContentTypeHeader: {"application/octet-stream"},
 	}
 
-	s.downloadHandler(logger, w, r, address, additionalHeaders, true, false, nil)
+	s.downloadHandler(logger, w, r, address, additionalHeaders, true, false)
 }
 
 func (s *Service) bytesHeadHandler(w http.ResponseWriter, r *http.Request) {
diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go
index a07f3f0f54f..9fdb5dda3a0 100644
--- a/pkg/api/bzz.go
+++ b/pkg/api/bzz.go
@@ -23,7 +23,6 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethersphere/bee/v2/pkg/accesscontrol"
 	"github.com/ethersphere/bee/v2/pkg/feeds"
-	"github.com/ethersphere/bee/v2/pkg/file"
 	"github.com/ethersphere/bee/v2/pkg/file/joiner"
 	"github.com/ethersphere/bee/v2/pkg/file/loadsave"
 	"github.com/ethersphere/bee/v2/pkg/file/redundancy"
@@ -426,17 +425,14 @@ FETCH:
 			jsonhttp.NotFound(w, "no update found")
 			return
 		}
-		wc, err := feeds.GetWrappedChunk(ctx, s.storer.ChunkStore(), ch)
+		ref, _, err := parseFeedUpdate(ch)
 		if err != nil {
 			logger.Debug("bzz download: mapStructure feed update failed", "error", err)
 			logger.Error(nil, "bzz download: mapStructure feed update failed")
 			jsonhttp.InternalServerError(w, "mapStructure feed update")
 			return
 		}
-		address = wc.Address()
-		// modify ls and init with non-existing wrapped chunk
-		ls = loadsave.NewReadonlyWithRootCh(s.storer.Download(cache), wc)
-
+		address = ref
 		feedDereferenced = true
 		curBytes, err := cur.MarshalBinary()
 		if err != nil {
@@ -558,11 +554,11 @@ func (s *Service) serveManifestEntry(
 		additionalHeaders[ContentTypeHeader] = []string{mimeType}
 	}
 
-	s.downloadHandler(logger, w, r, manifestEntry.Reference(), additionalHeaders, etag, headersOnly, nil)
+	s.downloadHandler(logger, w, r, manifestEntry.Reference(), additionalHeaders, etag, headersOnly)
 }
 
 // downloadHandler contains common logic for downloading Swarm file from API
-func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag, headersOnly bool, rootCh swarm.Chunk) {
+func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag, headersOnly bool) {
 	headers := struct {
 		Strategy *getter.Strategy  `map:"Swarm-Redundancy-Strategy"`
 		RLevel   *redundancy.Level `map:"Swarm-Redundancy-Level"`
@@ -592,15 +588,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
 		ctx = redundancy.SetLevelInContext(ctx, *headers.RLevel)
 	}
 
-	var (
-		reader file.Joiner
-		l      int64
-	)
-	if rootCh != nil {
-		reader, l, err = joiner.NewJoiner(ctx, s.storer.Download(cache), s.storer.Cache(), reference, rootCh)
-	} else {
-		reader, l, err = joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference)
-	}
+	reader, l, err := joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference)
 	if err != nil {
 		if errors.Is(err, storage.ErrNotFound) || errors.Is(err, topology.ErrNotFound) {
 			logger.Debug("api download: not found ", "address", reference, "error", err)
diff --git a/pkg/api/feed.go b/pkg/api/feed.go
index 7c92f3fd855..7750fd7605c 100644
--- a/pkg/api/feed.go
+++ b/pkg/api/feed.go
@@ -5,13 +5,11 @@
 package api
 
 import (
-	"bytes"
+	"encoding/binary"
 	"encoding/hex"
 	"errors"
-	"io"
+	"fmt"
 	"net/http"
-	"strconv"
-	"strings"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -36,6 +34,8 @@ const (
 	feedMetadataEntryType = "swarm-feed-type"
 )
 
+var errInvalidFeedUpdate = errors.New("invalid feed update")
+
 type feedReferenceResponse struct {
 	Reference swarm.Address `json:"reference"`
 }
@@ -64,14 +64,6 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
 		queries.At = time.Now().Unix()
 	}
 
-	headers := struct {
-		OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"`
-	}{}
-	if response := s.mapStructure(r.Header, &headers); response != nil {
-		response("invalid header params", logger, w)
-		return
-	}
-
 	f := feeds.New(paths.Topic, paths.Owner)
 	lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f)
 	if err != nil {
@@ -102,10 +94,11 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	wc, err := feeds.GetWrappedChunk(r.Context(), s.storer.ChunkStore(), ch)
+	ref, _, err := parseFeedUpdate(ch)
 	if err != nil {
-		logger.Error(nil, "wrapped chunk cannot be retrieved")
-		jsonhttp.NotFound(w, "wrapped chunk cannot be retrieved")
+		logger.Debug("mapStructure feed update failed", "error", err)
+		logger.Error(nil, "mapStructure feed update failed")
+		jsonhttp.InternalServerError(w, "mapStructure feed update failed")
 		return
 	}
 
@@ -125,33 +118,11 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	socCh, err := soc.FromChunk(ch)
-	if err != nil {
-		logger.Error(nil, "wrapped chunk cannot be retrieved")
-		jsonhttp.NotFound(w, "wrapped chunk cannot be retrieved")
-		return
-	}
-	sig := socCh.Signature()
-
-	additionalHeaders := http.Header{
-		ContentTypeHeader:               {"application/octet-stream"},
-		SwarmFeedIndexHeader:            {hex.EncodeToString(curBytes)},
-		SwarmFeedIndexNextHeader:        {hex.EncodeToString(nextBytes)},
-		SwarmSocSignatureHeader:         {hex.EncodeToString(sig)},
-		"Access-Control-Expose-Headers": {SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader},
-	}
-
-	if headers.OnlyRootChunk {
-		w.Header().Set(ContentLengthHeader, strconv.Itoa(len(wc.Data())))
-		// include additional headers
-		for name, values := range additionalHeaders {
-			w.Header().Set(name, strings.Join(values, ", "))
-		}
-		_, _ = io.Copy(w, bytes.NewReader(wc.Data()))
-		return
-	}
+	w.Header().Set(SwarmFeedIndexHeader, hex.EncodeToString(curBytes))
+	w.Header().Set(SwarmFeedIndexNextHeader, hex.EncodeToString(nextBytes))
+	w.Header().Set("Access-Control-Expose-Headers", fmt.Sprintf("%s, %s", SwarmFeedIndexHeader, SwarmFeedIndexNextHeader))
 
-	s.downloadHandler(logger, w, r, wc.Address(), additionalHeaders, true, false, wc)
+	jsonhttp.OK(w, feedReferenceResponse{Reference: ref})
 }
 
 func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
@@ -307,3 +278,22 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
 
 	jsonhttp.Created(w, feedReferenceResponse{Reference: encryptedReference})
 }
+
+func parseFeedUpdate(ch swarm.Chunk) (swarm.Address, int64, error) {
+	s, err := soc.FromChunk(ch)
+	if err != nil {
+		return swarm.ZeroAddress, 0, fmt.Errorf("soc unmarshal: %w", err)
+	}
+
+	update := s.WrappedChunk().Data()
+	// split the timestamp and reference
+	// possible values right now:
+	// unencrypted ref: span+timestamp+ref => 8+8+32=48
+	// encrypted ref: span+timestamp+ref+decryptKey => 8+8+64=80
+	if len(update) != 48 && len(update) != 80 {
+		return swarm.ZeroAddress, 0, errInvalidFeedUpdate
+	}
+	ts := binary.BigEndian.Uint64(update[8:16])
+	ref := swarm.NewAddress(update[16:])
+	return ref, int64(ts), nil
+}
diff --git a/pkg/api/feed_test.go b/pkg/api/feed_test.go
index 843756d7237..a35b9ce3423 100644
--- a/pkg/api/feed_test.go
+++ b/pkg/api/feed_test.go
@@ -5,13 +5,11 @@
 package api_test
 
 import (
-	"bytes"
 	"context"
 	"encoding/binary"
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"io"
 	"math/big"
 	"net/http"
 	"testing"
@@ -19,7 +17,6 @@ import (
 	"github.com/ethersphere/bee/v2/pkg/api"
 	"github.com/ethersphere/bee/v2/pkg/feeds"
 	"github.com/ethersphere/bee/v2/pkg/file/loadsave"
-	"github.com/ethersphere/bee/v2/pkg/file/splitter"
 	"github.com/ethersphere/bee/v2/pkg/jsonhttp"
 	"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
 	"github.com/ethersphere/bee/v2/pkg/log"
@@ -27,10 +24,8 @@ import (
 	"github.com/ethersphere/bee/v2/pkg/postage"
 	mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock"
 	testingsoc "github.com/ethersphere/bee/v2/pkg/soc/testing"
-	testingc "github.com/ethersphere/bee/v2/pkg/storage/testing"
 	mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
"github.com/ethersphere/bee/v2/pkg/swarm" - "github.com/ethersphere/bee/v2/pkg/util/testutil" ) const ownerString = "8d3766440f0d7b949a5e32995d09619a7f86e632" @@ -49,22 +44,13 @@ func TestFeed_Get(t *testing.T) { } mockStorer = mockstorer.New() ) - putter, err := mockStorer.Upload(context.Background(), false, 0) - if err != nil { - t.Fatal(err) - } - mockWrappedCh := testingc.FixtureChunk("0033") - err = putter.Put(context.Background(), mockWrappedCh) - if err != nil { - t.Fatal(err) - } t.Run("with at", func(t *testing.T) { t.Parallel() var ( timestamp = int64(12121212) - ch = toChunk(t, uint64(timestamp), mockWrappedCh.Address().Bytes()) + ch = toChunk(t, uint64(timestamp), expReference.Bytes()) look = newMockLookup(12, 0, ch, nil, &id{}, &id{}) factory = newMockFactory(look) idBytes, _ = (&id{}).MarshalBinary() @@ -75,7 +61,7 @@ func TestFeed_Get(t *testing.T) { ) jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", "12"), http.StatusOK, - jsonhttptest.WithExpectedResponse(mockWrappedCh.Data()[swarm.SpanSize:]), + jsonhttptest.WithExpectedJSONResponse(api.FeedReferenceResponse{Reference: expReference}), jsonhttptest.WithExpectedResponseHeader(api.SwarmFeedIndexHeader, hex.EncodeToString(idBytes)), ) }) @@ -85,7 +71,7 @@ func TestFeed_Get(t *testing.T) { var ( timestamp = int64(12121212) - ch = toChunk(t, uint64(timestamp), mockWrappedCh.Address().Bytes()) + ch = toChunk(t, uint64(timestamp), expReference.Bytes()) look = newMockLookup(-1, 2, ch, nil, &id{}, &id{}) factory = newMockFactory(look) idBytes, _ = (&id{}).MarshalBinary() @@ -97,102 +83,10 @@ func TestFeed_Get(t *testing.T) { ) jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", ""), http.StatusOK, - jsonhttptest.WithExpectedResponse(mockWrappedCh.Data()[swarm.SpanSize:]), - jsonhttptest.WithExpectedContentLength(len(mockWrappedCh.Data()[swarm.SpanSize:])), + jsonhttptest.WithExpectedJSONResponse(api.FeedReferenceResponse{Reference: expReference}), jsonhttptest.WithExpectedResponseHeader(api.SwarmFeedIndexHeader, hex.EncodeToString(idBytes)), ) }) - - t.Run("chunk wrapping", func(t *testing.T) { - t.Parallel() - - testData := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8} - - var ( - ch = testingsoc.GenerateMockSOC(t, testData).Chunk() - look = newMockLookup(-1, 2, ch, nil, &id{}, &id{}) - factory = newMockFactory(look) - idBytes, _ = (&id{}).MarshalBinary() - - client, _, _, _ = newTestServer(t, testServerOptions{ - Storer: mockStorer, - Feeds: factory, - }) - ) - - jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", ""), http.StatusOK, - jsonhttptest.WithExpectedResponse(testData), - jsonhttptest.WithExpectedContentLength(len(testData)), - jsonhttptest.WithExpectedResponseHeader(api.SwarmFeedIndexHeader, hex.EncodeToString(idBytes)), - ) - }) - - t.Run("legacy payload with non existing wrapped chunk", func(t *testing.T) { - t.Parallel() - - wrappedRef := make([]byte, swarm.HashSize) - _ = copy(wrappedRef, mockWrappedCh.Address().Bytes()) - wrappedRef[0]++ - - var ( - ch = toChunk(t, uint64(12121212), wrappedRef) - look = newMockLookup(-1, 2, ch, nil, &id{}, &id{}) - factory = newMockFactory(look) - - client, _, _, _ = newTestServer(t, testServerOptions{ - Storer: mockStorer, - Feeds: factory, - }) - ) - - jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", ""), http.StatusNotFound) - }) - - t.Run("bigger payload than one chunk", func(t *testing.T) { - t.Parallel() - - testDataLen := 5000 - testData := 
testutil.RandBytesWithSeed(t, testDataLen, 1) - s := splitter.NewSimpleSplitter(putter) - addr, err := s.Split(context.Background(), io.NopCloser(bytes.NewReader(testData)), int64(testDataLen), false) - if err != nil { - t.Fatal(err) - } - - // get root ch addr then add wrap it with soc - testRootCh, err := mockStorer.ChunkStore().Get(context.Background(), addr) - if err != nil { - t.Fatal(err) - } - var ( - ch = testingsoc.GenerateMockSOCWithSpan(t, testRootCh.Data()).Chunk() - look = newMockLookup(-1, 2, ch, nil, &id{}, &id{}) - factory = newMockFactory(look) - idBytes, _ = (&id{}).MarshalBinary() - - client, _, _, _ = newTestServer(t, testServerOptions{ - Storer: mockStorer, - Feeds: factory, - }) - ) - - t.Run("retrieve chunk tree", func(t *testing.T) { - jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", ""), http.StatusOK, - jsonhttptest.WithExpectedResponse(testData), - jsonhttptest.WithExpectedContentLength(testDataLen), - jsonhttptest.WithExpectedResponseHeader(api.SwarmFeedIndexHeader, hex.EncodeToString(idBytes)), - ) - }) - - t.Run("retrieve only wrapped chunk", func(t *testing.T) { - jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", ""), http.StatusOK, - jsonhttptest.WithRequestHeader(api.SwarmOnlyRootChunk, "true"), - jsonhttptest.WithExpectedResponse(testRootCh.Data()), - jsonhttptest.WithExpectedContentLength(len(testRootCh.Data())), - jsonhttptest.WithExpectedResponseHeader(api.SwarmFeedIndexHeader, hex.EncodeToString(idBytes)), - ) - }) - }) } // nolint:paralleltest diff --git a/pkg/api/router.go b/pkg/api/router.go index 8cffd8ac32e..3c1c1604c6e 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -62,13 +62,9 @@ func (s *Service) EnableFullAPI() { "/bzz", "/bytes", "/chunks", - "/feeds", - "/soc", rootPath + "/bzz", rootPath + "/bytes", rootPath + "/chunks", - rootPath + "/feeds", - rootPath + "/soc", } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -280,7 +276,6 @@ func (s *Service) mountAPI() { }) handle("/soc/{owner}/{id}", jsonhttp.MethodHandler{ - "GET": http.HandlerFunc(s.socGetHandler), "POST": web.ChainHandlers( jsonhttp.NewMaxBodyBytesHandler(swarm.ChunkWithSpanSize), web.FinalHandlerFunc(s.socUploadHandler), diff --git a/pkg/api/soc.go b/pkg/api/soc.go index 9f0f838bfc9..d36ae201c2b 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -5,13 +5,9 @@ package api import ( - "bytes" - "encoding/hex" "errors" "io" "net/http" - "strconv" - "strings" "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/cac" @@ -211,66 +207,3 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.Created(w, socPostResponse{Reference: reference}) } - -func (s *Service) socGetHandler(w http.ResponseWriter, r *http.Request) { - logger := s.logger.WithName("get_soc").Build() - - paths := struct { - Owner []byte `map:"owner" validate:"required"` - ID []byte `map:"id" validate:"required"` - }{} - if response := s.mapStructure(mux.Vars(r), &paths); response != nil { - response("invalid path params", logger, w) - return - } - - headers := struct { - OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` - }{} - if response := s.mapStructure(r.Header, &headers); response != nil { - response("invalid header params", logger, w) - return - } - - address, err := soc.CreateAddress(paths.ID, paths.Owner) - if err != nil { - logger.Error(err, "soc address cannot be created") - jsonhttp.BadRequest(w, "soc address cannot be created") - return - } 
-
-	getter := s.storer.Download(true)
-	sch, err := getter.Get(r.Context(), address)
-	if err != nil {
-		logger.Error(err, "soc retrieval has been failed")
-		jsonhttp.NotFound(w, "requested chunk cannot be retrieved")
-		return
-	}
-	socCh, err := soc.FromChunk(sch)
-	if err != nil {
-		logger.Error(err, "chunk is not a single owner chunk")
-		jsonhttp.InternalServerError(w, "chunk is not a single owner chunk")
-		return
-	}
-
-	sig := socCh.Signature()
-	wc := socCh.WrappedChunk()
-
-	additionalHeaders := http.Header{
-		ContentTypeHeader:               {"application/octet-stream"},
-		SwarmSocSignatureHeader:         {hex.EncodeToString(sig)},
-		"Access-Control-Expose-Headers": {SwarmSocSignatureHeader},
-	}
-
-	if headers.OnlyRootChunk {
-		w.Header().Set(ContentLengthHeader, strconv.Itoa(len(wc.Data())))
-		// include additional headers
-		for name, values := range additionalHeaders {
-			w.Header().Set(name, strings.Join(values, ", "))
-		}
-		_, _ = io.Copy(w, bytes.NewReader(wc.Data()))
-		return
-	}
-
-	s.downloadHandler(logger, w, r, wc.Address(), additionalHeaders, true, false, wc)
-}
diff --git a/pkg/api/soc_test.go b/pkg/api/soc_test.go
index 6c0d6fa0449..cf2b3fa5f66 100644
--- a/pkg/api/soc_test.go
+++ b/pkg/api/soc_test.go
@@ -97,31 +97,16 @@ func TestSOC(t *testing.T) {
 		)
 
 		// try to fetch the same chunk
-		t.Run("chunks fetch", func(t *testing.T) {
-			rsrc := fmt.Sprintf("/chunks/%s", s.Address().String())
-			resp := request(t, client, http.MethodGet, rsrc, nil, http.StatusOK)
-			data, err := io.ReadAll(resp.Body)
-			if err != nil {
-				t.Fatal(err)
-			}
-
-			if !bytes.Equal(s.Chunk().Data(), data) {
-				t.Fatal("data retrieved doesn't match uploaded content")
-			}
-		})
-
-		t.Run("soc fetch", func(t *testing.T) {
-			rsrc := fmt.Sprintf("/soc/%s/%s", hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID))
-			resp := request(t, client, http.MethodGet, rsrc, nil, http.StatusOK)
-			data, err := io.ReadAll(resp.Body)
-			if err != nil {
-				t.Fatal(err)
-			}
-
-			if !bytes.Equal(s.WrappedChunk.Data()[swarm.SpanSize:], data) {
-				t.Fatal("data retrieved doesn't match uploaded content")
-			}
-		})
+		rsrc := fmt.Sprintf("/chunks/" + s.Address().String())
+		resp := request(t, client, http.MethodGet, rsrc, nil, http.StatusOK)
+		data, err := io.ReadAll(resp.Body)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if !bytes.Equal(s.Chunk().Data(), data) {
+			t.Fatal("data retrieved doesn't match uploaded content")
+		}
 	})
 
 	t.Run("postage", func(t *testing.T) {
diff --git a/pkg/feeds/epochs/finder.go b/pkg/feeds/epochs/finder.go
index d5133fdaa57..a85ab309e76 100644
--- a/pkg/feeds/epochs/finder.go
+++ b/pkg/feeds/epochs/finder.go
@@ -51,7 +51,10 @@ func (f *finder) common(ctx context.Context, at int64, after uint64) (*epoch, sw
 			}
 			return e, nil, err
 		}
-		ts := e.length() * e.start
+		ts, err := feeds.UpdatedAt(ch)
+		if err != nil {
+			return e, nil, err
+		}
 		if ts <= uint64(at) {
 			return e, ch, nil
 		}
@@ -75,7 +78,10 @@ func (f *finder) at(ctx context.Context, at uint64, e *epoch, ch swarm.Chunk) (s
 	}
 	// epoch found
 	// check if timestamp is later then target
-	ts := e.length() * e.start
+	ts, err := feeds.UpdatedAt(uch)
+	if err != nil {
+		return nil, err
+	}
 	if ts > at {
 		if e.isLeft() {
 			return ch, nil
@@ -125,7 +131,10 @@ func (f *asyncFinder) get(ctx context.Context, at int64, e *epoch) (swarm.Chunk,
 		}
 		return nil, nil
 	}
-	ts := e.length() * e.start
+	ts, err := feeds.UpdatedAt(u)
+	if err != nil {
+		return nil, err
+	}
 	diff := at - int64(ts)
 	if diff < 0 {
 		return nil, nil
diff --git a/pkg/feeds/epochs/updater.go b/pkg/feeds/epochs/updater.go
index f3588b98f1b..b36d77e7d96 100644
--- a/pkg/feeds/epochs/updater.go
+++ b/pkg/feeds/epochs/updater.go
@@ -34,7 +34,7 @@ func NewUpdater(putter storage.Putter, signer crypto.Signer, topic []byte) (feed
 // Update pushes an update to the feed through the chunk stores
 func (u *updater) Update(ctx context.Context, at int64, payload []byte) error {
 	e := next(u.epoch, u.last, uint64(at))
-	err := u.Put(ctx, e, payload)
+	err := u.Put(ctx, e, at, payload)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/feeds/feed.go b/pkg/feeds/feed.go
index ac8d232f5ce..7c9495cdfac 100644
--- a/pkg/feeds/feed.go
+++ b/pkg/feeds/feed.go
@@ -107,7 +107,7 @@ func NewUpdate(f *Feed, idx Index, timestamp int64, payload, sig []byte) (swarm.
 	if err != nil {
 		return nil, fmt.Errorf("update: %w", err)
 	}
-	cac, err := toChunk(payload)
+	cac, err := toChunk(uint64(timestamp), payload)
 	if err != nil {
 		return nil, fmt.Errorf("toChunk: %w", err)
 	}
diff --git a/pkg/feeds/getter.go b/pkg/feeds/getter.go
index 9fcb2ccae77..77b7599dd9e 100644
--- a/pkg/feeds/getter.go
+++ b/pkg/feeds/getter.go
@@ -16,8 +16,6 @@ import (
 	"github.com/ethersphere/bee/v2/pkg/swarm"
 )
 
-var errNotLegacyPayload = errors.New("feed update is not in the legacy payload structure")
-
 // Lookup is the interface for time based feed lookup
 type Lookup interface {
 	At(ctx context.Context, at int64, after uint64) (chunk swarm.Chunk, currentIndex, nextIndex Index, err error)
@@ -51,47 +49,31 @@ func (f *Getter) Get(ctx context.Context, i Index) (swarm.Chunk, error) {
 	return f.getter.Get(ctx, addr)
 }
 
-func GetWrappedChunk(ctx context.Context, getter storage.Getter, ch swarm.Chunk) (swarm.Chunk, error) {
-	wc, err := FromChunk(ch)
-	if err != nil {
-		return nil, err
-	}
-	// try to split the timestamp and reference
-	// possible values right now:
-	// unencrypted ref: span+timestamp+ref => 8+8+32=48
-	// encrypted ref: span+timestamp+ref+decryptKey => 8+8+64=80
-	_, ref, err := LegacyPayload(wc)
-	if err != nil {
-		if errors.Is(err, errNotLegacyPayload) {
-			return wc, nil
-		}
-		return nil, err
-	}
-	wc, err = getter.Get(ctx, ref)
-	if err != nil {
-		return nil, err
-	}
-
-	return wc, nil
-}
-
-// FromChunk parses out the wrapped chunk
-func FromChunk(ch swarm.Chunk) (swarm.Chunk, error) {
+// FromChunk parses out the timestamp and the payload
+func FromChunk(ch swarm.Chunk) (uint64, []byte, error) {
 	s, err := soc.FromChunk(ch)
 	if err != nil {
-		return nil, fmt.Errorf("soc unmarshal: %w", err)
+		return 0, nil, err
+	}
+	cac := s.WrappedChunk()
+	if len(cac.Data()) < 16 {
+		return 0, nil, errors.New("feed update payload too short")
 	}
-	return s.WrappedChunk(), nil
+	payload := cac.Data()[16:]
+	at := binary.BigEndian.Uint64(cac.Data()[8:16])
+	return at, payload, nil
 }
 
-// LegacyPayload returns back the referenced chunk and datetime from the legacy feed payload
-func LegacyPayload(wrappedChunk swarm.Chunk) (uint64, swarm.Address, error) {
-	cacData := wrappedChunk.Data()
-	if !(len(cacData) == 16+swarm.HashSize || len(cacData) == 16+swarm.HashSize*2) {
-		return 0, swarm.ZeroAddress, errNotLegacyPayload
+// UpdatedAt extracts the time of feed other than update
+func UpdatedAt(ch swarm.Chunk) (uint64, error) {
+	d := ch.Data()
+	if len(d) < 113 {
+		return 0, fmt.Errorf("too short: %d", len(d))
 	}
-	address := swarm.NewAddress(cacData[16:])
-	at := binary.BigEndian.Uint64(cacData[8:16])
-
-	return at, address, nil
+	// a soc chunk with time information in the wrapped content addressed chunk
+	// 0-32 index,
+	// 65-97 signature,
+	// 98-105 span of wrapped chunk
+	// 105-113 timestamp
+	return binary.BigEndian.Uint64(d[105:113]), nil
 }
diff --git a/pkg/feeds/putter.go b/pkg/feeds/putter.go
index abe1972a0a5..633276f8f63 100644
--- a/pkg/feeds/putter.go
+++ b/pkg/feeds/putter.go
@@ -6,6 +6,7 @@ package feeds
 
 import (
 	"context"
+	"encoding/binary"
 
 	"github.com/ethersphere/bee/v2/pkg/cac"
 	"github.com/ethersphere/bee/v2/pkg/crypto"
@@ -38,12 +39,12 @@ func NewPutter(putter storage.Putter, signer crypto.Signer, topic []byte) (*Putt
 }
 
 // Put pushes an update to the feed through the chunk stores
-func (u *Putter) Put(ctx context.Context, i Index, payload []byte) error {
+func (u *Putter) Put(ctx context.Context, i Index, at int64, payload []byte) error {
 	id, err := u.Feed.Update(i).Id()
 	if err != nil {
 		return err
 	}
-	cac, err := toChunk(payload)
+	cac, err := toChunk(uint64(at), payload)
 	if err != nil {
 		return err
 	}
@@ -55,6 +56,8 @@ func (u *Putter) Put(ctx context.Context, i Index, payload []byte) error {
 	return u.putter.Put(ctx, ch)
 }
 
-func toChunk(payload []byte) (swarm.Chunk, error) {
-	return cac.New(payload)
+func toChunk(at uint64, payload []byte) (swarm.Chunk, error) {
+	ts := make([]byte, 8)
+	binary.BigEndian.PutUint64(ts, at)
+	return cac.New(append(ts, payload...))
 }
diff --git a/pkg/feeds/sequence/sequence.go b/pkg/feeds/sequence/sequence.go
index f1f254309ca..07667ce3bd8 100644
--- a/pkg/feeds/sequence/sequence.go
+++ b/pkg/feeds/sequence/sequence.go
@@ -79,6 +79,14 @@ func (f *finder) At(ctx context.Context, at int64, _ uint64) (ch swarm.Chunk, cu
 			}
 			return ch, current, &index{i}, nil
 		}
+		ts, err := feeds.UpdatedAt(u)
+		if err != nil {
+			return nil, nil, nil, err
+		}
+		// if index is later than the `at` target index, then return previous chunk and index
+		if ts > uint64(at) {
+			return ch, &index{i - 1}, &index{i}, nil
+		}
 		ch = u
 	}
 }
@@ -259,6 +267,15 @@ func (f *asyncFinder) get(ctx context.Context, at int64, idx uint64) (swarm.Chun
 		// if 'not-found' error, then just silence and return nil chunk
 		return nil, nil
 	}
+	ts, err := feeds.UpdatedAt(u)
+	if err != nil {
+		return nil, err
+	}
+	// this means the update timestamp is later than the pivot time we are looking for
+	// handled as if the update was missing but with no uncertainty due to timeout
+	if at < int64(ts) {
+		return nil, nil
+	}
 	return u, nil
 }
 
@@ -280,7 +297,7 @@ func NewUpdater(putter storage.Putter, signer crypto.Signer, topic []byte) (feed
 
 // Update pushes an update to the feed through the chunk stores
 func (u *updater) Update(ctx context.Context, at int64, payload []byte) error {
-	err := u.Put(ctx, &index{u.next}, payload)
+	err := u.Put(ctx, &index{u.next}, at, payload)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/feeds/testing/lookup.go b/pkg/feeds/testing/lookup.go
index 21656be5b64..18739e6a0ab 100644
--- a/pkg/feeds/testing/lookup.go
+++ b/pkg/feeds/testing/lookup.go
@@ -77,22 +77,24 @@ func TestFinderBasic(t *testing.T, finderf func(storage.Getter, *feeds.Feed) fee
 		if err != nil {
 			t.Fatal(err)
 		}
-		soc, err := feeds.Latest(ctx, finder, 0)
+		ch, err := feeds.Latest(ctx, finder, 0)
 		if err != nil {
 			t.Fatal(err)
 		}
-		if soc == nil {
+		if ch == nil {
 			t.Fatalf("expected to find update, got none")
 		}
 		exp := payload
-		cac, err := feeds.FromChunk(soc)
+		ts, payload, err := feeds.FromChunk(ch)
 		if err != nil {
 			t.Fatal(err)
 		}
-		payload = cac.Data()[swarm.SpanSize:]
 		if !bytes.Equal(payload, exp) {
 			t.Fatalf("result mismatch. want %8x... got %8x...", exp, payload)
 		}
+		if ts != uint64(at) {
+			t.Fatalf("timestamp mismatch: expected %v, got %v", at, ts)
+		}
 	})
 }
@@ -154,6 +156,18 @@ func TestFinderIntervals(t *testing.T, nextf func() (bool, int64), finderf func(
 		if ch == nil {
 			t.Fatalf("expected to find update, got none")
 		}
+		ts, payload, err := feeds.FromChunk(ch)
+		if err != nil {
+			t.Fatal(err)
+		}
+		content := binary.BigEndian.Uint64(payload)
+		if content != uint64(at) {
+			t.Fatalf("payload mismatch: expected %v, got %v", at, content)
+		}
+
+		if ts != uint64(at) {
+			t.Fatalf("timestamp mismatch: expected %v, got %v", at, ts)
+		}
 		if current != nil {
 			expectedId := ch.Data()[:32]
diff --git a/pkg/file/joiner/joiner.go b/pkg/file/joiner/joiner.go
index c3a317854bc..fcd7e790c10 100644
--- a/pkg/file/joiner/joiner.go
+++ b/pkg/file/joiner/joiner.go
@@ -116,12 +116,6 @@ func New(ctx context.Context, g storage.Getter, putter storage.Putter, address s
 		return nil, 0, err
 	}
 
-	return NewJoiner(ctx, g, putter, address, rootChunk)
-}
-
-// NewJoiner creates a new Joiner with the already fetched root chunk.
-// A Joiner provides Read, Seek and Size functionalities.
-func NewJoiner(ctx context.Context, g storage.Getter, putter storage.Putter, address swarm.Address, rootChunk swarm.Chunk) (file.Joiner, int64, error) {
 	chunkData := rootChunk.Data()
 	rootData := chunkData[swarm.SpanSize:]
 	refLength := len(address.Bytes())
diff --git a/pkg/file/loadsave/loadsave.go b/pkg/file/loadsave/loadsave.go
index 2899d9ed6ab..6a2a0bbf782 100644
--- a/pkg/file/loadsave/loadsave.go
+++ b/pkg/file/loadsave/loadsave.go
@@ -29,7 +29,6 @@ type loadSave struct {
 	getter     storage.Getter
 	putter     storage.Putter
 	pipelineFn func() pipeline.Interface
-	rootCh     swarm.Chunk
 }
 
 // New returns a new read-write load-saver.
@@ -49,33 +48,14 @@ func NewReadonly(getter storage.Getter) file.LoadSaver {
 	}
 }
 
-// NewReadonlyWithRootCh returns a new read-only load-saver
-// which will error on write.
-func NewReadonlyWithRootCh(getter storage.Getter, rootCh swarm.Chunk) file.LoadSaver {
-	return &loadSave{
-		getter: getter,
-		rootCh: rootCh,
-	}
-}
-
 func (ls *loadSave) Load(ctx context.Context, ref []byte) ([]byte, error) {
-	var j file.Joiner
-	if ls.rootCh == nil || !bytes.Equal(ls.rootCh.Address().Bytes(), ref[:swarm.HashSize]) {
-		joiner, _, err := joiner.New(ctx, ls.getter, ls.putter, swarm.NewAddress(ref))
-		if err != nil {
-			return nil, err
-		}
-		j = joiner
-	} else {
-		joiner, _, err := joiner.NewJoiner(ctx, ls.getter, ls.putter, swarm.NewAddress(ref), ls.rootCh)
-		if err != nil {
-			return nil, err
-		}
-		j = joiner
+	j, _, err := joiner.New(ctx, ls.getter, ls.putter, swarm.NewAddress(ref))
+	if err != nil {
+		return nil, err
 	}
 
 	buf := bytes.NewBuffer(nil)
-	_, err := file.JoinReadAll(ctx, j, buf)
+	_, err = file.JoinReadAll(ctx, j, buf)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/node/bootstrap.go b/pkg/node/bootstrap.go
index 2ccc0d8972b..41e0c0aae18 100644
--- a/pkg/node/bootstrap.go
+++ b/pkg/node/bootstrap.go
@@ -196,10 +196,10 @@ func bootstrapNode(
 	logger.Info("bootstrap: trying to fetch stamps snapshot")
 
 	var (
-		snapshotRootCh swarm.Chunk
-		reader         file.Joiner
-		l              int64
-		eventsJSON     []byte
+		snapshotReference swarm.Address
+		reader            file.Joiner
+		l                 int64
+		eventsJSON        []byte
 	)
 
 	for i := 0; i < getSnapshotRetries; i++ {
@@ -210,7 +210,7 @@ func bootstrapNode(
 		ctx, cancel := context.WithTimeout(ctx, timeout)
 		defer cancel()
 
-		snapshotRootCh, err = getLatestSnapshot(ctx, localStore.Download(true), snapshotFeed)
+		snapshotReference, err = getLatestSnapshot(ctx, localStore.Download(true), snapshotFeed)
 		if err != nil {
 			logger.Warning("bootstrap: fetching snapshot failed", "error", err)
 			continue
@@ -229,7 +229,7 @@ func bootstrapNode(
 		ctx, cancel := context.WithTimeout(ctx, timeout)
 		defer cancel()
 
-		reader, l, err = joiner.NewJoiner(ctx, localStore.Download(true), localStore.Cache(), snapshotRootCh.Address(), snapshotRootCh)
+		reader, l, err = joiner.New(ctx, localStore.Download(true), localStore.Cache(), snapshotReference)
 		if err != nil {
 			logger.Warning("bootstrap: file joiner failed", "error", err)
 			continue
@@ -278,7 +278,7 @@ func getLatestSnapshot(
 	ctx context.Context,
 	st storage.Getter,
 	address swarm.Address,
-) (swarm.Chunk, error) {
+) (swarm.Address, error) {
 	ls := loadsave.NewReadonly(st)
 	feedFactory := factory.New(st)
 
@@ -287,12 +287,12 @@ func getLatestSnapshot(
 		ls,
 	)
 	if err != nil {
-		return nil, fmt.Errorf("not a manifest: %w", err)
+		return swarm.ZeroAddress, fmt.Errorf("not a manifest: %w", err)
 	}
 
 	e, err := m.Lookup(ctx, "/")
 	if err != nil {
-		return nil, fmt.Errorf("node lookup: %w", err)
+		return swarm.ZeroAddress, fmt.Errorf("node lookup: %w", err)
 	}
 
 	var (
@@ -303,37 +303,42 @@ func getLatestSnapshot(
 	if e := meta["swarm-feed-owner"]; e != "" {
 		owner, err = hex.DecodeString(e)
 		if err != nil {
-			return nil, err
+			return swarm.ZeroAddress, err
 		}
 	}
 	if e := meta["swarm-feed-topic"]; e != "" {
 		topic, err = hex.DecodeString(e)
 		if err != nil {
-			return nil, err
+			return swarm.ZeroAddress, err
 		}
 	}
 	if e := meta["swarm-feed-type"]; e != "" {
 		err := t.FromString(e)
 		if err != nil {
-			return nil, err
+			return swarm.ZeroAddress, err
 		}
 	}
 	if len(owner) == 0 || len(topic) == 0 {
-		return nil, fmt.Errorf("node lookup: %s", "feed metadata absent")
+		return swarm.ZeroAddress, fmt.Errorf("node lookup: %s", "feed metadata absent")
 	}
 	f := feeds.New(topic, common.BytesToAddress(owner))
 
 	l, err := feedFactory.NewLookup(*t, f)
 	if err != nil {
-		return nil, fmt.Errorf("feed lookup failed: %w", err)
+		return swarm.ZeroAddress, fmt.Errorf("feed lookup failed: %w", err)
 	}
 
 	u, _, _, err := l.At(ctx, time.Now().Unix(), 0)
 	if err != nil {
-		return nil, err
+		return swarm.ZeroAddress, err
+	}
+
+	_, ref, err := feeds.FromChunk(u)
+	if err != nil {
+		return swarm.ZeroAddress, err
 	}
 
-	return feeds.GetWrappedChunk(ctx, st, u)
+	return swarm.NewAddress(ref), nil
 }
 
 func batchStoreExists(s storage.StateStorer) (bool, error) {
diff --git a/pkg/soc/testing/soc.go b/pkg/soc/testing/soc.go
index b6f7e2a7176..a622bf2d1dc 100644
--- a/pkg/soc/testing/soc.go
+++ b/pkg/soc/testing/soc.go
@@ -70,35 +70,17 @@ func GenerateMockSocWithSigner(t *testing.T, data []byte, signer crypto.Signer)
 func GenerateMockSOC(t *testing.T, data []byte) *MockSOC {
 	t.Helper()
 
-	ch, err := cac.New(data)
+	privKey, err := crypto.GenerateSecp256k1Key()
 	if err != nil {
 		t.Fatal(err)
 	}
-
-	return generateMockSOC(t, ch)
-}
-
-// GenerateMockSOC generates a valid mocked SOC from given chunk data (span + payload).
-func GenerateMockSOCWithSpan(t *testing.T, data []byte) *MockSOC {
-	t.Helper()
-
-	ch, err := cac.NewWithDataSpan(data)
+	signer := crypto.NewDefaultSigner(privKey)
+	owner, err := signer.EthereumAddress()
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	return generateMockSOC(t, ch)
-}
-
-func generateMockSOC(t *testing.T, ch swarm.Chunk) *MockSOC {
-	t.Helper()
-
-	privKey, err := crypto.GenerateSecp256k1Key()
-	if err != nil {
-		t.Fatal(err)
-	}
-	signer := crypto.NewDefaultSigner(privKey)
-	owner, err := signer.EthereumAddress()
+	ch, err := cac.New(data)
 	if err != nil {
 		t.Fatal(err)
 	}