Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: split input file to chunks with specified redundancy #4600

Merged
merged 6 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 135 additions & 30 deletions cmd/bee/cmd/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,62 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync/atomic"

"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/spf13/cobra"
)

// putter is a putter that stores all the split chunk addresses of a file
type putter struct {
chunkAddresses []string
cb func(chunk swarm.Chunk) error
}

func (s *putter) Put(ctx context.Context, chunk swarm.Chunk) error {
s.chunkAddresses = append(s.chunkAddresses, chunk.Address().String())
return nil
func (s *putter) Put(_ context.Context, chunk swarm.Chunk) error {
return s.cb(chunk)
}
func newPutter(cb func(ch swarm.Chunk) error) *putter {
return &putter{
cb: cb,
}
}

var _ storage.Putter = (*putter)(nil)

type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error)

func requestPipelineFn(s storage.Putter, encrypt bool) pipelineFunc {
func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc {
return func(ctx context.Context, r io.Reader) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, s, encrypt, 0)
pipe := builder.NewPipelineBuilder(ctx, s, encrypt, rLevel)
return builder.FeedPipeline(ctx, pipe, r)
}
}

func (c *command) initSplitCmd() error {
optionNameInputFile := "input-file"
optionNameOutputFile := "output-file"
cmd := &cobra.Command{
Use: "split",
Short: "Split a file into a list chunks. The 1st line is the root hash",
Short: "Split a file into chunks",
}

splitRefs(cmd)
splitChunks(cmd)
c.root.AddCommand(cmd)
return nil
}

func splitRefs(cmd *cobra.Command) {
optionNameInputFile := "input-file"
optionNameOutputFile := "output-file"
optionNameRedundancyLevel := "r-level"

c := &cobra.Command{
Use: "refs",
Short: "Write only the chunk reference to the output file",
RunE: func(cmd *cobra.Command, args []string) error {
inputFileName, err := cmd.Flags().GetString(optionNameInputFile)
if err != nil {
Expand All @@ -53,6 +74,10 @@ func (c *command) initSplitCmd() error {
if err != nil {
return fmt.Errorf("get output file name: %w", err)
}
rLevel, err := cmd.Flags().GetInt(optionNameRedundancyLevel)
if err != nil {
return fmt.Errorf("get redundancy level: %w", err)
}

v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
Expand All @@ -70,44 +95,124 @@ func (c *command) initSplitCmd() error {
}
defer reader.Close()

logger.Info("splitting", "file", inputFileName)
store := new(putter)

p := requestPipelineFn(store, false)
address, err := p(context.Background(), reader)
if err != nil {
return fmt.Errorf("bmt pipeline: %w", err)
}

logger.Info("splitting", "file", inputFileName, "rLevel", rLevel)
logger.Info("writing output", "file", outputFileName)

var refs []string
store := newPutter(func(ch swarm.Chunk) error {
refs = append(refs, ch.Address().String())
return nil
})
writer, err := os.OpenFile(outputFileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("open output file: %w", err)
}
defer writer.Close()

logger.Debug("write root", "hash", address)
_, err = writer.WriteString(fmt.Sprintf("%s\n", address))
p := requestPipelineFn(store, false, redundancy.Level(rLevel))
rootRef, err := p(context.Background(), reader)
if err != nil {
return fmt.Errorf("pipeline: %w", err)
}

logger.Debug("write root", "hash", rootRef)
_, err = writer.WriteString(fmt.Sprintf("%s\n", rootRef))
if err != nil {
return fmt.Errorf("write root hash: %w", err)
}
for _, chunkAddress := range store.chunkAddresses {
logger.Debug("write chunk", "hash", chunkAddress)
_, err = writer.WriteString(fmt.Sprintf("%s\n", chunkAddress))
for _, ref := range refs {
logger.Debug("write chunk", "hash", ref)
_, err = writer.WriteString(fmt.Sprintf("%s\n", ref))
if err != nil {
return fmt.Errorf("write chunk address: %w", err)
}
}
logger.Info("done", "hashes", len(store.chunkAddresses))
logger.Info("done", "root", rootRef.String(), "chunks", len(refs))
return nil
},
}

cmd.Flags().String(optionNameVerbosity, "info", "verbosity level")
cmd.Flags().String(optionNameInputFile, "", "input file")
cmd.Flags().String(optionNameOutputFile, "", "output file")
cmd.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputFile)
c.Flags().String(optionNameInputFile, "", "input file")
c.Flags().String(optionNameOutputFile, "", "output file")
c.Flags().Int(optionNameRedundancyLevel, 0, "redundancy level")
c.Flags().String(optionNameVerbosity, "info", "verbosity level")

c.root.AddCommand(cmd)
return nil
c.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputFile)

cmd.AddCommand(c)
}

// splitChunks registers the "chunks" subcommand on cmd. It splits the
// input file into chunks at the requested redundancy level and writes
// each chunk to its own file (named by the chunk address) inside the
// output directory, which must already exist.
func splitChunks(cmd *cobra.Command) {
	optionNameInputFile := "input-file"
	optionNameOutputDir := "output-dir"
	optionNameRedundancyLevel := "r-level"

	c := &cobra.Command{
		Use:   "chunks",
		Short: "Write the chunks to the output directory",
		RunE: func(cmd *cobra.Command, args []string) error {
			inputFileName, err := cmd.Flags().GetString(optionNameInputFile)
			if err != nil {
				return fmt.Errorf("get input file name: %w", err)
			}
			outputDir, err := cmd.Flags().GetString(optionNameOutputDir)
			if err != nil {
				// was "get output file name" — copy-paste from the refs
				// subcommand; this flag is the output directory.
				return fmt.Errorf("get output dir: %w", err)
			}
			// The output directory must pre-exist; chunks are written into
			// it as individual files.
			info, err := os.Stat(outputDir)
			if err != nil {
				return fmt.Errorf("stat output dir: %w", err)
			}
			if !info.IsDir() {
				return fmt.Errorf("output dir %s is not a directory", outputDir)
			}
			rLevel, err := cmd.Flags().GetInt(optionNameRedundancyLevel)
			if err != nil {
				return fmt.Errorf("get redundancy level: %w", err)
			}
			v, err := cmd.Flags().GetString(optionNameVerbosity)
			if err != nil {
				return fmt.Errorf("get verbosity: %w", err)
			}
			v = strings.ToLower(v)
			logger, err := newLogger(cmd, v)
			if err != nil {
				return fmt.Errorf("new logger: %w", err)
			}
			reader, err := os.Open(inputFileName)
			if err != nil {
				return fmt.Errorf("open input file: %w", err)
			}
			defer reader.Close()

			logger.Info("splitting", "file", inputFileName, "rLevel", rLevel)
			logger.Info("writing output", "dir", outputDir)

			// The putter callback may be invoked concurrently by the
			// pipeline, so the chunk counter is atomic.
			var chunksCount atomic.Int64
			store := newPutter(func(chunk swarm.Chunk) error {
				filePath := filepath.Join(outputDir, chunk.Address().String())
				if err := os.WriteFile(filePath, chunk.Data(), 0644); err != nil {
					return err
				}
				chunksCount.Add(1)
				return nil
			})

			p := requestPipelineFn(store, false, redundancy.Level(rLevel))
			rootRef, err := p(context.Background(), reader)
			if err != nil {
				return fmt.Errorf("pipeline: %w", err)
			}
			logger.Info("done", "root", rootRef.String(), "chunks", chunksCount.Load())
			return nil
		},
	}
	c.Flags().String(optionNameInputFile, "", "input file")
	c.Flags().String(optionNameOutputDir, "", "output dir")
	c.Flags().Int(optionNameRedundancyLevel, 0, "redundancy level")
	c.Flags().String(optionNameVerbosity, "info", "verbosity level")
	c.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputDir)

	cmd.AddCommand(c)
}
116 changes: 114 additions & 2 deletions cmd/bee/cmd/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,26 @@ package cmd_test

import (
"bufio"
"bytes"
"context"
crand "crypto/rand"
"io"
"math/rand"
"os"
"path"
"path/filepath"
"sync"
"testing"

"github.com/ethersphere/bee/cmd/bee/cmd"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)

func TestDBSplit(t *testing.T) {
func TestDBSplitRefs(t *testing.T) {
t.Parallel()

s := (rand.Intn(10) + 10) * 1024 // rand between 10 and 20 KB
Expand All @@ -34,7 +43,7 @@ func TestDBSplit(t *testing.T) {

outputFileName := path.Join(t.TempDir(), "output")

err = newCommand(t, cmd.WithArgs("split", "--input-file", inputFileName, "--output-file", outputFileName)).Execute()
err = newCommand(t, cmd.WithArgs("split", "refs", "--input-file", inputFileName, "--output-file", outputFileName)).Execute()
if err != nil {
t.Fatal(err)
}
Expand All @@ -60,3 +69,106 @@ func TestDBSplit(t *testing.T) {
t.Fatalf("got %d hashes, want %d", gotHashes, wantHashes)
}
}

func TestDBSplitChunks(t *testing.T) {
t.Parallel()

s := (rand.Intn(10) + 10) * 1024 // rand between 10 and 20 KB
buf := make([]byte, s)
_, err := crand.Read(buf)
if err != nil {
t.Fatal(err)
}

inputFileName := path.Join(t.TempDir(), "input")
err = os.WriteFile(inputFileName, buf, 0644)
if err != nil {
t.Fatal(err)
}

dir := path.Join(t.TempDir(), "chunks")
err = os.Mkdir(dir, os.ModePerm)
if err != nil {
t.Fatal(err)
}

err = newCommand(t, cmd.WithArgs("split", "chunks", "--input-file", inputFileName, "--output-dir", dir, "--r-level", "3")).Execute()
if err != nil {
t.Fatal(err)
}

// split the file manually and compare output with the split commands output.
putter := &putter{chunks: make(map[string]swarm.Chunk)}
p := requestPipelineFn(putter, false, redundancy.Level(3))
_, err = p(context.Background(), bytes.NewReader(buf))
if err != nil {
t.Fatal(err)
}

entries, err := os.ReadDir(dir)
if err != nil {
t.Fatal(err)
}

if len(entries) != len(putter.chunks) {
t.Fatal("number of chunks does not match")
}
for _, entry := range entries {
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
ref := entry.Name()
if _, ok := putter.chunks[ref]; !ok {
t.Fatalf("chunk %s not found", ref)
}
err, ok := compare(filepath.Join(dir, ref), putter.chunks[ref])
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatalf("chunk %s does not match", ref)
}
delete(putter.chunks, ref)
}

if len(putter.chunks) != 0 {
t.Fatalf("want 0 chunks left, got %d", len(putter.chunks))
}
}

// compare reports whether the file at path contains exactly the same
// bytes as the chunk's data. A non-nil error means the file could not
// be read; the bool result is only meaningful when the error is nil.
//
// NOTE(review): the (error, bool) result order is unconventional for Go
// (error normally comes last); kept as-is for caller compatibility.
func compare(path string, chunk swarm.Chunk) (error, bool) {
	// os.ReadFile replaces the original open / io.ReadAll / close
	// sequence with one call and no Closer to manage.
	b, err := os.ReadFile(path)
	if err != nil {
		return err, false
	}
	return nil, bytes.Equal(b, chunk.Data())
}

// putter is a thread-safe storage.Putter test double that records every
// chunk it receives, keyed by the chunk's hex-encoded address.
type putter struct {
	mu     sync.Mutex
	chunks map[string]swarm.Chunk
}

// Put stores the chunk under its address string; safe for concurrent use.
func (p *putter) Put(_ context.Context, chunk swarm.Chunk) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.chunks[chunk.Address().String()] = chunk
	return nil
}

// pipelineFunc feeds a reader through a chunking pipeline and yields
// the resulting root address.
type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error)

// requestPipelineFn builds a pipelineFunc whose chunks are stored via s,
// optionally encrypted, at the given redundancy level.
func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc {
	return func(ctx context.Context, r io.Reader) (swarm.Address, error) {
		return builder.FeedPipeline(ctx, builder.NewPipelineBuilder(ctx, s, encrypt, rLevel), r)
	}
}
Loading