From ba49f9f49699aeb3adfd8f431a5e93c31fc10a1b Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 20 Sep 2023 20:31:26 +0300 Subject: [PATCH 01/21] feat: sharky compaction cmd --- cmd/bee/cmd/db.go | 44 +++++++++ pkg/sharky/recovery.go | 27 +++++- pkg/storer/compact.go | 106 ++++++++++++++++++++++ pkg/storer/compact_test.go | 104 +++++++++++++++++++++ pkg/storer/internal/cache/cache.go | 6 ++ pkg/storer/internal/chunkstore/helpers.go | 38 ++++++++ pkg/storer/migration/step_04.go | 3 + pkg/storer/reserve_test.go | 8 +- 8 files changed, 333 insertions(+), 3 deletions(-) create mode 100644 pkg/storer/compact.go create mode 100644 pkg/storer/compact_test.go diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go index d491f815beb..ae26e7d595c 100644 --- a/cmd/bee/cmd/db.go +++ b/cmd/bee/cmd/db.go @@ -95,6 +95,50 @@ func dbInfoCmd(cmd *cobra.Command) { cmd.AddCommand(c) } +func dbCompactCmd(cmd *cobra.Command) { + c := &cobra.Command{ + Use: "compact", + Short: "Compacts the localstore sharky store.", + RunE: func(cmd *cobra.Command, args []string) (err error) { + v, err := cmd.Flags().GetString(optionNameVerbosity) + if err != nil { + return fmt.Errorf("get verbosity: %w", err) + } + v = strings.ToLower(v) + logger, err := newLogger(cmd, v) + if err != nil { + return fmt.Errorf("new logger: %w", err) + } + + dataDir, err := cmd.Flags().GetString(optionNameDataDir) + if err != nil { + return fmt.Errorf("get data-dir: %w", err) + } + if dataDir == "" { + return errors.New("no data-dir provided") + } + + logger.Info("getting db indices with data-dir", "path", dataDir) + + db, err := storer.New(cmd.Context(), dataDir, &storer.Options{ + Logger: logger, + RadiusSetter: noopRadiusSetter{}, + Batchstore: new(postage.NoOpBatchStore), + ReserveCapacity: node.ReserveCapacity, + }) + if err != nil { + return fmt.Errorf("localstore: %w", err) + } + defer db.Close() + + return nil + }, + } + c.Flags().String(optionNameDataDir, "", "data directory") + c.Flags().String(optionNameVerbosity, "info", "verbosity level") + cmd.AddCommand(c) +} + func dbExportCmd(cmd *cobra.Command) { c := &cobra.Command{ Use: "export", diff --git a/pkg/sharky/recovery.go b/pkg/sharky/recovery.go index 196cb5d05f7..e7c367e2603 100644 --- a/pkg/sharky/recovery.go +++ b/pkg/sharky/recovery.go @@ -31,7 +31,7 @@ func NewRecovery(dir string, shardCnt int, datasize int) (*Recovery, error) { shardFiles := make([]*os.File, shardCnt) for i := 0; i < shardCnt; i++ { - file, err := os.OpenFile(path.Join(dir, fmt.Sprintf("shard_%03d", i)), os.O_RDONLY, 0666) + file, err := os.OpenFile(path.Join(dir, fmt.Sprintf("shard_%03d", i)), os.O_RDWR, 0666) if errors.Is(err, fs.ErrNotExist) { return nil, fmt.Errorf("index %d: %w", i, ErrShardNotFound) } @@ -86,6 +86,31 @@ func (r *Recovery) Read(ctx context.Context, loc Location, buf []byte) error { return err } +func (r *Recovery) Move(ctx context.Context, from Location, to Location) error { + r.mtx.Lock() + defer r.mtx.Unlock() + + chData := make([]byte, from.Length) + _, err := r.shardFiles[from.Shard].ReadAt(chData, int64(from.Slot)*int64(r.datasize)) + if err != nil { + return err + } + + _, err = r.shardFiles[to.Shard].WriteAt(chData, int64(to.Slot)*int64(r.datasize)) + return err +} + +func (r *Recovery) TruncateAt(ctx context.Context, shard uint8, slot uint32) error { + r.mtx.Lock() + defer r.mtx.Unlock() + + err := r.shardFiles[shard].Truncate(int64(slot) * int64(r.datasize)) + if err != nil { + return err + } + return r.shardFiles[shard].Sync() +} + // Save saves all free slots files of the recovery (without closing). func (r *Recovery) Save() error { r.mtx.Lock() diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go new file mode 100644 index 00000000000..22798f4e136 --- /dev/null +++ b/pkg/storer/compact.go @@ -0,0 +1,106 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package storer + +import ( + "context" + "fmt" + "path" + "sort" + + "github.com/ethersphere/bee/pkg/sharky" + "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" + "github.com/ethersphere/bee/pkg/swarm" +) + +func Compact(ctx context.Context, basePath string, opts *Options) error { + + logger := opts.Logger + + store, err := initStore(basePath, opts) + if err != nil { + return fmt.Errorf("failed creating levelDB index store: %w", err) + } + + sharkyRecover, err := sharky.NewRecovery(path.Join(basePath, sharkyPath), sharkyNoOfShards, swarm.SocMaxChunkSize) + if err != nil { + return err + } + + defer func() { + if err := store.Close(); err != nil { + logger.Error(err, "failed closing store") + } + if err := sharkyRecover.Close(); err != nil { + logger.Error(err, "failed closing sharky recovery") + } + }() + + iteratateItemsC := make(chan chunkstore.IterateResult) + chunkstore.Iterate(ctx, store, iteratateItemsC) + + var shards [][]*chunkstore.RetrievalIndexItem + for i := 0; i < sharkyNoOfShards; i++ { + shards = append(shards, []*chunkstore.RetrievalIndexItem{}) + } + + for c := range iteratateItemsC { + if c.Err != nil { + return fmt.Errorf("location read: %w", err) + } + shards[c.Item.Location.Shard] = append(shards[c.Item.Location.Shard], c.Item) + } + + for shard := 0; shard < sharkyNoOfShards; shard++ { + + locs := shards[shard] + + sort.Slice(locs, func(i, j int) bool { + return locs[i].Location.Slot < locs[j].Location.Slot + }) + + lastUsedSlot := locs[len(locs)-1].Location.Slot + slots := make([]*chunkstore.RetrievalIndexItem, lastUsedSlot+1) // marks free and used slots + for _, l := range locs { + slots[l.Location.Slot] = l + fmt.Println(l.Location.Shard, l.Location.Slot) + } + + start := uint32(0) + end := lastUsedSlot + + for start < end { + if slots[start] == nil { // free + if slots[end] != nil { // used + from := slots[end] + to := sharky.Location{Slot: start, Length: from.Location.Length, Shard: from.Location.Shard} + if err := sharkyRecover.Move(ctx, from.Location, to); err != nil { + return fmt.Errorf("sharky move: %w", err) + } + if err := sharkyRecover.Add(to); err != nil { + return fmt.Errorf("sharky add: %w", err) + } + + from.Location = to + store.Put(from) + + start++ + end-- + } else { + end-- // keep moving to the left until a used slot is found + } + } else { + start++ // keep moving to the right until a free slot is found + } + } + + fmt.Println("truncate", shard, end) + if err := sharkyRecover.TruncateAt(ctx, uint8(shard), end+1); err != nil { + return fmt.Errorf("sharky truncate: %w", err) + } + } + + return sharkyRecover.Save() +} diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go new file mode 100644 index 00000000000..0f443adbb10 --- /dev/null +++ b/pkg/storer/compact_test.go @@ -0,0 +1,104 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package storer_test + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/ethersphere/bee/pkg/postage" + postagetesting "github.com/ethersphere/bee/pkg/postage/testing" + pullerMock "github.com/ethersphere/bee/pkg/puller/mock" + chunk "github.com/ethersphere/bee/pkg/storage/testing" + storer "github.com/ethersphere/bee/pkg/storer" + "github.com/ethersphere/bee/pkg/swarm" +) + +func TestCompact(t *testing.T) { + + baseAddr := swarm.RandAddress(t) + ctx := context.Background() + basePath := t.TempDir() + + opts := dbTestOps(baseAddr, 10_000, nil, nil, time.Second) + opts.CacheCapacity = 0 + + st, err := storer.New(ctx, basePath, opts) + if err != nil { + t.Fatal(err) + } + st.StartReserveWorker(ctx, pullerMock.NewMockRateReporter(0), networkRadiusFunc(0)) + + var chunks []swarm.Chunk + var chunksPerPO uint64 = 50 + batches := []*postage.Batch{postagetesting.MustNewBatch(), postagetesting.MustNewBatch()} + evictBatch := batches[0] + + putter := st.ReservePutter() + + for b := 0; b < len(batches); b++ { + for i := uint64(0); i < chunksPerPO; i++ { + ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, b) + ch = ch.WithStamp(postagetesting.MustNewBatchStamp(batches[b].ID)) + chunks = append(chunks, ch) + err := putter.Put(ctx, ch) + if err != nil { + t.Fatal(err) + } + } + } + + err = st.EvictBatch(ctx, evictBatch.ID) + if err != nil { + t.Fatal(err) + } + + c, unsub := st.Events().Subscribe("batchExpiryDone") + t.Cleanup(unsub) + gotUnreserveSignal := make(chan struct{}) + go func() { + defer close(gotUnreserveSignal) + <-c + }() + <-gotUnreserveSignal + + err = st.Close() + if err != nil { + t.Fatal(err) + } + + err = storer.Compact(ctx, basePath, opts) + if err != nil { + t.Fatal(err) + } + + st, err = storer.New(ctx, basePath, opts) + if err != nil { + t.Fatal(err) + } + + for _, ch := range chunks { + has, err := st.ReserveHas(ch.Address(), ch.Stamp().BatchID()) + if err != nil { + t.Fatal(err) + } + + if has { + checkSaved(t, st, ch, true, true) + } + + if bytes.Equal(ch.Stamp().BatchID(), evictBatch.ID) { + if has { + t.Fatal("store should NOT have chunk") + } + checkSaved(t, st, ch, false, false) + } else if !has { + t.Fatal("store should have chunk") + } + } + +} diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go index 20d07050405..b0d1a33bcc6 100644 --- a/pkg/storer/internal/cache/cache.go +++ b/pkg/storer/internal/cache/cache.go @@ -158,6 +158,8 @@ func New(ctx context.Context, store internal.Storage, capacity uint64) (*Cache, return nil, fmt.Errorf("failed counting cache entries: %w", err) } + fmt.Println("capacity", capacity) + if count > int(capacity) { err := removeOldest( ctx, @@ -396,7 +398,11 @@ func (c *Cache) MoveFromReserve( return nil } + //consider only the amount that can fit, the rest should be deleted from the chunkstore. if len(entriesToAdd) > c.capacity { + for _, e := range entriesToAdd[:len(entriesToAdd)-c.capacity] { + _ = store.ChunkStore().Delete(ctx, e.Address) + } entriesToAdd = entriesToAdd[len(entriesToAdd)-c.capacity:] } diff --git a/pkg/storer/internal/chunkstore/helpers.go b/pkg/storer/internal/chunkstore/helpers.go index 1e003ddf25f..5880467cb48 100644 --- a/pkg/storer/internal/chunkstore/helpers.go +++ b/pkg/storer/internal/chunkstore/helpers.go @@ -17,6 +17,11 @@ type LocationResult struct { Location sharky.Location } +type IterateResult struct { + Err error + Item *RetrievalIndexItem +} + // IterateLocations iterates over entire retrieval index and plucks only sharky location. func IterateLocations( ctx context.Context, @@ -50,3 +55,36 @@ func IterateLocations( } }() } + +// IterateLocations iterates over entire retrieval index and plucks only sharky location. +func Iterate( + ctx context.Context, + st storage.Store, + locationResultC chan<- IterateResult, +) { + go func() { + defer close(locationResultC) + + err := st.Iterate(storage.Query{ + Factory: func() storage.Item { return new(RetrievalIndexItem) }, + }, func(r storage.Result) (bool, error) { + entry := r.Entry.(*RetrievalIndexItem) + + select { + case <-ctx.Done(): + return true, ctx.Err() + case locationResultC <- IterateResult{Item: entry}: + } + + return false, nil + }) + if err != nil { + result := IterateResult{Err: fmt.Errorf("iterate retrieval index error: %w", err)} + + select { + case <-ctx.Done(): + case locationResultC <- result: + } + } + }() +} diff --git a/pkg/storer/migration/step_04.go b/pkg/storer/migration/step_04.go index e621ae3458a..4be1a053cd3 100644 --- a/pkg/storer/migration/step_04.go +++ b/pkg/storer/migration/step_04.go @@ -6,6 +6,7 @@ package migration import ( "context" + "fmt" "os" "github.com/ethersphere/bee/pkg/log" @@ -42,6 +43,8 @@ func step_04( return res.Err } + fmt.Println(res.Location.Shard, res.Location.Shard) + if err := sharkyRecover.Add(res.Location); err != nil { return err } diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go index e00f57f457d..01e4c9116bc 100644 --- a/pkg/storer/reserve_test.go +++ b/pkg/storer/reserve_test.go @@ -166,7 +166,6 @@ func TestEvictBatch(t *testing.T) { baseAddr := swarm.RandAddress(t) - t.Cleanup(func() {}) st, err := diskStorer(t, dbTestOps(baseAddr, 100, nil, nil, time.Minute))() if err != nil { t.Fatal(err) @@ -679,10 +678,15 @@ func checkSaved(t *testing.T, st *storer.DB, ch swarm.Chunk, stampSaved, chunkSt if !chunkStoreSaved { chunkStoreWantedErr = storage.ErrNotFound } - _, err = st.Repo().ChunkStore().Get(context.Background(), ch.Address()) + gotCh, err := st.Repo().ChunkStore().Get(context.Background(), ch.Address()) if !errors.Is(err, chunkStoreWantedErr) { t.Fatalf("wanted err %s, got err %s", chunkStoreWantedErr, err) } + if chunkStoreSaved { + if !bytes.Equal(ch.Data(), gotCh.Data()) { + t.Fatalf("chunks are not equal: %s", ch.Address()) + } + } } func BenchmarkReservePutter(b *testing.B) { From 67b76dc514d7fb83c55e57011bbc683f6f16648e Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 20 Sep 2023 20:33:00 +0300 Subject: [PATCH 02/21] fix: add cmd --- cmd/bee/cmd/db.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go index ae26e7d595c..3ba6f560642 100644 --- a/cmd/bee/cmd/db.go +++ b/cmd/bee/cmd/db.go @@ -34,6 +34,7 @@ func (c *command) initDBCmd() { dbImportCmd(cmd) dbNukeCmd(cmd) dbInfoCmd(cmd) + dbCompactCmd(cmd) c.root.AddCommand(cmd) } From bdaeafcad4fc2bb5af993f6f5b8baa3b2bf141c9 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 20 Sep 2023 21:00:02 +0300 Subject: [PATCH 03/21] fix: logs --- cmd/bee/cmd/db.go | 8 ++++---- pkg/sharky/recovery.go | 6 +----- pkg/storer/compact.go | 30 ++++++++++++++++++++++++------ pkg/storer/compact_test.go | 3 +++ pkg/storer/internal/cache/cache.go | 2 -- 5 files changed, 32 insertions(+), 17 deletions(-) diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go index 3ba6f560642..44841c30cc4 100644 --- a/cmd/bee/cmd/db.go +++ b/cmd/bee/cmd/db.go @@ -7,6 +7,7 @@ package cmd import ( "archive/tar" "bytes" + "context" "encoding/binary" "errors" "fmt" @@ -76,6 +77,8 @@ func dbInfoCmd(cmd *cobra.Command) { } defer db.Close() + logger.Info("getting db info", "path", dataDir) + info, err := db.DebugInfo(cmd.Context()) if err != nil { return fmt.Errorf("fetching db info: %w", err) @@ -119,9 +122,7 @@ func dbCompactCmd(cmd *cobra.Command) { return errors.New("no data-dir provided") } - logger.Info("getting db indices with data-dir", "path", dataDir) - - db, err := storer.New(cmd.Context(), dataDir, &storer.Options{ + err = storer.Compact(context.Background(), dataDir, &storer.Options{ Logger: logger, RadiusSetter: noopRadiusSetter{}, Batchstore: new(postage.NoOpBatchStore), @@ -130,7 +131,6 @@ func dbCompactCmd(cmd *cobra.Command) { if err != nil { return fmt.Errorf("localstore: %w", err) } - defer db.Close() return nil }, diff --git a/pkg/sharky/recovery.go b/pkg/sharky/recovery.go index e7c367e2603..5a223e4c233 100644 --- a/pkg/sharky/recovery.go +++ b/pkg/sharky/recovery.go @@ -104,11 +104,7 @@ func (r *Recovery) TruncateAt(ctx context.Context, shard uint8, slot uint32) err r.mtx.Lock() defer r.mtx.Unlock() - err := r.shardFiles[shard].Truncate(int64(slot) * int64(r.datasize)) - if err != nil { - return err - } - return r.shardFiles[shard].Sync() + return r.shardFiles[shard].Truncate(int64(slot) * int64(r.datasize)) } // Save saves all free slots files of the recovery (without closing). diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index 22798f4e136..d6bd9addf5f 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -9,6 +9,7 @@ import ( "fmt" "path" "sort" + "time" "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" @@ -19,20 +20,28 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { logger := opts.Logger + n := time.Now() + defer func() { + logger.Info("compaction finished", "duration", time.Since(n)) + }() + + logger.Info("starting compaction") + store, err := initStore(basePath, opts) if err != nil { return fmt.Errorf("failed creating levelDB index store: %w", err) } + defer func() { + if err := store.Close(); err != nil { + logger.Error(err, "failed closing store") + } + }() sharkyRecover, err := sharky.NewRecovery(path.Join(basePath, sharkyPath), sharkyNoOfShards, swarm.SocMaxChunkSize) if err != nil { return err } - defer func() { - if err := store.Close(); err != nil { - logger.Error(err, "failed closing store") - } if err := sharkyRecover.Close(); err != nil { logger.Error(err, "failed closing sharky recovery") } @@ -46,17 +55,23 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { shards = append(shards, []*chunkstore.RetrievalIndexItem{}) } + count := 0 for c := range iteratateItemsC { if c.Err != nil { return fmt.Errorf("location read: %w", err) } + count++ shards[c.Item.Location.Shard] = append(shards[c.Item.Location.Shard], c.Item) } + logger.Info("total items", "count", count) + for shard := 0; shard < sharkyNoOfShards; shard++ { locs := shards[shard] + logger.Info("starting shard", "shard", shard) + sort.Slice(locs, func(i, j int) bool { return locs[i].Location.Slot < locs[j].Location.Slot }) @@ -65,7 +80,6 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { slots := make([]*chunkstore.RetrievalIndexItem, lastUsedSlot+1) // marks free and used slots for _, l := range locs { slots[l.Location.Slot] = l - fmt.Println(l.Location.Shard, l.Location.Slot) } start := uint32(0) @@ -84,7 +98,9 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { } from.Location = to - store.Put(from) + if err := store.Put(from); err != nil { + return fmt.Errorf("sharky add: %w", err) + } start++ end-- @@ -96,6 +112,8 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { } } + logger.Info("shard truncated", "shard", shard, "slot", end) + fmt.Println("truncate", shard, end) if err := sharkyRecover.TruncateAt(ctx, uint8(shard), end+1); err != nil { return fmt.Errorf("sharky truncate: %w", err) diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go index 0f443adbb10..b15c46d94aa 100644 --- a/pkg/storer/compact_test.go +++ b/pkg/storer/compact_test.go @@ -18,6 +18,9 @@ import ( "github.com/ethersphere/bee/pkg/swarm" ) +// TestCompact creates two batches and puts chunks belonging to both batches. +// The first batch is then expired, causing free slots to accumulate in sharky. +// Next, sharky is compacted, after which, it is tested that valid chunks can still be retrieved. func TestCompact(t *testing.T) { baseAddr := swarm.RandAddress(t) diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go index b0d1a33bcc6..c2bd9eb7b6e 100644 --- a/pkg/storer/internal/cache/cache.go +++ b/pkg/storer/internal/cache/cache.go @@ -158,8 +158,6 @@ func New(ctx context.Context, store internal.Storage, capacity uint64) (*Cache, return nil, fmt.Errorf("failed counting cache entries: %w", err) } - fmt.Println("capacity", capacity) - if count > int(capacity) { err := removeOldest( ctx, From b573d24226d2699ebf5f4ead2cbef1ae8d923460 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 00:03:42 +0300 Subject: [PATCH 04/21] fix: unit test --- pkg/storer/compact.go | 1 - pkg/storer/migration/step_04.go | 3 --- 2 files changed, 4 deletions(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index d6bd9addf5f..456ed3a5474 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -114,7 +114,6 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { logger.Info("shard truncated", "shard", shard, "slot", end) - fmt.Println("truncate", shard, end) if err := sharkyRecover.TruncateAt(ctx, uint8(shard), end+1); err != nil { return fmt.Errorf("sharky truncate: %w", err) } diff --git a/pkg/storer/migration/step_04.go b/pkg/storer/migration/step_04.go index 4be1a053cd3..e621ae3458a 100644 --- a/pkg/storer/migration/step_04.go +++ b/pkg/storer/migration/step_04.go @@ -6,7 +6,6 @@ package migration import ( "context" - "fmt" "os" "github.com/ethersphere/bee/pkg/log" @@ -43,8 +42,6 @@ func step_04( return res.Err } - fmt.Println(res.Location.Shard, res.Location.Shard) - if err := sharkyRecover.Add(res.Location); err != nil { return err } From 2e8f48cbb946b4923549f43f40aca50d14a6f161 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 00:16:45 +0300 Subject: [PATCH 05/21] fix: log --- pkg/storer/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index 456ed3a5474..fd4f1f198f6 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -99,7 +99,7 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { from.Location = to if err := store.Put(from); err != nil { - return fmt.Errorf("sharky add: %w", err) + return fmt.Errorf("store put: %w", err) } start++ From 9a604b0369f7c57c2692922e43def539659eccbd Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 12:05:52 +0300 Subject: [PATCH 06/21] fix: close store unit test --- pkg/storer/compact_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go index b15c46d94aa..dde002975c2 100644 --- a/pkg/storer/compact_test.go +++ b/pkg/storer/compact_test.go @@ -69,8 +69,7 @@ func TestCompact(t *testing.T) { }() <-gotUnreserveSignal - err = st.Close() - if err != nil { + if err := st.Close(); err != nil { t.Fatal(err) } @@ -104,4 +103,7 @@ func TestCompact(t *testing.T) { } } + if err := st.Close(); err != nil { + t.Fatal(err) + } } From 73c9841e25548e00b2035ddf2f43ce7edc05d3a9 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 12:24:05 +0300 Subject: [PATCH 07/21] fix: logs --- pkg/storer/compact.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index fd4f1f198f6..9416454b7ca 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -16,6 +16,8 @@ import ( "github.com/ethersphere/bee/pkg/swarm" ) +// Compact minimizes sharky disk usage by, using the current sharky locations from the storer, +// relocating chunks starting from the end of the used slots to the first available slots. func Compact(ctx context.Context, basePath string, opts *Options) error { logger := opts.Logger @@ -82,6 +84,9 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { slots[l.Location.Slot] = l } + // start begins at the zero slot. The loop below will increment the position of start until a free slot is found. + // end points to the last slot, and the loop will decrement the position of end until a used slot is found. + // Once start and end point to free and used slots, respectively, the swap of the chunk location will occur. start := uint32(0) end := lastUsedSlot From 2ef1a71278d0beccd91fafa371caa6b09a6224e1 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 12:34:36 +0300 Subject: [PATCH 08/21] fix: point to localstore --- cmd/bee/cmd/db.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go index 44841c30cc4..124a16898a3 100644 --- a/cmd/bee/cmd/db.go +++ b/cmd/bee/cmd/db.go @@ -13,6 +13,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "strings" "time" @@ -122,7 +123,9 @@ func dbCompactCmd(cmd *cobra.Command) { return errors.New("no data-dir provided") } - err = storer.Compact(context.Background(), dataDir, &storer.Options{ + localstorePath := path.Join(dataDir, "localstore") + + err = storer.Compact(context.Background(), localstorePath, &storer.Options{ Logger: logger, RadiusSetter: noopRadiusSetter{}, Batchstore: new(postage.NoOpBatchStore), From b9f308f94c2fe7b21bf320f9efa77b0c2162684b Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 14:46:09 +0300 Subject: [PATCH 09/21] fix: validation --- cmd/bee/cmd/db.go | 11 ++++++- pkg/storer/compact.go | 61 ++++++++++++++++++++++++++++++++++---- pkg/storer/compact_test.go | 2 +- 3 files changed, 67 insertions(+), 7 deletions(-) diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go index 124a16898a3..6292462f438 100644 --- a/cmd/bee/cmd/db.go +++ b/cmd/bee/cmd/db.go @@ -101,6 +101,9 @@ func dbInfoCmd(cmd *cobra.Command) { } func dbCompactCmd(cmd *cobra.Command) { + + optionNameValidation := "validate" + c := &cobra.Command{ Use: "compact", Short: "Compacts the localstore sharky store.", @@ -123,6 +126,11 @@ func dbCompactCmd(cmd *cobra.Command) { return errors.New("no data-dir provided") } + validation, err := cmd.Flags().GetBool(optionNameValidation) + if err != nil { + return fmt.Errorf("get validation: %w", err) + } + localstorePath := path.Join(dataDir, "localstore") err = storer.Compact(context.Background(), localstorePath, &storer.Options{ @@ -130,7 +138,7 @@ func dbCompactCmd(cmd *cobra.Command) { RadiusSetter: noopRadiusSetter{}, Batchstore: new(postage.NoOpBatchStore), ReserveCapacity: node.ReserveCapacity, - }) + }, validation) if err != nil { return fmt.Errorf("localstore: %w", err) } @@ -140,6 +148,7 @@ func dbCompactCmd(cmd *cobra.Command) { } c.Flags().String(optionNameDataDir, "", "data directory") c.Flags().String(optionNameVerbosity, "info", "verbosity level") + c.Flags().Bool(optionNameValidation, true, "enable chunk validation") cmd.AddCommand(c) } diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index 9416454b7ca..f5d0db3e147 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -6,26 +6,29 @@ package storer import ( "context" + "errors" "fmt" "path" "sort" "time" + "github.com/ethersphere/bee/pkg/cac" + "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/sharky" + "github.com/ethersphere/bee/pkg/soc" + "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" "github.com/ethersphere/bee/pkg/swarm" + "golang.org/x/sync/errgroup" ) // Compact minimizes sharky disk usage by, using the current sharky locations from the storer, // relocating chunks starting from the end of the used slots to the first available slots. -func Compact(ctx context.Context, basePath string, opts *Options) error { +func Compact(ctx context.Context, basePath string, opts *Options, validate bool) error { logger := opts.Logger n := time.Now() - defer func() { - logger.Info("compaction finished", "duration", time.Since(n)) - }() logger.Info("starting compaction") @@ -49,6 +52,11 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { } }() + if validate { + logger.Info("performing chunk validation before compaction") + validationWork(ctx, logger, store, sharkyRecover) + } + iteratateItemsC := make(chan chunkstore.IterateResult) chunkstore.Iterate(ctx, store, iteratateItemsC) @@ -62,7 +70,7 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { if c.Err != nil { return fmt.Errorf("location read: %w", err) } - count++ + shards[c.Item.Location.Shard] = append(shards[c.Item.Location.Shard], c.Item) } @@ -124,5 +132,48 @@ func Compact(ctx context.Context, basePath string, opts *Options) error { } } + logger.Info("compaction finished", "duration", time.Since(n)) + + if validate { + logger.Info("performing chunk validation after compaction") + validationWork(ctx, logger, store, sharkyRecover) + } + return sharkyRecover.Save() } + +func validationWork(ctx context.Context, logger log.Logger, store storage.Store, sharky *sharky.Recovery) { + + iteratateItemsC := make(chan chunkstore.IterateResult) + chunkstore.Iterate(ctx, store, iteratateItemsC) + + validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error { + err := sharky.Read(ctx, item.Location, buf) + if err != nil { + return err + } + + ch := swarm.NewChunk(item.Address, buf[:item.Location.Length]) + if !cac.Valid(ch) && !soc.Valid(ch) { + return errors.New("invalid chunk") + } + + return nil + } + + eg, ctx := errgroup.WithContext(ctx) + + for i := 0; i < 4; i++ { + eg.Go(func() error { + buf := make([]byte, swarm.SocMaxChunkSize) + for item := range iteratateItemsC { + if err := validChunk(item.Item, buf); err != nil { + logger.Info("invalid chunk", "address", item.Item.Address, "error", err) + } + } + return nil + }) + } + + _ = eg.Wait() +} diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go index dde002975c2..64b29ea2dcd 100644 --- a/pkg/storer/compact_test.go +++ b/pkg/storer/compact_test.go @@ -73,7 +73,7 @@ func TestCompact(t *testing.T) { t.Fatal(err) } - err = storer.Compact(ctx, basePath, opts) + err = storer.Compact(ctx, basePath, opts, true) if err != nil { t.Fatal(err) } From f8adc4b711f225c04237a9c0f73516245a1794dc Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 14:48:03 +0300 Subject: [PATCH 10/21] fix: timer --- pkg/storer/compact.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index f5d0db3e147..d535befd2c0 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -28,8 +28,6 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) logger := opts.Logger - n := time.Now() - logger.Info("starting compaction") store, err := initStore(basePath, opts) @@ -57,6 +55,8 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) validationWork(ctx, logger, store, sharkyRecover) } + n := time.Now() + iteratateItemsC := make(chan chunkstore.IterateResult) chunkstore.Iterate(ctx, store, iteratateItemsC) @@ -144,6 +144,11 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) func validationWork(ctx context.Context, logger log.Logger, store storage.Store, sharky *sharky.Recovery) { + n := time.Now() + defer func() { + logger.Info("validation finished", "duration", time.Since(n)) + }() + iteratateItemsC := make(chan chunkstore.IterateResult) chunkstore.Iterate(ctx, store, iteratateItemsC) From 906d9ac61ddd49949985389b26ead1bbbd480fcd Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 14:50:38 +0300 Subject: [PATCH 11/21] fix: logs --- pkg/storer/compact.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index d535befd2c0..580484d0eb1 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -80,8 +80,6 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) locs := shards[shard] - logger.Info("starting shard", "shard", shard) - sort.Slice(locs, func(i, j int) bool { return locs[i].Location.Slot < locs[j].Location.Slot }) From 703d7682614e60377af857e0f9a304d59ac2d542 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 14:59:00 +0300 Subject: [PATCH 12/21] fix: logs --- pkg/storer/compact.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index 580484d0eb1..dae9e7cbd62 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -28,8 +28,6 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) logger := opts.Logger - logger.Info("starting compaction") - store, err := initStore(basePath, opts) if err != nil { return fmt.Errorf("failed creating levelDB index store: %w", err) @@ -55,6 +53,8 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) validationWork(ctx, logger, store, sharkyRecover) } + logger.Info("starting compaction") + n := time.Now() iteratateItemsC := make(chan chunkstore.IterateResult) @@ -130,6 +130,10 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) } } + if err := sharkyRecover.Save(); err != nil { + return fmt.Errorf("sharky save: %w", err) + } + logger.Info("compaction finished", "duration", time.Since(n)) if validate { @@ -137,7 +141,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) validationWork(ctx, logger, store, sharkyRecover) } - return sharkyRecover.Save() + return nil } func validationWork(ctx context.Context, logger log.Logger, store storage.Store, sharky *sharky.Recovery) { From 7a0711208363aba0043b6b030963d16effe7ef2a Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 15:00:04 +0300 Subject: [PATCH 13/21] fix: increase parallelism --- pkg/storer/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index dae9e7cbd62..c7acc8186ba 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -170,7 +170,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, eg, ctx := errgroup.WithContext(ctx) - for i := 0; i < 4; i++ { + for i := 0; i < 8; i++ { eg.Go(func() error { buf := make([]byte, swarm.SocMaxChunkSize) for item := range iteratateItemsC { From 87500b065d9d9a03b07ee7d2ba6b88879a7ae197 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 22:55:01 +0300 Subject: [PATCH 14/21] fix: valid fix --- pkg/storer/compact.go | 56 +++++++++++------------ pkg/storer/internal/chunkstore/helpers.go | 37 +++------------ 2 files changed, 34 insertions(+), 59 deletions(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index c7acc8186ba..c65976cb356 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -57,36 +57,23 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) n := time.Now() - iteratateItemsC := make(chan chunkstore.IterateResult) - chunkstore.Iterate(ctx, store, iteratateItemsC) - - var shards [][]*chunkstore.RetrievalIndexItem - for i := 0; i < sharkyNoOfShards; i++ { - shards = append(shards, []*chunkstore.RetrievalIndexItem{}) - } - - count := 0 - for c := range iteratateItemsC { - if c.Err != nil { - return fmt.Errorf("location read: %w", err) - } - - shards[c.Item.Location.Shard] = append(shards[c.Item.Location.Shard], c.Item) - } - - logger.Info("total items", "count", count) - for shard := 0; shard < sharkyNoOfShards; shard++ { - locs := shards[shard] + items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000) + chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error { + if item.Location.Shard == uint8(shard) { + items = append(items, item) + } + return nil + }) - sort.Slice(locs, func(i, j int) bool { - return locs[i].Location.Slot < locs[j].Location.Slot + sort.Slice(items, func(i, j int) bool { + return items[i].Location.Slot < items[j].Location.Slot }) - lastUsedSlot := locs[len(locs)-1].Location.Slot + lastUsedSlot := items[len(items)-1].Location.Slot slots := make([]*chunkstore.RetrievalIndexItem, lastUsedSlot+1) // marks free and used slots - for _, l := range locs { + for _, l := range items { slots[l.Location.Slot] = l } @@ -151,8 +138,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, logger.Info("validation finished", "duration", time.Since(n)) }() - iteratateItemsC := make(chan chunkstore.IterateResult) - chunkstore.Iterate(ctx, store, iteratateItemsC) + iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem) validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error { err := sharky.Read(ctx, item.Location, buf) @@ -160,7 +146,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, return err } - ch := swarm.NewChunk(item.Address, buf[:item.Location.Length]) + ch := swarm.NewChunk(item.Address, buf) if !cac.Valid(ch) && !soc.Valid(ch) { return errors.New("invalid chunk") } @@ -174,13 +160,25 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, eg.Go(func() error { buf := make([]byte, swarm.SocMaxChunkSize) for item := range iteratateItemsC { - if err := validChunk(item.Item, buf); err != nil { - logger.Info("invalid chunk", "address", item.Item.Address, "error", err) + if err := validChunk(item, buf[:item.Location.Length]); err != nil { + logger.Info("invalid chunk", "address", item.Address, "error", err) } } return nil }) } + count := 0 + chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error { + iteratateItemsC <- item + count++ + if count%100_000 == 0 { + logger.Info("..still validating chunks", "count", count) + } + return nil + }) + + close(iteratateItemsC) + _ = eg.Wait() } diff --git a/pkg/storer/internal/chunkstore/helpers.go b/pkg/storer/internal/chunkstore/helpers.go index 5880467cb48..40f29140e68 100644 --- a/pkg/storer/internal/chunkstore/helpers.go +++ b/pkg/storer/internal/chunkstore/helpers.go @@ -57,34 +57,11 @@ func IterateLocations( } // IterateLocations iterates over entire retrieval index and plucks only sharky location. -func Iterate( - ctx context.Context, - st storage.Store, - locationResultC chan<- IterateResult, -) { - go func() { - defer close(locationResultC) - - err := st.Iterate(storage.Query{ - Factory: func() storage.Item { return new(RetrievalIndexItem) }, - }, func(r storage.Result) (bool, error) { - entry := r.Entry.(*RetrievalIndexItem) - - select { - case <-ctx.Done(): - return true, ctx.Err() - case locationResultC <- IterateResult{Item: entry}: - } - - return false, nil - }) - if err != nil { - result := IterateResult{Err: fmt.Errorf("iterate retrieval index error: %w", err)} - - select { - case <-ctx.Done(): - case locationResultC <- result: - } - } - }() +func Iterate(st storage.Store, cb func(*RetrievalIndexItem) error) error { + return st.Iterate(storage.Query{ + Factory: func() storage.Item { return new(RetrievalIndexItem) }, + }, func(r storage.Result) (bool, error) { + entry := r.Entry.(*RetrievalIndexItem) + return false, cb(entry) + }) } From af57050ae5e81fa4478f1e4a9f214e2109b89532 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 22:56:02 +0300 Subject: [PATCH 15/21] fix: lint --- pkg/storer/compact.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index c65976cb356..0f2c5f12cf6 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -60,7 +60,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) for shard := 0; shard < sharkyNoOfShards; shard++ { items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000) - chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error { + _ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error { if item.Location.Shard == uint8(shard) { items = append(items, item) } @@ -169,7 +169,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, } count := 0 - chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error { + _ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error { iteratateItemsC <- item count++ if count%100_000 == 0 { From 89c4d8e6b8a766d4cb5d5aa069a44d25960ad7ea Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 22:59:25 +0300 Subject: [PATCH 16/21] fix: unit test --- pkg/storer/compact_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go index 64b29ea2dcd..ab7ec1fdab0 100644 --- a/pkg/storer/compact_test.go +++ b/pkg/storer/compact_test.go @@ -7,9 +7,11 @@ package storer_test import ( "bytes" "context" + "os" "testing" "time" + "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/postage" postagetesting "github.com/ethersphere/bee/pkg/postage/testing" pullerMock "github.com/ethersphere/bee/pkg/puller/mock" @@ -29,6 +31,7 @@ func TestCompact(t *testing.T) { opts := dbTestOps(baseAddr, 10_000, nil, nil, time.Second) opts.CacheCapacity = 0 + opts.Logger = log.NewLogger("test", log.WithSink(os.Stdout)) st, err := storer.New(ctx, basePath, opts) if err != nil { @@ -45,7 +48,7 @@ func TestCompact(t *testing.T) { for b := 0; b < len(batches); b++ { for i := uint64(0); i < chunksPerPO; i++ { - ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, b) + ch := chunk.GenerateValidRandomChunkAt(baseAddr, b) ch = ch.WithStamp(postagetesting.MustNewBatchStamp(batches[b].ID)) chunks = append(chunks, ch) err := putter.Put(ctx, ch) From 4f8414a7f8f063445d0b3bd99d7a4888246029ac Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 22 Sep 2023 00:07:02 +0300 Subject: [PATCH 17/21] fix: comment --- pkg/storer/compact.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index 0f2c5f12cf6..5ad7bd500bf 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -60,6 +60,8 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) for shard := 0; shard < sharkyNoOfShards; shard++ { items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000) + // we deliberately choose to iterate the whole store again for each shard + // so that we do not store all the items in memory (for operators with huge localstores) _ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error { if item.Location.Shard == uint8(shard) { items = append(items, item) From 733b8fca9f34e79649cc47ce9bb681567628e022 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 23 Sep 2023 01:51:14 +0300 Subject: [PATCH 18/21] fix: better ctx handling --- cmd/bee/cmd/db.go | 5 ++--- pkg/storer/compact.go | 20 +++++++++++++------- pkg/storer/compact_test.go | 7 +------ pkg/storer/internal/chunkstore/helpers.go | 6 +++--- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go index 6292462f438..21c89587c74 100644 --- a/cmd/bee/cmd/db.go +++ b/cmd/bee/cmd/db.go @@ -26,6 +26,8 @@ import ( "github.com/spf13/cobra" ) +const optionNameValidation = "validate" + func (c *command) initDBCmd() { cmd := &cobra.Command{ Use: "db", @@ -101,9 +103,6 @@ func dbInfoCmd(cmd *cobra.Command) { } func dbCompactCmd(cmd *cobra.Command) { - - optionNameValidation := "validate" - c := &cobra.Command{ Use: "compact", Short: "Compacts the localstore sharky store.", diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index 5ad7bd500bf..b6288212268 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -50,7 +50,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) if validate { logger.Info("performing chunk validation before compaction") - validationWork(ctx, logger, store, sharkyRecover) + validationWork(logger, store, sharkyRecover) } logger.Info("starting compaction") @@ -59,6 +59,12 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) for shard := 0; shard < sharkyNoOfShards; shard++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000) // we deliberately choose to iterate the whole store again for each shard // so that we do not store all the items in memory (for operators with huge localstores) @@ -90,7 +96,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) if slots[end] != nil { // used from := slots[end] to := sharky.Location{Slot: start, Length: from.Location.Length, Shard: from.Location.Shard} - if err := sharkyRecover.Move(ctx, from.Location, to); err != nil { + if err := sharkyRecover.Move(context.Background(), from.Location, to); err != nil { return fmt.Errorf("sharky move: %w", err) } if err := sharkyRecover.Add(to); err != nil { @@ -114,7 +120,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) logger.Info("shard truncated", "shard", shard, "slot", end) - if err := sharkyRecover.TruncateAt(ctx, uint8(shard), end+1); err != nil { + if err := sharkyRecover.TruncateAt(context.Background(), uint8(shard), end+1); err != nil { return fmt.Errorf("sharky truncate: %w", err) } } @@ -127,13 +133,13 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) if validate { logger.Info("performing chunk validation after compaction") - validationWork(ctx, logger, store, sharkyRecover) + validationWork(logger, store, sharkyRecover) } return nil } -func validationWork(ctx context.Context, logger log.Logger, store storage.Store, sharky *sharky.Recovery) { +func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recovery) { n := time.Now() defer func() { @@ -143,7 +149,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem) validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error { - err := sharky.Read(ctx, item.Location, buf) + err := sharky.Read(context.Background(), item.Location, buf) if err != nil { return err } @@ -156,7 +162,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, return nil } - eg, ctx := errgroup.WithContext(ctx) + eg := errgroup.Group{} for i := 0; i < 8; i++ { eg.Go(func() error { diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go index ab7ec1fdab0..6544dd7d54b 100644 --- a/pkg/storer/compact_test.go +++ b/pkg/storer/compact_test.go @@ -65,12 +65,7 @@ func TestCompact(t *testing.T) { c, unsub := st.Events().Subscribe("batchExpiryDone") t.Cleanup(unsub) - gotUnreserveSignal := make(chan struct{}) - go func() { - defer close(gotUnreserveSignal) - <-c - }() - <-gotUnreserveSignal + <-c if err := st.Close(); err != nil { t.Fatal(err) diff --git a/pkg/storer/internal/chunkstore/helpers.go b/pkg/storer/internal/chunkstore/helpers.go index 40f29140e68..8e0269144eb 100644 --- a/pkg/storer/internal/chunkstore/helpers.go +++ b/pkg/storer/internal/chunkstore/helpers.go @@ -56,12 +56,12 @@ func IterateLocations( }() } -// IterateLocations iterates over entire retrieval index and plucks only sharky location. -func Iterate(st storage.Store, cb func(*RetrievalIndexItem) error) error { +// Iterate iterates over entire retrieval index with a call back. +func Iterate(st storage.Store, callBackFunc func(*RetrievalIndexItem) error) error { return st.Iterate(storage.Query{ Factory: func() storage.Item { return new(RetrievalIndexItem) }, }, func(r storage.Result) (bool, error) { entry := r.Entry.(*RetrievalIndexItem) - return false, cb(entry) + return false, callBackFunc(entry) }) } From d2a737e982aaa80ed4de4e4073232d61e1ea0865 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 23 Sep 2023 01:56:50 +0300 Subject: [PATCH 19/21] fix: save --- pkg/storer/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index b6288212268..c124dcdb62a 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -61,7 +61,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) select { case <-ctx.Done(): - return ctx.Err() + return errors.Join(ctx.Err(), sharkyRecover.Save()) default: } From bdaf9b7f57f6d80649b009c70fc1258ca46ca58c Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 23 Sep 2023 03:16:19 +0300 Subject: [PATCH 20/21] fix: disable compaction --- cmd/bee/cmd/db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go index 21c89587c74..44a994d1312 100644 --- a/cmd/bee/cmd/db.go +++ b/cmd/bee/cmd/db.go @@ -147,7 +147,7 @@ func dbCompactCmd(cmd *cobra.Command) { } c.Flags().String(optionNameDataDir, "", "data directory") c.Flags().String(optionNameVerbosity, "info", "verbosity level") - c.Flags().Bool(optionNameValidation, true, "enable chunk validation") + c.Flags().Bool(optionNameValidation, false, "run chunk validation checks before and after the compaction") cmd.AddCommand(c) } From 3c856f2b587f013d4523866a59db9230b54509a7 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 23 Sep 2023 12:19:58 +0300 Subject: [PATCH 21/21] fix: better logs --- pkg/storer/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index c124dcdb62a..dbce17c220d 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -118,7 +118,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) } } - logger.Info("shard truncated", "shard", shard, "slot", end) + logger.Info("shard truncated", "shard", fmt.Sprintf("%d/%d", shard, sharkyNoOfShards-1), "slot", end) if err := sharkyRecover.TruncateAt(context.Background(), uint8(shard), end+1); err != nil { return fmt.Errorf("sharky truncate: %w", err)