Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: split input file to chunks with specified redundancy #4600

Merged
merged 6 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 142 additions & 28 deletions cmd/bee/cmd/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,62 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"strings"

"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/spf13/cobra"
)

// putter is a storage.Putter that forwards every chunk produced by the
// splitting pipeline to a channel, so a consumer goroutine can stream
// chunk references (or chunk data) as they are created.
type putter struct {
	c chan swarm.Chunk
}

// Put forwards chunk to the consumer channel. The channel is unbuffered,
// so the send blocks until a receiver drains p.c; it aborts with the
// context error if ctx is cancelled first, which prevents the pipeline
// from hanging forever when the consumer stops early.
func (p *putter) Put(ctx context.Context, chunk swarm.Chunk) error {
	select {
	case p.c <- chunk:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// newPutter returns a putter with an unbuffered chunk channel. The
// caller owns the channel and must close it once the pipeline is done.
func newPutter() *putter {
	return &putter{
		c: make(chan swarm.Chunk),
	}
}

var _ storage.Putter = (*putter)(nil)

type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error)

func requestPipelineFn(s storage.Putter, encrypt bool) pipelineFunc {
func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc {
return func(ctx context.Context, r io.Reader) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, s, encrypt, 0)
pipe := builder.NewPipelineBuilder(ctx, s, encrypt, rLevel)
return builder.FeedPipeline(ctx, pipe, r)
}
}

// initSplitCmd registers the "split" parent command together with its
// "refs" and "chunks" subcommands on the root command. The flag
// definitions live with each subcommand, not here.
func (c *command) initSplitCmd() error {
	cmd := &cobra.Command{
		Use:   "split",
		Short: "Split a file into chunks",
	}

	splitRefs(cmd)
	splitChunks(cmd)
	c.root.AddCommand(cmd)
	return nil
}

func splitRefs(cmd *cobra.Command) {
optionNameInputFile := "input-file"
optionNameOutputFile := "output-file"
optionNameRedundancyLevel := "r-level"

c := &cobra.Command{
Use: "refs",
Short: "Write only the chunk reference to the output file",
RunE: func(cmd *cobra.Command, args []string) error {
inputFileName, err := cmd.Flags().GetString(optionNameInputFile)
if err != nil {
Expand All @@ -53,6 +74,10 @@ func (c *command) initSplitCmd() error {
if err != nil {
return fmt.Errorf("get output file name: %w", err)
}
rLevel, err := cmd.Flags().GetInt(optionNameRedundancyLevel)
if err != nil {
return fmt.Errorf("get redundancy level: %w", err)
}

v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
Expand All @@ -70,44 +95,133 @@ func (c *command) initSplitCmd() error {
}
defer reader.Close()

logger.Info("splitting", "file", inputFileName)
store := new(putter)

p := requestPipelineFn(store, false)
address, err := p(context.Background(), reader)
if err != nil {
return fmt.Errorf("bmt pipeline: %w", err)
}

logger.Info("splitting", "file", inputFileName, "rLevel", rLevel)
logger.Info("writing output", "file", outputFileName)

store := newPutter()
writer, err := os.OpenFile(outputFileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("open output file: %w", err)
}
defer writer.Close()

logger.Debug("write root", "hash", address)
_, err = writer.WriteString(fmt.Sprintf("%s\n", address))
var refs []string
go func() {
for chunk := range store.c {
refs = append(refs, chunk.Address().String())
}
}()

p := requestPipelineFn(store, false, redundancy.Level(rLevel))
rootRef, err := p(context.Background(), reader)
close(store.c)
if err != nil {
return fmt.Errorf("pipeline: %w", err)
}

logger.Debug("write root", "hash", rootRef)
_, err = writer.WriteString(fmt.Sprintf("%s\n", rootRef))
if err != nil {
return fmt.Errorf("write root hash: %w", err)
}
for _, chunkAddress := range store.chunkAddresses {
logger.Debug("write chunk", "hash", chunkAddress)
_, err = writer.WriteString(fmt.Sprintf("%s\n", chunkAddress))
for _, ref := range refs {
logger.Debug("write chunk", "hash", ref)
_, err = writer.WriteString(fmt.Sprintf("%s\n", ref))
if err != nil {
return fmt.Errorf("write chunk address: %w", err)
}
}
logger.Info("done", "hashes", len(store.chunkAddresses))
logger.Info("done", "root", rootRef.String(), "chunks", len(refs))
return nil
},
}

cmd.Flags().String(optionNameVerbosity, "info", "verbosity level")
cmd.Flags().String(optionNameInputFile, "", "input file")
cmd.Flags().String(optionNameOutputFile, "", "output file")
cmd.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputFile)
c.Flags().String(optionNameInputFile, "", "input file")
c.Flags().String(optionNameOutputFile, "", "output file")
c.Flags().Int(optionNameRedundancyLevel, 0, "redundancy level")
c.Flags().String(optionNameVerbosity, "info", "verbosity level")

c.root.AddCommand(cmd)
return nil
c.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputFile)

cmd.AddCommand(c)
}

// splitChunks registers the "split chunks" subcommand, which splits the
// input file with the requested redundancy level and writes each chunk
// to the output directory in a file named after the chunk's address.
func splitChunks(cmd *cobra.Command) {
	optionNameInputFile := "input-file"
	optionNameOutputDir := "output-dir"
	optionNameRedundancyLevel := "r-level"

	c := &cobra.Command{
		Use:   "chunks",
		Short: "Write the chunks to the output directory",
		RunE: func(cmd *cobra.Command, args []string) error {
			inputFileName, err := cmd.Flags().GetString(optionNameInputFile)
			if err != nil {
				return fmt.Errorf("get input file name: %w", err)
			}
			outputDir, err := cmd.Flags().GetString(optionNameOutputDir)
			if err != nil {
				return fmt.Errorf("get output dir: %w", err)
			}
			info, err := os.Stat(outputDir)
			if err != nil {
				return fmt.Errorf("stat output dir: %w", err)
			}
			if !info.IsDir() {
				return fmt.Errorf("output dir %s is not a directory", outputDir)
			}
			rLevel, err := cmd.Flags().GetInt(optionNameRedundancyLevel)
			if err != nil {
				return fmt.Errorf("get redundancy level: %w", err)
			}
			v, err := cmd.Flags().GetString(optionNameVerbosity)
			if err != nil {
				return fmt.Errorf("get verbosity: %w", err)
			}
			v = strings.ToLower(v)
			logger, err := newLogger(cmd, v)
			if err != nil {
				return fmt.Errorf("new logger: %w", err)
			}
			reader, err := os.Open(inputFileName)
			if err != nil {
				return fmt.Errorf("open input file: %w", err)
			}
			defer reader.Close()

			logger.Info("splitting", "file", inputFileName, "rLevel", rLevel)
			logger.Info("writing output", "dir", outputDir)

			store := newPutter()
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel() // release context resources on every return path

			// The writer goroutine persists chunks as the pipeline emits
			// them; done signals it has drained the channel, and writeErr
			// records the first write failure so the command can report it.
			var (
				chunksCount int64
				writeErr    error
			)
			done := make(chan struct{})
			go func() {
				defer close(done)
				for chunk := range store.c {
					filePath := filepath.Join(outputDir, chunk.Address().String())
					if err := os.WriteFile(filePath, chunk.Data(), 0644); err != nil {
						if writeErr == nil {
							writeErr = err
						}
						logger.Error(err, "write chunk")
						cancel() // abort the pipeline; keep draining to avoid blocking it
						continue
					}
					chunksCount++
				}
			}()

			p := requestPipelineFn(store, false, redundancy.Level(rLevel))
			rootRef, err := p(ctx, reader)
			close(store.c)
			// Wait for the writer goroutine; reading chunksCount/writeErr
			// before it finishes would be a data race and could undercount.
			<-done
			if err != nil {
				return fmt.Errorf("pipeline: %w", err)
			}
			if writeErr != nil {
				// Previously only logged; the command must not report
				// success when chunks failed to be written.
				return fmt.Errorf("write chunk: %w", writeErr)
			}
			logger.Info("done", "root", rootRef.String(), "chunks", chunksCount)
			return nil
		},
	}
	c.Flags().String(optionNameInputFile, "", "input file")
	c.Flags().String(optionNameOutputDir, "", "output dir")
	c.Flags().Int(optionNameRedundancyLevel, 0, "redundancy level")
	c.Flags().String(optionNameVerbosity, "info", "verbosity level")
	c.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputDir)

	cmd.AddCommand(c)
}
77 changes: 75 additions & 2 deletions cmd/bee/cmd/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ import (
"math/rand"
"os"
"path"
"path/filepath"
"testing"

"github.com/ethersphere/bee/cmd/bee/cmd"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/swarm"
)

func TestDBSplit(t *testing.T) {
func TestDBSplitRefs(t *testing.T) {
t.Parallel()

s := (rand.Intn(10) + 10) * 1024 // rand between 10 and 20 KB
Expand All @@ -34,7 +38,7 @@ func TestDBSplit(t *testing.T) {

outputFileName := path.Join(t.TempDir(), "output")

err = newCommand(t, cmd.WithArgs("split", "--input-file", inputFileName, "--output-file", outputFileName)).Execute()
err = newCommand(t, cmd.WithArgs("split", "refs", "--input-file", inputFileName, "--output-file", outputFileName)).Execute()
if err != nil {
t.Fatal(err)
}
Expand All @@ -60,3 +64,72 @@ func TestDBSplit(t *testing.T) {
t.Fatalf("got %d hashes, want %d", gotHashes, wantHashes)
}
}

func TestDBSplitChunks(t *testing.T) {
t.Parallel()

s := (rand.Intn(10) + 10) * 1024 // rand between 10 and 20 KB
buf := make([]byte, s)
_, err := crand.Read(buf)
if err != nil {
t.Fatal(err)
}

inputFileName := path.Join(t.TempDir(), "input")
err = os.WriteFile(inputFileName, buf, 0644)
if err != nil {
t.Fatal(err)
}

dir := path.Join(t.TempDir(), "chunks")
err = os.Mkdir(dir, os.ModePerm)
if err != nil {
t.Fatal(err)
}

err = newCommand(t, cmd.WithArgs("split", "chunks", "--input-file", inputFileName, "--output-dir", dir, "--r-level", "3")).Execute()
if err != nil {
t.Fatal(err)
}

stat, err := os.Stat(inputFileName)
if err != nil {
t.Fatal(err)
}
want := api.CalculateNumberOfChunks(stat.Size(), false)
acha-bill marked this conversation as resolved.
Show resolved Hide resolved

entries, err := os.ReadDir(dir)
if err != nil {
t.Fatal(err)
}

if int64(len(entries)) < want {
t.Fatalf("want at least %d chunks", want)
}

for _, entry := range entries {
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
d, err := os.ReadFile(filepath.Join(dir, entry.Name()))
if err != nil {
t.Fatal(err)
}

ch, err := cac.NewWithDataSpan(d)
if err != nil {
sch, err := soc.FromChunk(swarm.NewChunk(swarm.EmptyAddress, d))
if err != nil {
t.Fatal("invalid cac/soc chunk", err)
}
ch, err = sch.Chunk()
if err != nil {
t.Fatal(err)
}
if !soc.Valid(ch) {
t.Fatal("invalid soc chunk")
}
}

if ch.Address().String() != entry.Name() {
t.Fatal("expected chunk reference to equal file name")
}
}
}
Loading