Skip to content

Commit

Permalink
fix: better ctx handling
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Sep 22, 2023
1 parent 4f8414a commit 733b8fc
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 19 deletions.
5 changes: 2 additions & 3 deletions cmd/bee/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/spf13/cobra"
)

const optionNameValidation = "validate"

func (c *command) initDBCmd() {
cmd := &cobra.Command{
Use: "db",
Expand Down Expand Up @@ -101,9 +103,6 @@ func dbInfoCmd(cmd *cobra.Command) {
}

func dbCompactCmd(cmd *cobra.Command) {

optionNameValidation := "validate"

c := &cobra.Command{
Use: "compact",
Short: "Compacts the localstore sharky store.",
Expand Down
20 changes: 13 additions & 7 deletions pkg/storer/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)

if validate {
logger.Info("performing chunk validation before compaction")
validationWork(ctx, logger, store, sharkyRecover)
validationWork(logger, store, sharkyRecover)
}

logger.Info("starting compaction")
Expand All @@ -59,6 +59,12 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)

for shard := 0; shard < sharkyNoOfShards; shard++ {

select {
case <-ctx.Done():
return ctx.Err()
default:
}

items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000)
// we deliberately choose to iterate the whole store again for each shard
// so that we do not store all the items in memory (for operators with huge localstores)
Expand Down Expand Up @@ -90,7 +96,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)
if slots[end] != nil { // used
from := slots[end]
to := sharky.Location{Slot: start, Length: from.Location.Length, Shard: from.Location.Shard}
if err := sharkyRecover.Move(ctx, from.Location, to); err != nil {
if err := sharkyRecover.Move(context.Background(), from.Location, to); err != nil {
return fmt.Errorf("sharky move: %w", err)
}
if err := sharkyRecover.Add(to); err != nil {
Expand All @@ -114,7 +120,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)

logger.Info("shard truncated", "shard", shard, "slot", end)

if err := sharkyRecover.TruncateAt(ctx, uint8(shard), end+1); err != nil {
if err := sharkyRecover.TruncateAt(context.Background(), uint8(shard), end+1); err != nil {
return fmt.Errorf("sharky truncate: %w", err)
}
}
Expand All @@ -127,13 +133,13 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)

if validate {
logger.Info("performing chunk validation after compaction")
validationWork(ctx, logger, store, sharkyRecover)
validationWork(logger, store, sharkyRecover)
}

return nil
}

func validationWork(ctx context.Context, logger log.Logger, store storage.Store, sharky *sharky.Recovery) {
func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recovery) {

n := time.Now()
defer func() {
Expand All @@ -143,7 +149,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store,
iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem)

validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error {
err := sharky.Read(ctx, item.Location, buf)
err := sharky.Read(context.Background(), item.Location, buf)
if err != nil {
return err
}
Expand All @@ -156,7 +162,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store,
return nil
}

eg, ctx := errgroup.WithContext(ctx)
eg := errgroup.Group{}

for i := 0; i < 8; i++ {
eg.Go(func() error {
Expand Down
7 changes: 1 addition & 6 deletions pkg/storer/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,7 @@ func TestCompact(t *testing.T) {

c, unsub := st.Events().Subscribe("batchExpiryDone")
t.Cleanup(unsub)
gotUnreserveSignal := make(chan struct{})
go func() {
defer close(gotUnreserveSignal)
<-c
}()
<-gotUnreserveSignal
<-c

if err := st.Close(); err != nil {
t.Fatal(err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/storer/internal/chunkstore/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func IterateLocations(
}()
}

// IterateLocations iterates over entire retrieval index and plucks only sharky location.
func Iterate(st storage.Store, cb func(*RetrievalIndexItem) error) error {
// Iterate iterates over entire retrieval index with a call back.
func Iterate(st storage.Store, callBackFunc func(*RetrievalIndexItem) error) error {
return st.Iterate(storage.Query{
Factory: func() storage.Item { return new(RetrievalIndexItem) },
}, func(r storage.Result) (bool, error) {
entry := r.Entry.(*RetrievalIndexItem)
return false, cb(entry)
return false, callBackFunc(entry)
})
}

0 comments on commit 733b8fc

Please sign in to comment.