diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go
index d491f815beb..44a994d1312 100644
--- a/cmd/bee/cmd/db.go
+++ b/cmd/bee/cmd/db.go
@@ -7,11 +7,13 @@ package cmd
 import (
 	"archive/tar"
 	"bytes"
+	"context"
 	"encoding/binary"
 	"errors"
 	"fmt"
 	"io"
 	"os"
+	"path"
 	"path/filepath"
 	"strings"
 	"time"
@@ -24,6 +26,8 @@ import (
 	"github.com/spf13/cobra"
 )
 
+const optionNameValidation = "validate"
+
 func (c *command) initDBCmd() {
 	cmd := &cobra.Command{
 		Use:   "db",
@@ -34,6 +38,7 @@ func (c *command) initDBCmd() {
 	dbImportCmd(cmd)
 	dbNukeCmd(cmd)
 	dbInfoCmd(cmd)
+	dbCompactCmd(cmd)
 
 	c.root.AddCommand(cmd)
 }
@@ -75,6 +80,8 @@ func dbInfoCmd(cmd *cobra.Command) {
 			}
 			defer db.Close()
 
+			logger.Info("getting db info", "path", dataDir)
+
 			info, err := db.DebugInfo(cmd.Context())
 			if err != nil {
 				return fmt.Errorf("fetching db info: %w", err)
@@ -95,6 +102,55 @@ func dbInfoCmd(cmd *cobra.Command) {
 	cmd.AddCommand(c)
 }
 
+func dbCompactCmd(cmd *cobra.Command) {
+	c := &cobra.Command{
+		Use:   "compact",
+		Short: "Compacts the localstore sharky store.",
+		RunE: func(cmd *cobra.Command, args []string) (err error) {
+			v, err := cmd.Flags().GetString(optionNameVerbosity)
+			if err != nil {
+				return fmt.Errorf("get verbosity: %w", err)
+			}
+			v = strings.ToLower(v)
+			logger, err := newLogger(cmd, v)
+			if err != nil {
+				return fmt.Errorf("new logger: %w", err)
+			}
+
+			dataDir, err := cmd.Flags().GetString(optionNameDataDir)
+			if err != nil {
+				return fmt.Errorf("get data-dir: %w", err)
+			}
+			if dataDir == "" {
+				return errors.New("no data-dir provided")
+			}
+
+			validation, err := cmd.Flags().GetBool(optionNameValidation)
+			if err != nil {
+				return fmt.Errorf("get validation: %w", err)
+			}
+
+			localstorePath := path.Join(dataDir, "localstore")
+
+			err = storer.Compact(context.Background(), localstorePath, &storer.Options{
+				Logger:          logger,
+				RadiusSetter:    noopRadiusSetter{},
+				Batchstore:      new(postage.NoOpBatchStore),
+				ReserveCapacity: node.ReserveCapacity,
+			}, validation)
+			if err != nil {
+				return fmt.Errorf("localstore: %w", err)
+			}
+
+			return nil
+		},
+	}
+	c.Flags().String(optionNameDataDir, "", "data directory")
+	c.Flags().String(optionNameVerbosity, "info", "verbosity level")
+	c.Flags().Bool(optionNameValidation, false, "run chunk validation checks before and after the compaction")
+	cmd.AddCommand(c)
+}
+
 func dbExportCmd(cmd *cobra.Command) {
 	c := &cobra.Command{
 		Use:   "export",
diff --git a/pkg/sharky/recovery.go b/pkg/sharky/recovery.go
index 196cb5d05f7..5a223e4c233 100644
--- a/pkg/sharky/recovery.go
+++ b/pkg/sharky/recovery.go
@@ -31,7 +31,7 @@ func NewRecovery(dir string, shardCnt int, datasize int) (*Recovery, error) {
 	shardFiles := make([]*os.File, shardCnt)
 
 	for i := 0; i < shardCnt; i++ {
-		file, err := os.OpenFile(path.Join(dir, fmt.Sprintf("shard_%03d", i)), os.O_RDONLY, 0666)
+		file, err := os.OpenFile(path.Join(dir, fmt.Sprintf("shard_%03d", i)), os.O_RDWR, 0666)
 		if errors.Is(err, fs.ErrNotExist) {
 			return nil, fmt.Errorf("index %d: %w", i, ErrShardNotFound)
 		}
@@ -86,6 +86,27 @@ func (r *Recovery) Read(ctx context.Context, loc Location, buf []byte) error {
 	return err
 }
 
+func (r *Recovery) Move(ctx context.Context, from Location, to Location) error {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+
+	chData := make([]byte, from.Length)
+	_, err := r.shardFiles[from.Shard].ReadAt(chData, int64(from.Slot)*int64(r.datasize))
+	if err != nil {
+		return err
+	}
+
+	_, err = r.shardFiles[to.Shard].WriteAt(chData, int64(to.Slot)*int64(r.datasize))
+	return err
+}
+
+func (r *Recovery) TruncateAt(ctx context.Context, shard uint8, slot uint32) error {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+
+	return r.shardFiles[shard].Truncate(int64(slot) * int64(r.datasize))
+}
+
 // Save saves all free slots files of the recovery (without closing).
 func (r *Recovery) Save() error {
 	r.mtx.Lock()
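Both Recovery.Move and Recovery.TruncateAt rely on sharky's fixed-size slot layout: a slot's byte offset in the shard file is simply its index times the per-shard data size. The standalone toy below is not part of this diff (slotSize and moveSlot are made-up names) and only sketches that arithmetic with a plain os.File, assuming one record is moved from slot 3 to slot 0 and the file is then truncated after the last used slot.

```go
package main

import (
	"fmt"
	"os"
)

const slotSize = 8 // stand-in for sharky's per-slot data size (swarm.SocMaxChunkSize in the diff)

// moveSlot copies the record stored in slot from into slot to, mirroring what
// Recovery.Move does: offsets are just slot index times the fixed slot size.
func moveSlot(f *os.File, from, to int64, length int) error {
	buf := make([]byte, length)
	if _, err := f.ReadAt(buf, from*slotSize); err != nil {
		return err
	}
	_, err := f.WriteAt(buf, to*slotSize)
	return err
}

func main() {
	f, err := os.CreateTemp("", "shard_demo")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// Write a record into slot 3, move it down to slot 0, then truncate the file
	// after slot 0: the same sequence Compact performs via Move and TruncateAt.
	_, _ = f.WriteAt([]byte("hello"), 3*slotSize)
	_ = moveSlot(f, 3, 0, 5)
	_ = f.Truncate(1 * slotSize)

	buf := make([]byte, 5)
	_, _ = f.ReadAt(buf, 0)
	fmt.Println(string(buf)) // hello
}
```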
diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go
new file mode 100644
index 00000000000..dbce17c220d
--- /dev/null
+++ b/pkg/storer/compact.go
@@ -0,0 +1,192 @@
+// Copyright 2023 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package storer
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"path"
+	"sort"
+	"time"
+
+	"github.com/ethersphere/bee/pkg/cac"
+	"github.com/ethersphere/bee/pkg/log"
+	"github.com/ethersphere/bee/pkg/sharky"
+	"github.com/ethersphere/bee/pkg/soc"
+	"github.com/ethersphere/bee/pkg/storage"
+	"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
+	"github.com/ethersphere/bee/pkg/swarm"
+	"golang.org/x/sync/errgroup"
+)
+
+// Compact minimizes sharky disk usage. Using the current sharky locations from the storer,
+// it relocates chunks from the end of the used slots to the first available free slots.
+func Compact(ctx context.Context, basePath string, opts *Options, validate bool) error {
+
+	logger := opts.Logger
+
+	store, err := initStore(basePath, opts)
+	if err != nil {
+		return fmt.Errorf("failed creating levelDB index store: %w", err)
+	}
+	defer func() {
+		if err := store.Close(); err != nil {
+			logger.Error(err, "failed closing store")
+		}
+	}()
+
+	sharkyRecover, err := sharky.NewRecovery(path.Join(basePath, sharkyPath), sharkyNoOfShards, swarm.SocMaxChunkSize)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err := sharkyRecover.Close(); err != nil {
+			logger.Error(err, "failed closing sharky recovery")
+		}
+	}()
+
+	if validate {
+		logger.Info("performing chunk validation before compaction")
+		validationWork(logger, store, sharkyRecover)
+	}
+
+	logger.Info("starting compaction")
+
+	n := time.Now()
+
+	for shard := 0; shard < sharkyNoOfShards; shard++ {
+
+		select {
+		case <-ctx.Done():
+			return errors.Join(ctx.Err(), sharkyRecover.Save())
+		default:
+		}
+
+		items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000)
+		// we deliberately choose to iterate the whole store again for each shard
+		// so that we do not store all the items in memory (for operators with huge localstores)
+		_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
+			if item.Location.Shard == uint8(shard) {
+				items = append(items, item)
+			}
+			return nil
+		})
+
+		sort.Slice(items, func(i, j int) bool {
+			return items[i].Location.Slot < items[j].Location.Slot
+		})
+
+		lastUsedSlot := items[len(items)-1].Location.Slot
+		slots := make([]*chunkstore.RetrievalIndexItem, lastUsedSlot+1) // marks free and used slots
+		for _, l := range items {
+			slots[l.Location.Slot] = l
+		}
+
+		// start begins at the zero slot. The loop below will increment the position of start until a free slot is found.
+		// end points to the last slot, and the loop will decrement the position of end until a used slot is found.
+		// Once start and end point to a free and a used slot, respectively, the chunk at end is relocated to start.
+		start := uint32(0)
+		end := lastUsedSlot
+
+		for start < end {
+			if slots[start] == nil { // free
+				if slots[end] != nil { // used
+					from := slots[end]
+					to := sharky.Location{Slot: start, Length: from.Location.Length, Shard: from.Location.Shard}
+					if err := sharkyRecover.Move(context.Background(), from.Location, to); err != nil {
+						return fmt.Errorf("sharky move: %w", err)
+					}
+					if err := sharkyRecover.Add(to); err != nil {
+						return fmt.Errorf("sharky add: %w", err)
+					}
+
+					from.Location = to
+					if err := store.Put(from); err != nil {
+						return fmt.Errorf("store put: %w", err)
+					}
+
+					start++
+					end--
+				} else {
+					end-- // keep moving to the left until a used slot is found
+				}
+			} else {
+				start++ // keep moving to the right until a free slot is found
+			}
+		}
+
+		logger.Info("shard truncated", "shard", fmt.Sprintf("%d/%d", shard, sharkyNoOfShards-1), "slot", end)
+
+		if err := sharkyRecover.TruncateAt(context.Background(), uint8(shard), end+1); err != nil {
+			return fmt.Errorf("sharky truncate: %w", err)
+		}
+	}
+
+	if err := sharkyRecover.Save(); err != nil {
+		return fmt.Errorf("sharky save: %w", err)
+	}
+
+	logger.Info("compaction finished", "duration", time.Since(n))
+
+	if validate {
+		logger.Info("performing chunk validation after compaction")
+		validationWork(logger, store, sharkyRecover)
+	}
+
+	return nil
+}
+
+func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recovery) {
+
+	n := time.Now()
+	defer func() {
+		logger.Info("validation finished", "duration", time.Since(n))
+	}()
+
+	iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem)
+
+	validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error {
+		err := sharky.Read(context.Background(), item.Location, buf)
+		if err != nil {
+			return err
+		}
+
+		ch := swarm.NewChunk(item.Address, buf)
+		if !cac.Valid(ch) && !soc.Valid(ch) {
+			return errors.New("invalid chunk")
+		}
+
+		return nil
+	}
+
+	eg := errgroup.Group{}
+
+	for i := 0; i < 8; i++ {
+		eg.Go(func() error {
+			buf := make([]byte, swarm.SocMaxChunkSize)
+			for item := range iteratateItemsC {
+				if err := validChunk(item, buf[:item.Location.Length]); err != nil {
+					logger.Info("invalid chunk", "address", item.Address, "error", err)
+				}
+			}
+			return nil
+		})
+	}
+
+	count := 0
+	_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
+		iteratateItemsC <- item
+		count++
+		if count%100_000 == 0 {
+			logger.Info("..still validating chunks", "count", count)
+		}
+		return nil
+	})
+
+	close(iteratateItemsC)
+
+	_ = eg.Wait()
+}
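The core of the per-shard loop above is a two-pointer sweep over the slots array. The following self-contained sketch (the item type and compactSlots helper are hypothetical, not part of this diff) shows the same idea in isolation; the real loop additionally rewrites each moved chunk's sharky location via Recovery.Move and Add, updates the retrieval index with store.Put, and finally truncates the shard file.

```go
package main

import "fmt"

// item stands in for chunkstore.RetrievalIndexItem; only the slot it occupies matters here.
type item struct{ slot uint32 }

// compactSlots packs every used slot to the front of the shard, using the same
// two-pointer sweep as Compact: start walks right until it finds a free slot,
// end walks left until it finds a used slot, and the item at end is moved down
// to start. It returns the number of used slots, i.e. the offset at which the
// shard could be truncated.
func compactSlots(slots []*item) uint32 {
	used := uint32(0)
	for _, it := range slots {
		if it != nil {
			used++
		}
	}
	start, end := 0, len(slots)-1
	for start < end {
		switch {
		case slots[start] != nil: // used slot, keep scanning right
			start++
		case slots[end] == nil: // free slot, keep scanning left
			end--
		default: // free slot at start, used slot at end: relocate
			slots[start], slots[end] = slots[end], slots[start]
			slots[start].slot = uint32(start)
			start++
			end--
		}
	}
	return used
}

func main() {
	// slots 0, 2 and 5 are used; 1, 3 and 4 are free.
	slots := []*item{{slot: 0}, nil, {slot: 2}, nil, nil, {slot: 5}}
	fmt.Println("used slots after compaction:", compactSlots(slots)) // 3
}
```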
diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go
new file mode 100644
index 00000000000..6544dd7d54b
--- /dev/null
+++ b/pkg/storer/compact_test.go
@@ -0,0 +1,107 @@
+// Copyright 2023 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package storer_test
+
+import (
+	"bytes"
+	"context"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/ethersphere/bee/pkg/log"
+	"github.com/ethersphere/bee/pkg/postage"
+	postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
+	pullerMock "github.com/ethersphere/bee/pkg/puller/mock"
+	chunk "github.com/ethersphere/bee/pkg/storage/testing"
+	storer "github.com/ethersphere/bee/pkg/storer"
+	"github.com/ethersphere/bee/pkg/swarm"
+)
+
+// TestCompact creates two batches and puts chunks belonging to both batches.
+// The first batch is then expired, causing free slots to accumulate in sharky.
+// Next, sharky is compacted, after which it is verified that valid chunks can still be retrieved.
+func TestCompact(t *testing.T) {
+
+	baseAddr := swarm.RandAddress(t)
+	ctx := context.Background()
+	basePath := t.TempDir()
+
+	opts := dbTestOps(baseAddr, 10_000, nil, nil, time.Second)
+	opts.CacheCapacity = 0
+	opts.Logger = log.NewLogger("test", log.WithSink(os.Stdout))
+
+	st, err := storer.New(ctx, basePath, opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	st.StartReserveWorker(ctx, pullerMock.NewMockRateReporter(0), networkRadiusFunc(0))
+
+	var chunks []swarm.Chunk
+	var chunksPerPO uint64 = 50
+	batches := []*postage.Batch{postagetesting.MustNewBatch(), postagetesting.MustNewBatch()}
+	evictBatch := batches[0]
+
+	putter := st.ReservePutter()
+
+	for b := 0; b < len(batches); b++ {
+		for i := uint64(0); i < chunksPerPO; i++ {
+			ch := chunk.GenerateValidRandomChunkAt(baseAddr, b)
+			ch = ch.WithStamp(postagetesting.MustNewBatchStamp(batches[b].ID))
+			chunks = append(chunks, ch)
+			err := putter.Put(ctx, ch)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+	}
+
+	err = st.EvictBatch(ctx, evictBatch.ID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	c, unsub := st.Events().Subscribe("batchExpiryDone")
+	t.Cleanup(unsub)
+	<-c
+
+	if err := st.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	err = storer.Compact(ctx, basePath, opts, true)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	st, err = storer.New(ctx, basePath, opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, ch := range chunks {
+		has, err := st.ReserveHas(ch.Address(), ch.Stamp().BatchID())
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if has {
+			checkSaved(t, st, ch, true, true)
+		}
+
+		if bytes.Equal(ch.Stamp().BatchID(), evictBatch.ID) {
+			if has {
+				t.Fatal("store should NOT have chunk")
+			}
+			checkSaved(t, st, ch, false, false)
+		} else if !has {
+			t.Fatal("store should have chunk")
+		}
+	}
+
+	if err := st.Close(); err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go
index 20d07050405..c2bd9eb7b6e 100644
--- a/pkg/storer/internal/cache/cache.go
+++ b/pkg/storer/internal/cache/cache.go
@@ -396,7 +396,11 @@ func (c *Cache) MoveFromReserve(
 		return nil
 	}
 
+	// consider only the amount that can fit; the rest should be deleted from the chunkstore.
 	if len(entriesToAdd) > c.capacity {
+		for _, e := range entriesToAdd[:len(entriesToAdd)-c.capacity] {
+			_ = store.ChunkStore().Delete(ctx, e.Address)
+		}
 		entriesToAdd = entriesToAdd[len(entriesToAdd)-c.capacity:]
 	}
 
diff --git a/pkg/storer/internal/chunkstore/helpers.go b/pkg/storer/internal/chunkstore/helpers.go
index 1e003ddf25f..8e0269144eb 100644
--- a/pkg/storer/internal/chunkstore/helpers.go
+++ b/pkg/storer/internal/chunkstore/helpers.go
@@ -17,6 +17,11 @@ type LocationResult struct {
 	Location sharky.Location
 }
 
+type IterateResult struct {
+	Err  error
+	Item *RetrievalIndexItem
+}
+
 // IterateLocations iterates over entire retrieval index and plucks only sharky location.
 func IterateLocations(
 	ctx context.Context,
@@ -50,3 +55,13 @@ func IterateLocations(
 		}
 	}()
 }
+
+// Iterate iterates over the entire retrieval index with a callback.
+func Iterate(st storage.Store, callBackFunc func(*RetrievalIndexItem) error) error {
+	return st.Iterate(storage.Query{
+		Factory: func() storage.Item { return new(RetrievalIndexItem) },
+	}, func(r storage.Result) (bool, error) {
+		entry := r.Entry.(*RetrievalIndexItem)
+		return false, callBackFunc(entry)
+	})
+}
diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go
index e00f57f457d..01e4c9116bc 100644
--- a/pkg/storer/reserve_test.go
+++ b/pkg/storer/reserve_test.go
@@ -166,7 +166,6 @@ func TestEvictBatch(t *testing.T) {
 
 	baseAddr := swarm.RandAddress(t)
 
-	t.Cleanup(func() {})
 	st, err := diskStorer(t, dbTestOps(baseAddr, 100, nil, nil, time.Minute))()
 	if err != nil {
 		t.Fatal(err)
@@ -679,10 +678,15 @@ func checkSaved(t *testing.T, st *storer.DB, ch swarm.Chunk, stampSaved, chunkSt
 	if !chunkStoreSaved {
 		chunkStoreWantedErr = storage.ErrNotFound
 	}
-	_, err = st.Repo().ChunkStore().Get(context.Background(), ch.Address())
+	gotCh, err := st.Repo().ChunkStore().Get(context.Background(), ch.Address())
 	if !errors.Is(err, chunkStoreWantedErr) {
 		t.Fatalf("wanted err %s, got err %s", chunkStoreWantedErr, err)
 	}
+	if chunkStoreSaved {
+		if !bytes.Equal(ch.Data(), gotCh.Data()) {
+			t.Fatalf("chunks are not equal: %s", ch.Address())
+		}
+	}
 }
 
 func BenchmarkReservePutter(b *testing.B) {
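For context on the new chunkstore.Iterate helper above, which both Compact and validationWork build on, a minimal usage sketch might look like the following. countPerShard is a hypothetical example and not part of this diff; since chunkstore is an internal package it only compiles from within pkg/storer, and st is assumed to be an already-open storage.Store backing the localstore index.

```go
package storer

import (
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
)

// countPerShard tallies retrieval-index entries per sharky shard using the new
// chunkstore.Iterate callback, the same pattern Compact uses to collect the
// items belonging to a single shard.
func countPerShard(st storage.Store) (map[uint8]int, error) {
	counts := make(map[uint8]int)
	err := chunkstore.Iterate(st, func(item *chunkstore.RetrievalIndexItem) error {
		counts[item.Location.Shard]++
		return nil
	})
	return counts, err
}
```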