feat: db compaction cmd #4329

Merged
merged 21 commits on Sep 23, 2023
56 changes: 56 additions & 0 deletions cmd/bee/cmd/db.go
@@ -7,11 +7,13 @@ package cmd
import (
"archive/tar"
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"time"
@@ -24,6 +26,8 @@ import (
"github.com/spf13/cobra"
)

const optionNameValidation = "validate"

func (c *command) initDBCmd() {
cmd := &cobra.Command{
Use: "db",
@@ -34,6 +38,7 @@ func (c *command) initDBCmd() {
dbImportCmd(cmd)
dbNukeCmd(cmd)
dbInfoCmd(cmd)
dbCompactCmd(cmd)

c.root.AddCommand(cmd)
}
@@ -75,6 +80,8 @@ func dbInfoCmd(cmd *cobra.Command) {
}
defer db.Close()

logger.Info("getting db info", "path", dataDir)

info, err := db.DebugInfo(cmd.Context())
if err != nil {
return fmt.Errorf("fetching db info: %w", err)
@@ -95,6 +102,55 @@ func dbInfoCmd(cmd *cobra.Command) {
cmd.AddCommand(c)
}

func dbCompactCmd(cmd *cobra.Command) {
c := &cobra.Command{
Use: "compact",
Short: "Compacts the localstore sharky store.",
RunE: func(cmd *cobra.Command, args []string) (err error) {
v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
return fmt.Errorf("get verbosity: %w", err)
}
v = strings.ToLower(v)
logger, err := newLogger(cmd, v)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}

dataDir, err := cmd.Flags().GetString(optionNameDataDir)
if err != nil {
return fmt.Errorf("get data-dir: %w", err)
}
if dataDir == "" {
return errors.New("no data-dir provided")
}

validation, err := cmd.Flags().GetBool(optionNameValidation)
if err != nil {
return fmt.Errorf("get validation: %w", err)
}

localstorePath := path.Join(dataDir, "localstore")

err = storer.Compact(context.Background(), localstorePath, &storer.Options{
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
}, validation)
if err != nil {
return fmt.Errorf("localstore: %w", err)
}

return nil
},
}
c.Flags().String(optionNameDataDir, "", "data directory")
c.Flags().String(optionNameVerbosity, "info", "verbosity level")
c.Flags().Bool(optionNameValidation, false, "run chunk validation checks before and after the compaction")
cmd.AddCommand(c)
}
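
With the command wired up above, a typical invocation looks like this (the data directory path is illustrative):

bee db compact --data-dir /var/lib/bee --validate

Passing --validate runs the chunk validation pass before and after compaction; it is slower, but it catches chunks that were corrupted or mislocated by the relocation.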

func dbExportCmd(cmd *cobra.Command) {
c := &cobra.Command{
Use: "export",
23 changes: 22 additions & 1 deletion pkg/sharky/recovery.go
@@ -31,7 +31,7 @@ func NewRecovery(dir string, shardCnt int, datasize int) (*Recovery, error) {
shardFiles := make([]*os.File, shardCnt)

for i := 0; i < shardCnt; i++ {
file, err := os.OpenFile(path.Join(dir, fmt.Sprintf("shard_%03d", i)), os.O_RDONLY, 0666)
file, err := os.OpenFile(path.Join(dir, fmt.Sprintf("shard_%03d", i)), os.O_RDWR, 0666)
if errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("index %d: %w", i, ErrShardNotFound)
}
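
(The shard files are now opened read-write rather than read-only: the Move and TruncateAt operations added below need to write to and truncate them.)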
@@ -86,6 +86,27 @@ func (r *Recovery) Read(ctx context.Context, loc Location, buf []byte) error {
return err
}

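// Move copies the chunk data stored at the from location into the to location,
// reading the source slot from its shard file and writing it to the destination slot.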
func (r *Recovery) Move(ctx context.Context, from Location, to Location) error {
r.mtx.Lock()
defer r.mtx.Unlock()

chData := make([]byte, from.Length)
_, err := r.shardFiles[from.Shard].ReadAt(chData, int64(from.Slot)*int64(r.datasize))
if err != nil {
return err
}

_, err = r.shardFiles[to.Shard].WriteAt(chData, int64(to.Slot)*int64(r.datasize))
return err
}

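// TruncateAt truncates the given shard's file at the slot boundary,
// discarding that slot and every slot after it.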
func (r *Recovery) TruncateAt(ctx context.Context, shard uint8, slot uint32) error {
r.mtx.Lock()
defer r.mtx.Unlock()

return r.shardFiles[shard].Truncate(int64(slot) * int64(r.datasize))
}

// Save saves all free slots files of the recovery (without closing).
func (r *Recovery) Save() error {
r.mtx.Lock()
Expand Down
192 changes: 192 additions & 0 deletions pkg/storer/compact.go
@@ -0,0 +1,192 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package storer

import (
"context"
"errors"
"fmt"
"path"
"sort"
"time"

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/sync/errgroup"
)

// Compact minimizes sharky disk usage by relocating chunks, using the current sharky
// locations from the storer, from the end of the used slots into the first available free slots.
func Compact(ctx context.Context, basePath string, opts *Options, validate bool) error {

logger := opts.Logger

store, err := initStore(basePath, opts)
if err != nil {
return fmt.Errorf("failed creating levelDB index store: %w", err)
}
defer func() {
if err := store.Close(); err != nil {
logger.Error(err, "failed closing store")
}
}()

sharkyRecover, err := sharky.NewRecovery(path.Join(basePath, sharkyPath), sharkyNoOfShards, swarm.SocMaxChunkSize)
if err != nil {
return err
}
defer func() {
Review comment (Member): should this defer not come before the NewRecovery call?

if err := sharkyRecover.Close(); err != nil {
logger.Error(err, "failed closing sharky recovery")
}
}()

if validate {
logger.Info("performing chunk validation before compaction")
validationWork(logger, store, sharkyRecover)
}

logger.Info("starting compaction")

n := time.Now()

for shard := 0; shard < sharkyNoOfShards; shard++ {

select {
case <-ctx.Done():
return errors.Join(ctx.Err(), sharkyRecover.Save())
default:
}

items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000)
// we deliberately choose to iterate the whole store again for each shard
// so that we do not store all the items in memory (for operators with huge localstores)
_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
if item.Location.Shard == uint8(shard) {
items = append(items, item)
}
return nil
})

sort.Slice(items, func(i, j int) bool {
Review comment (Contributor): You can use the new slices.SortFunc() instead.

return items[i].Location.Slot < items[j].Location.Slot
})
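
For reference, a minimal sketch of the suggested alternative, assuming Go 1.21+ where the standard library slices and cmp packages are available:

slices.SortFunc(items, func(a, b *chunkstore.RetrievalIndexItem) int {
	return cmp.Compare(a.Location.Slot, b.Location.Slot)
})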

if len(items) == 0 { // nothing stored in this shard, skip it
continue
}

lastUsedSlot := items[len(items)-1].Location.Slot
slots := make([]*chunkstore.RetrievalIndexItem, lastUsedSlot+1) // marks free and used slots
for _, l := range items {
slots[l.Location.Slot] = l
}

// start begins at the zero slot. The loop below will increment the position of start until a free slot is found.
// end points to the last slot, and the loop will decrement the position of end until a used slot is found.
// Once start and end point to free and used slots, respectively, the swap of the chunk location will occur.
start := uint32(0)
end := lastUsedSlot

for start < end {
if slots[start] == nil { // free
Review comment (Member): you need to make sure end is always set to a used slot, otherwise you end up with suboptimal truncation. Imagine:

0 1 2 3 4 5 6 7 8 9 ........
+ + + + + + + - - + - - - - -

After the first move at start = 7; end = 9, the loop terminates at start = 8; end = 8 instead of end = 7.

I suggest:

end := lastUsedSlot
for start := uint32(0); start < end; {
	if slots[end] == nil {
		end--
		continue
	}
	if slots[start] != nil {
		start++
		continue
	}
	MOVE
	start++
	end--
}

if slots[end] != nil { // used
from := slots[end]
to := sharky.Location{Slot: start, Length: from.Location.Length, Shard: from.Location.Shard}
if err := sharkyRecover.Move(context.Background(), from.Location, to); err != nil {
return fmt.Errorf("sharky move: %w", err)
}
if err := sharkyRecover.Add(to); err != nil {
return fmt.Errorf("sharky add: %w", err)
}

from.Location = to
if err := store.Put(from); err != nil {
return fmt.Errorf("store put: %w", err)
}

start++
end--
} else {
end-- // keep moving to the left until a used slot is found
}
} else {
start++ // keep moving to the right until a free slot is found
}
}

logger.Info("shard truncated", "shard", fmt.Sprintf("%d/%d", shard, sharkyNoOfShards-1), "slot", end)

if err := sharkyRecover.TruncateAt(context.Background(), uint8(shard), end+1); err != nil {
return fmt.Errorf("sharky truncate: %w", err)
}
}

if err := sharkyRecover.Save(); err != nil {
return fmt.Errorf("sharky save: %w", err)
}

logger.Info("compaction finished", "duration", time.Since(n))

if validate {
logger.Info("performing chunk validation after compaction")
validationWork(logger, store, sharkyRecover)
}

return nil
}
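
To make the pointer discipline from the review thread concrete, here is a hypothetical, self-contained rendering of the corrected walk over one shard, with slots reduced to booleans and the sharky relocation stubbed out by a caller-supplied move function. The trailing scan after the loop is an extra step, beyond the reviewer's snippet, so the returned boundary stays tight even when the loop exits on a free slot:

func compactShard(slots []bool, move func(from, to int)) int {
	start, end := 0, len(slots)-1
	for start < end {
		if !slots[end] { // walk end left until it rests on a used slot
			end--
			continue
		}
		if slots[start] { // walk start right until it rests on a free slot
			start++
			continue
		}
		move(end, start) // relocate the rightmost used slot into the leftmost free one
		slots[start], slots[end] = true, false
		start++
		end--
	}
	for end >= 0 && !slots[end] { // step back onto the last used slot
		end--
	}
	return end // the shard can be truncated at slot end+1
}

On the reviewer's example (slots 0-6 and 9 used, 7-8 free) this moves slot 9 into slot 7 and returns 7, so the shard is truncated at slot 8 rather than 9.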

func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recovery) {

n := time.Now()
defer func() {
logger.Info("validation finished", "duration", time.Since(n))
}()

iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem)

validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error {
err := sharky.Read(context.Background(), item.Location, buf)
if err != nil {
return err
}

ch := swarm.NewChunk(item.Address, buf)
if !cac.Valid(ch) && !soc.Valid(ch) {
return errors.New("invalid chunk")
}

return nil
}

eg := errgroup.Group{}
Review comment (Member): why use an errgroup if you don't care about errors? Use a WaitGroup instead.


for i := 0; i < 8; i++ {
eg.Go(func() error {
buf := make([]byte, swarm.SocMaxChunkSize)
for item := range iteratateItemsC {
if err := validChunk(item, buf[:item.Location.Length]); err != nil {
logger.Info("invalid chunk", "address", item.Address, "error", err)
}
}
return nil
})
}

count := 0
_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
Review comment (Member): if you're ignoring the errors, you should not use an errgroup but a WaitGroup.

iteratateItemsC <- item
count++
if count%100_000 == 0 {
logger.Info("..still validating chunks", "count", count)
}
return nil
})

close(iteratateItemsC)

_ = eg.Wait()
}
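
Following the reviewers' suggestion, a minimal sketch of the same worker pool built on a sync.WaitGroup instead of an errgroup, since the workers only log validation failures and never return an error (identifiers as in the function above, plus a sync import):

var wg sync.WaitGroup
for i := 0; i < 8; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		buf := make([]byte, swarm.SocMaxChunkSize)
		for item := range iteratateItemsC {
			if err := validChunk(item, buf[:item.Location.Length]); err != nil {
				logger.Info("invalid chunk", "address", item.Address, "error", err)
			}
		}
	}()
}
// feed iteratateItemsC exactly as above, then:
close(iteratateItemsC)
wg.Wait()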