From f7b3586c4ddc909b2c9ce10547ce50fb4c392854 Mon Sep 17 00:00:00 2001 From: Acha Bill <57879913+acha-bill@users.noreply.github.com> Date: Mon, 4 Mar 2024 17:05:40 +0100 Subject: [PATCH] feat: split input file to chunks with specified redundancy (#4600) --- cmd/bee/cmd/split.go | 165 +++++++++++++++++++++++++++++++------- cmd/bee/cmd/split_test.go | 116 ++++++++++++++++++++++++++- 2 files changed, 249 insertions(+), 32 deletions(-) diff --git a/cmd/bee/cmd/split.go b/cmd/bee/cmd/split.go index e173ae186fa..cee464b0d1a 100644 --- a/cmd/bee/cmd/split.go +++ b/cmd/bee/cmd/split.go @@ -9,9 +9,12 @@ import ( "fmt" "io" "os" + "path/filepath" "strings" + "sync/atomic" "github.com/ethersphere/bee/pkg/file/pipeline/builder" + "github.com/ethersphere/bee/pkg/file/redundancy" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" "github.com/spf13/cobra" @@ -19,31 +22,49 @@ import ( // putter is a putter that stores all the split chunk addresses of a file type putter struct { - chunkAddresses []string + cb func(chunk swarm.Chunk) error } -func (s *putter) Put(ctx context.Context, chunk swarm.Chunk) error { - s.chunkAddresses = append(s.chunkAddresses, chunk.Address().String()) - return nil +func (s *putter) Put(_ context.Context, chunk swarm.Chunk) error { + return s.cb(chunk) +} +func newPutter(cb func(ch swarm.Chunk) error) *putter { + return &putter{ + cb: cb, + } } var _ storage.Putter = (*putter)(nil) type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error) -func requestPipelineFn(s storage.Putter, encrypt bool) pipelineFunc { +func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc { return func(ctx context.Context, r io.Reader) (swarm.Address, error) { - pipe := builder.NewPipelineBuilder(ctx, s, encrypt, 0) + pipe := builder.NewPipelineBuilder(ctx, s, encrypt, rLevel) return builder.FeedPipeline(ctx, pipe, r) } } func (c *command) initSplitCmd() error { - optionNameInputFile := "input-file" - optionNameOutputFile := "output-file" cmd := &cobra.Command{ Use: "split", - Short: "Split a file into a list chunks. The 1st line is the root hash", + Short: "Split a file into chunks", + } + + splitRefs(cmd) + splitChunks(cmd) + c.root.AddCommand(cmd) + return nil +} + +func splitRefs(cmd *cobra.Command) { + optionNameInputFile := "input-file" + optionNameOutputFile := "output-file" + optionNameRedundancyLevel := "r-level" + + c := &cobra.Command{ + Use: "refs", + Short: "Write only the chunk reference to the output file", RunE: func(cmd *cobra.Command, args []string) error { inputFileName, err := cmd.Flags().GetString(optionNameInputFile) if err != nil { @@ -53,6 +74,10 @@ func (c *command) initSplitCmd() error { if err != nil { return fmt.Errorf("get output file name: %w", err) } + rLevel, err := cmd.Flags().GetInt(optionNameRedundancyLevel) + if err != nil { + return fmt.Errorf("get redundancy level: %w", err) + } v, err := cmd.Flags().GetString(optionNameVerbosity) if err != nil { @@ -70,44 +95,124 @@ func (c *command) initSplitCmd() error { } defer reader.Close() - logger.Info("splitting", "file", inputFileName) - store := new(putter) - - p := requestPipelineFn(store, false) - address, err := p(context.Background(), reader) - if err != nil { - return fmt.Errorf("bmt pipeline: %w", err) - } - + logger.Info("splitting", "file", inputFileName, "rLevel", rLevel) logger.Info("writing output", "file", outputFileName) + + var refs []string + store := newPutter(func(ch swarm.Chunk) error { + refs = append(refs, ch.Address().String()) + return nil + }) writer, err := os.OpenFile(outputFileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) if err != nil { return fmt.Errorf("open output file: %w", err) } defer writer.Close() - logger.Debug("write root", "hash", address) - _, err = writer.WriteString(fmt.Sprintf("%s\n", address)) + p := requestPipelineFn(store, false, redundancy.Level(rLevel)) + rootRef, err := p(context.Background(), reader) + if err != nil { + return fmt.Errorf("pipeline: %w", err) + } + + logger.Debug("write root", "hash", rootRef) + _, err = writer.WriteString(fmt.Sprintf("%s\n", rootRef)) if err != nil { return fmt.Errorf("write root hash: %w", err) } - for _, chunkAddress := range store.chunkAddresses { - logger.Debug("write chunk", "hash", chunkAddress) - _, err = writer.WriteString(fmt.Sprintf("%s\n", chunkAddress)) + for _, ref := range refs { + logger.Debug("write chunk", "hash", ref) + _, err = writer.WriteString(fmt.Sprintf("%s\n", ref)) if err != nil { return fmt.Errorf("write chunk address: %w", err) } } - logger.Info("done", "hashes", len(store.chunkAddresses)) + logger.Info("done", "root", rootRef.String(), "chunks", len(refs)) return nil }, } - cmd.Flags().String(optionNameVerbosity, "info", "verbosity level") - cmd.Flags().String(optionNameInputFile, "", "input file") - cmd.Flags().String(optionNameOutputFile, "", "output file") - cmd.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputFile) + c.Flags().String(optionNameInputFile, "", "input file") + c.Flags().String(optionNameOutputFile, "", "output file") + c.Flags().Int(optionNameRedundancyLevel, 0, "redundancy level") + c.Flags().String(optionNameVerbosity, "info", "verbosity level") - c.root.AddCommand(cmd) - return nil + c.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputFile) + + cmd.AddCommand(c) +} + +func splitChunks(cmd *cobra.Command) { + optionNameInputFile := "input-file" + optionNameOutputDir := "output-dir" + optionNameRedundancyLevel := "r-level" + + c := &cobra.Command{ + Use: "chunks", + Short: "Write the chunks to the output directory", + RunE: func(cmd *cobra.Command, args []string) error { + inputFileName, err := cmd.Flags().GetString(optionNameInputFile) + if err != nil { + return fmt.Errorf("get input file name: %w", err) + } + outputDir, err := cmd.Flags().GetString(optionNameOutputDir) + if err != nil { + return fmt.Errorf("get output file name: %w", err) + } + info, err := os.Stat(outputDir) + if err != nil { + return fmt.Errorf("stat output dir: %w", err) + } + if !info.IsDir() { + return fmt.Errorf("output dir %s is not a directory", outputDir) + } + rLevel, err := cmd.Flags().GetInt(optionNameRedundancyLevel) + if err != nil { + return fmt.Errorf("get redundancy level: %w", err) + } + v, err := cmd.Flags().GetString(optionNameVerbosity) + if err != nil { + return fmt.Errorf("get verbosity: %w", err) + } + v = strings.ToLower(v) + logger, err := newLogger(cmd, v) + if err != nil { + return fmt.Errorf("new logger: %w", err) + } + reader, err := os.Open(inputFileName) + if err != nil { + return fmt.Errorf("open input file: %w", err) + } + defer reader.Close() + + logger.Info("splitting", "file", inputFileName, "rLevel", rLevel) + logger.Info("writing output", "dir", outputDir) + + var chunksCount atomic.Int64 + store := newPutter(func(chunk swarm.Chunk) error { + filePath := filepath.Join(outputDir, chunk.Address().String()) + err := os.WriteFile(filePath, chunk.Data(), 0644) + if err != nil { + return err + } + chunksCount.Add(1) + return nil + }) + + p := requestPipelineFn(store, false, redundancy.Level(rLevel)) + rootRef, err := p(context.Background(), reader) + if err != nil { + return fmt.Errorf("pipeline: %w", err) + } + logger.Info("done", "root", rootRef.String(), "chunks", chunksCount.Load()) + return nil + }, + } + c.Flags().String(optionNameInputFile, "", "input file") + c.Flags().String(optionNameOutputDir, "", "output dir") + c.Flags().Int(optionNameRedundancyLevel, 0, "redundancy level") + c.Flags().String(optionNameVerbosity, "info", "verbosity level") + c.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputDir) + + cmd.AddCommand(c) } diff --git a/cmd/bee/cmd/split_test.go b/cmd/bee/cmd/split_test.go index 4151a69b5b7..3dde5c386f2 100644 --- a/cmd/bee/cmd/split_test.go +++ b/cmd/bee/cmd/split_test.go @@ -6,17 +6,26 @@ package cmd_test import ( "bufio" + "bytes" + "context" crand "crypto/rand" + "io" "math/rand" "os" "path" + "path/filepath" + "sync" "testing" "github.com/ethersphere/bee/cmd/bee/cmd" "github.com/ethersphere/bee/pkg/api" + "github.com/ethersphere/bee/pkg/file/pipeline/builder" + "github.com/ethersphere/bee/pkg/file/redundancy" + "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/swarm" ) -func TestDBSplit(t *testing.T) { +func TestDBSplitRefs(t *testing.T) { t.Parallel() s := (rand.Intn(10) + 10) * 1024 // rand between 10 and 20 KB @@ -34,7 +43,7 @@ func TestDBSplit(t *testing.T) { outputFileName := path.Join(t.TempDir(), "output") - err = newCommand(t, cmd.WithArgs("split", "--input-file", inputFileName, "--output-file", outputFileName)).Execute() + err = newCommand(t, cmd.WithArgs("split", "refs", "--input-file", inputFileName, "--output-file", outputFileName)).Execute() if err != nil { t.Fatal(err) } @@ -60,3 +69,106 @@ func TestDBSplit(t *testing.T) { t.Fatalf("got %d hashes, want %d", gotHashes, wantHashes) } } + +func TestDBSplitChunks(t *testing.T) { + t.Parallel() + + s := (rand.Intn(10) + 10) * 1024 // rand between 10 and 20 KB + buf := make([]byte, s) + _, err := crand.Read(buf) + if err != nil { + t.Fatal(err) + } + + inputFileName := path.Join(t.TempDir(), "input") + err = os.WriteFile(inputFileName, buf, 0644) + if err != nil { + t.Fatal(err) + } + + dir := path.Join(t.TempDir(), "chunks") + err = os.Mkdir(dir, os.ModePerm) + if err != nil { + t.Fatal(err) + } + + err = newCommand(t, cmd.WithArgs("split", "chunks", "--input-file", inputFileName, "--output-dir", dir, "--r-level", "3")).Execute() + if err != nil { + t.Fatal(err) + } + + // split the file manually and compare output with the split commands output. + putter := &putter{chunks: make(map[string]swarm.Chunk)} + p := requestPipelineFn(putter, false, redundancy.Level(3)) + _, err = p(context.Background(), bytes.NewReader(buf)) + if err != nil { + t.Fatal(err) + } + + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatal(err) + } + + if len(entries) != len(putter.chunks) { + t.Fatal("number of chunks does not match") + } + for _, entry := range entries { + ref := entry.Name() + if _, ok := putter.chunks[ref]; !ok { + t.Fatalf("chunk %s not found", ref) + } + err, ok := compare(filepath.Join(dir, ref), putter.chunks[ref]) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatalf("chunk %s does not match", ref) + } + delete(putter.chunks, ref) + } + + if len(putter.chunks) != 0 { + t.Fatalf("want 0 chunks left, got %d", len(putter.chunks)) + } +} + +func compare(path string, chunk swarm.Chunk) (error, bool) { + f, err := os.Open(path) + if err != nil { + return err, false + } + defer f.Close() + + b, err := io.ReadAll(f) + if err != nil { + return err, false + } + + if !bytes.Equal(b, chunk.Data()) { + return nil, false + } + + return nil, true +} + +type putter struct { + chunks map[string]swarm.Chunk + mu sync.Mutex +} + +func (s *putter) Put(_ context.Context, chunk swarm.Chunk) error { + s.mu.Lock() + defer s.mu.Unlock() + s.chunks[chunk.Address().String()] = chunk + return nil +} + +type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error) + +func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc { + return func(ctx context.Context, r io.Reader) (swarm.Address, error) { + pipe := builder.NewPipelineBuilder(ctx, s, encrypt, rLevel) + return builder.FeedPipeline(ctx, pipe, r) + } +}