From 733b8fca9f34e79649cc47ce9bb681567628e022 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 23 Sep 2023 01:51:14 +0300 Subject: [PATCH] fix: better ctx handling --- cmd/bee/cmd/db.go | 5 ++--- pkg/storer/compact.go | 20 +++++++++++++------- pkg/storer/compact_test.go | 7 +------ pkg/storer/internal/chunkstore/helpers.go | 6 +++--- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go index 6292462f438..21c89587c74 100644 --- a/cmd/bee/cmd/db.go +++ b/cmd/bee/cmd/db.go @@ -26,6 +26,8 @@ import ( "github.com/spf13/cobra" ) +const optionNameValidation = "validate" + func (c *command) initDBCmd() { cmd := &cobra.Command{ Use: "db", @@ -101,9 +103,6 @@ func dbInfoCmd(cmd *cobra.Command) { } func dbCompactCmd(cmd *cobra.Command) { - - optionNameValidation := "validate" - c := &cobra.Command{ Use: "compact", Short: "Compacts the localstore sharky store.", diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index 5ad7bd500bf..b6288212268 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -50,7 +50,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) if validate { logger.Info("performing chunk validation before compaction") - validationWork(ctx, logger, store, sharkyRecover) + validationWork(logger, store, sharkyRecover) } logger.Info("starting compaction") @@ -59,6 +59,12 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) for shard := 0; shard < sharkyNoOfShards; shard++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000) // we deliberately choose to iterate the whole store again for each shard // so that we do not store all the items in memory (for operators with huge localstores) @@ -90,7 +96,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) if slots[end] != nil { // used from := slots[end] to := sharky.Location{Slot: start, Length: from.Location.Length, Shard: from.Location.Shard} - if err := sharkyRecover.Move(ctx, from.Location, to); err != nil { + if err := sharkyRecover.Move(context.Background(), from.Location, to); err != nil { return fmt.Errorf("sharky move: %w", err) } if err := sharkyRecover.Add(to); err != nil { @@ -114,7 +120,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) logger.Info("shard truncated", "shard", shard, "slot", end) - if err := sharkyRecover.TruncateAt(ctx, uint8(shard), end+1); err != nil { + if err := sharkyRecover.TruncateAt(context.Background(), uint8(shard), end+1); err != nil { return fmt.Errorf("sharky truncate: %w", err) } } @@ -127,13 +133,13 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) if validate { logger.Info("performing chunk validation after compaction") - validationWork(ctx, logger, store, sharkyRecover) + validationWork(logger, store, sharkyRecover) } return nil } -func validationWork(ctx context.Context, logger log.Logger, store storage.Store, sharky *sharky.Recovery) { +func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recovery) { n := time.Now() defer func() { @@ -143,7 +149,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem) validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error { - err := sharky.Read(ctx, item.Location, buf) + err := sharky.Read(context.Background(), item.Location, buf) if err != nil { return err } @@ -156,7 +162,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, return nil } - eg, ctx := errgroup.WithContext(ctx) + eg := errgroup.Group{} for i := 0; i < 8; i++ { eg.Go(func() error { diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go index ab7ec1fdab0..6544dd7d54b 100644 --- a/pkg/storer/compact_test.go +++ b/pkg/storer/compact_test.go @@ -65,12 +65,7 @@ func TestCompact(t *testing.T) { c, unsub := st.Events().Subscribe("batchExpiryDone") t.Cleanup(unsub) - gotUnreserveSignal := make(chan struct{}) - go func() { - defer close(gotUnreserveSignal) - <-c - }() - <-gotUnreserveSignal + <-c if err := st.Close(); err != nil { t.Fatal(err) diff --git a/pkg/storer/internal/chunkstore/helpers.go b/pkg/storer/internal/chunkstore/helpers.go index 40f29140e68..8e0269144eb 100644 --- a/pkg/storer/internal/chunkstore/helpers.go +++ b/pkg/storer/internal/chunkstore/helpers.go @@ -56,12 +56,12 @@ func IterateLocations( }() } -// IterateLocations iterates over entire retrieval index and plucks only sharky location. -func Iterate(st storage.Store, cb func(*RetrievalIndexItem) error) error { +// Iterate iterates over entire retrieval index with a call back. +func Iterate(st storage.Store, callBackFunc func(*RetrievalIndexItem) error) error { return st.Iterate(storage.Query{ Factory: func() storage.Item { return new(RetrievalIndexItem) }, }, func(r storage.Result) (bool, error) { entry := r.Entry.(*RetrievalIndexItem) - return false, cb(entry) + return false, callBackFunc(entry) }) }