Skip to content

Commit

Permalink
feat: split data into chunks with redundancy
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Feb 23, 2024
1 parent 5de732c commit 6fcdc6b
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 0 deletions.
31 changes: 31 additions & 0 deletions openapi/SwarmDebug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1133,3 +1133,34 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/400"
default:
description: Default response.

"/split":
post:
summary: "Split data into chunks with redundancy"
tags:
- Chunk
parameters:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter"
requestBody:
description: Binary data to be split into chunks
content:
application/octet-stream:
schema:
type: string
format: binary
responses:
"200":
description: Chunks [reference+data]
content:
application/octet-stream:
schema:
type: string
format: binary
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"402":
$ref: "SwarmCommon.yaml#/components/responses/402"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response
4 changes: 4 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,10 @@ func (s *Service) mountBusinessDebug(restricted bool) {
"GET": http.HandlerFunc(s.hasChunkHandler),
})

handle("/split", jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.splitHandler),
})

handle("/topology", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.topologyHandler),
})
Expand Down
67 changes: 67 additions & 0 deletions pkg/api/split.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"context"
"net/http"

"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/swarm"
)

type splitPutter struct {
chunks []swarm.Chunk
}

func (s *splitPutter) Put(_ context.Context, chunk swarm.Chunk) error {
s.chunks = append(s.chunks, chunk)
return nil
}

func (s *Service) splitHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("split").Build()
flusher, ok := w.(http.Flusher)
if !ok {
logger.Error(nil, "streaming unsupported")
jsonhttp.InternalServerError(w, "streaming unsupported")
return
}

headers := struct {
RLevel redundancy.Level `map:"Swarm-Redundancy-Level"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

putter := &splitPutter{
chunks: make([]swarm.Chunk, 0),
}

p := requestPipelineFn(putter, false, headers.RLevel)
_, err := p(r.Context(), r.Body)
if err != nil {
logger.Error(err, "split: pipeline error")
jsonhttp.InternalServerError(w, "split failed")
return
}

w.Header().Set("X-Content-Type-Options", "nosniff")

buf := make([]byte, swarm.HashSize+swarm.ChunkSize)
for _, c := range putter.chunks {
copy(buf[:swarm.HashSize], c.Address().Bytes())
copy(buf[swarm.HashSize:], c.Data())
_, err = w.Write(buf)
if err != nil {
logger.Error(err, "split: write error")
return
}
flusher.Flush()
}
}
66 changes: 66 additions & 0 deletions pkg/api/split_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api_test

import (
"bytes"
"crypto/rand"
"errors"
"io"
"net/http"
"testing"

"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/log"
mockpost "github.com/ethersphere/bee/pkg/postage/mock"
mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
"github.com/ethersphere/bee/pkg/swarm"
)

func TestSplit(t *testing.T) {
t.Parallel()

var (
client, _, _, _ = newTestServer(t, testServerOptions{
Storer: mockstorer.New(),
Logger: log.Noop,
Post: mockpost.New(mockpost.WithAcceptAll()),
BeeMode: api.DevMode,
DebugAPI: true,
})
)
buf := make([]byte, swarm.ChunkSize*2)
_, err := rand.Read(buf)
if err != nil {
t.Fatal(err)
}

resp := request(t, client, http.MethodPost, "/split", bytes.NewReader(buf), http.StatusOK)
defer resp.Body.Close()
var addressCount, chunkCount int
for {
addr := make([]byte, swarm.HashSize)
_, err = resp.Body.Read(addr)
if err != nil && !errors.Is(err, io.EOF) {
t.Fatal(err)
}
if errors.Is(err, io.EOF) {
break
}
addressCount++
ch := make([]byte, swarm.ChunkSize)
_, err = resp.Body.Read(ch)
if err != nil {
t.Fatal(err)
}
chunkCount++
}
if addressCount != 3 {
t.Fatalf("number of addresses %d, expected 3", addressCount)
}
if addressCount != chunkCount {
t.Fatalf("number of chunks and addresses do not match")
}
}

0 comments on commit 6fcdc6b

Please sign in to comment.