From 6fcdc6b10bf3769e6784cd636d53260e35dc867f Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Fri, 23 Feb 2024 15:56:36 +0100 Subject: [PATCH] feat: split data into chunks with redundancy --- openapi/SwarmDebug.yaml | 31 +++++++++++++++++++ pkg/api/router.go | 4 +++ pkg/api/split.go | 67 +++++++++++++++++++++++++++++++++++++++++ pkg/api/split_test.go | 66 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 168 insertions(+) create mode 100644 pkg/api/split.go create mode 100644 pkg/api/split_test.go diff --git a/openapi/SwarmDebug.yaml b/openapi/SwarmDebug.yaml index 11d7b1a5fdb..a7b9f1f5131 100644 --- a/openapi/SwarmDebug.yaml +++ b/openapi/SwarmDebug.yaml @@ -1133,3 +1133,34 @@ paths: $ref: "SwarmCommon.yaml#/components/responses/400" default: description: Default response. + + "/split": + post: + summary: "Split data into chunks with redundancy" + tags: + - Chunk + parameters: + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + requestBody: + description: Binary data to be split into chunks + content: + application/octet-stream: + schema: + type: string + format: binary + responses: + "200": + description: Chunks [reference+data] + content: + application/octet-stream: + schema: + type: string + format: binary + "400": + $ref: "SwarmCommon.yaml#/components/responses/400" + "402": + $ref: "SwarmCommon.yaml#/components/responses/402" + "500": + $ref: "SwarmCommon.yaml#/components/responses/500" + default: + description: Default response diff --git a/pkg/api/router.go b/pkg/api/router.go index 7c854543b88..4f4238f5f8a 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -403,6 +403,10 @@ func (s *Service) mountBusinessDebug(restricted bool) { "GET": http.HandlerFunc(s.hasChunkHandler), }) + handle("/split", jsonhttp.MethodHandler{ + "POST": http.HandlerFunc(s.splitHandler), + }) + handle("/topology", jsonhttp.MethodHandler{ "GET": http.HandlerFunc(s.topologyHandler), }) diff --git a/pkg/api/split.go b/pkg/api/split.go new 
file mode 100644
index 00000000000..5627794f4ce
--- /dev/null
+++ b/pkg/api/split.go
@@ -0,0 +1,82 @@
+// Copyright 2024 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package api
+
+import (
+	"context"
+	"net/http"
+
+	"github.com/ethersphere/bee/pkg/file/redundancy"
+	"github.com/ethersphere/bee/pkg/jsonhttp"
+	"github.com/ethersphere/bee/pkg/swarm"
+)
+
+// splitPutter is an in-memory chunk sink: it records every chunk handed to
+// Put, in call order, so splitHandler can stream them back to the client.
+type splitPutter struct {
+	chunks []swarm.Chunk
+}
+
+// Put appends chunk to the recorded list. It always returns nil.
+func (s *splitPutter) Put(_ context.Context, chunk swarm.Chunk) error {
+	s.chunks = append(s.chunks, chunk)
+	return nil
+}
+
+// splitHandler runs the request body through the chunking pipeline at the
+// redundancy level given by the Swarm-Redundancy-Level header and writes the
+// resulting chunks back. Chunks are collected in memory via splitPutter.
+//
+// The response is a sequence of fixed-size records, one per chunk:
+// swarm.HashSize bytes of address followed by swarm.ChunkWithSpanSize bytes
+// of chunk data, zero-padded when the chunk is shorter.
+func (s *Service) splitHandler(w http.ResponseWriter, r *http.Request) {
+	logger := s.logger.WithName("split").Build()
+	flusher, ok := w.(http.Flusher)
+	if !ok {
+		logger.Error(nil, "streaming unsupported")
+		jsonhttp.InternalServerError(w, "streaming unsupported")
+		return
+	}
+
+	headers := struct {
+		RLevel redundancy.Level `map:"Swarm-Redundancy-Level"`
+	}{}
+	if response := s.mapStructure(r.Header, &headers); response != nil {
+		response("invalid header params", logger, w)
+		return
+	}
+
+	putter := &splitPutter{}
+
+	p := requestPipelineFn(putter, false, headers.RLevel)
+	if _, err := p(r.Context(), r.Body); err != nil {
+		logger.Error(err, "split: pipeline error")
+		jsonhttp.InternalServerError(w, "split failed")
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/octet-stream")
+	w.Header().Set("X-Content-Type-Options", "nosniff")
+
+	// One reusable record buffer. Chunk data carries its span prefix, so a
+	// full chunk needs swarm.ChunkWithSpanSize bytes; sizing the data slot at
+	// swarm.ChunkSize would cut off its tail.
+	buf := make([]byte, swarm.HashSize+swarm.ChunkWithSpanSize)
+	for _, c := range putter.chunks {
+		copy(buf[:swarm.HashSize], c.Address().Bytes())
+		n := copy(buf[swarm.HashSize:], c.Data())
+		// Zero the unused tail so a short chunk's record does not carry
+		// leftover bytes from the previous, longer chunk.
+		for i := swarm.HashSize + n; i < len(buf); i++ {
+			buf[i] = 0
+		}
+		if _, err := w.Write(buf); err != nil {
+			logger.Error(err, "split: write error")
+			return
+		}
+		flusher.Flush()
+	}
+}
diff --git a/pkg/api/split_test.go b/pkg/api/split_test.go
new file mode 100644
index 00000000000..e73d0a8eb82
--- /dev/null
+++ b/pkg/api/split_test.go
@@ -0,0 +1,73 @@
+// Copyright 2024 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package api_test
+
+import (
+	"bytes"
+	"crypto/rand"
+	"errors"
+	"io"
+	"net/http"
+	"testing"
+
+	"github.com/ethersphere/bee/pkg/api"
+	"github.com/ethersphere/bee/pkg/log"
+	mockpost "github.com/ethersphere/bee/pkg/postage/mock"
+	mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
+	"github.com/ethersphere/bee/pkg/swarm"
+)
+
+// TestSplit posts two chunks' worth of random data to /split and checks
+// that the response holds three address+data records (presumably the two
+// data chunks plus one chunk that references them).
+func TestSplit(t *testing.T) {
+	t.Parallel()
+
+	var (
+		client, _, _, _ = newTestServer(t, testServerOptions{
+			Storer:   mockstorer.New(),
+			Logger:   log.Noop,
+			Post:     mockpost.New(mockpost.WithAcceptAll()),
+			BeeMode:  api.DevMode,
+			DebugAPI: true,
+		})
+	)
+	buf := make([]byte, swarm.ChunkSize*2)
+	_, err := rand.Read(buf)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	resp := request(t, client, http.MethodPost, "/split", bytes.NewReader(buf), http.StatusOK)
+	defer resp.Body.Close()
+
+	// Each record is an address followed by a zero-padded, span-prefixed
+	// chunk payload; both buffers are reused across records.
+	addr := make([]byte, swarm.HashSize)
+	ch := make([]byte, swarm.ChunkWithSpanSize)
+	var addressCount, chunkCount int
+	for {
+		// io.ReadFull either fills the buffer or reports an error, so a
+		// short read from the HTTP body cannot shift the record framing.
+		_, err = io.ReadFull(resp.Body, addr)
+		if errors.Is(err, io.EOF) {
+			break // stream ended exactly on a record boundary
+		}
+		if err != nil {
+			t.Fatal(err)
+		}
+		addressCount++
+		if _, err = io.ReadFull(resp.Body, ch); err != nil {
+			t.Fatal(err)
+		}
+		chunkCount++
+	}
+	if addressCount != 3 {
+		t.Fatalf("number of addresses %d, expected 3", addressCount)
+	}
+	if addressCount != chunkCount {
+		t.Fatalf("number of chunks and addresses do not match")
+	}
+}