feat: db compaction cmd (#4329)
istae authored Sep 23, 2023
1 parent d337770 commit 9ce6f3c
Showing 7 changed files with 402 additions and 3 deletions.
56 changes: 56 additions & 0 deletions cmd/bee/cmd/db.go
@@ -7,11 +7,13 @@ package cmd
import (
"archive/tar"
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"time"
@@ -24,6 +26,8 @@ import (
"github.com/spf13/cobra"
)

const optionNameValidation = "validate"

func (c *command) initDBCmd() {
cmd := &cobra.Command{
Use: "db",
@@ -34,6 +38,7 @@ func (c *command) initDBCmd() {
dbImportCmd(cmd)
dbNukeCmd(cmd)
dbInfoCmd(cmd)
dbCompactCmd(cmd)

c.root.AddCommand(cmd)
}
@@ -75,6 +80,8 @@ func dbInfoCmd(cmd *cobra.Command) {
}
defer db.Close()

logger.Info("getting db info", "path", dataDir)

info, err := db.DebugInfo(cmd.Context())
if err != nil {
return fmt.Errorf("fetching db info: %w", err)
@@ -95,6 +102,55 @@ func dbInfoCmd(cmd *cobra.Command) {
cmd.AddCommand(c)
}

func dbCompactCmd(cmd *cobra.Command) {
c := &cobra.Command{
Use: "compact",
Short: "Compacts the localstore sharky store.",
RunE: func(cmd *cobra.Command, args []string) (err error) {
v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
return fmt.Errorf("get verbosity: %w", err)
}
v = strings.ToLower(v)
logger, err := newLogger(cmd, v)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}

dataDir, err := cmd.Flags().GetString(optionNameDataDir)
if err != nil {
return fmt.Errorf("get data-dir: %w", err)
}
if dataDir == "" {
return errors.New("no data-dir provided")
}

validation, err := cmd.Flags().GetBool(optionNameValidation)
if err != nil {
return fmt.Errorf("get validation: %w", err)
}

localstorePath := path.Join(dataDir, "localstore")

err = storer.Compact(cmd.Context(), localstorePath, &storer.Options{
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
}, validation)
if err != nil {
return fmt.Errorf("localstore: %w", err)
}

return nil
},
}
c.Flags().String(optionNameDataDir, "", "data directory")
c.Flags().String(optionNameVerbosity, "info", "verbosity level")
c.Flags().Bool(optionNameValidation, false, "run chunk validation checks before and after the compaction")
cmd.AddCommand(c)
}
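
For reference, a typical invocation of the new command looks like this (the data directory path is illustrative):

bee db compact --data-dir /var/lib/bee --validate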

func dbExportCmd(cmd *cobra.Command) {
c := &cobra.Command{
Use: "export",
23 changes: 22 additions & 1 deletion pkg/sharky/recovery.go
@@ -31,7 +31,7 @@ func NewRecovery(dir string, shardCnt int, datasize int) (*Recovery, error) {
shardFiles := make([]*os.File, shardCnt)

for i := 0; i < shardCnt; i++ {
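// the shards are opened read-write so that Move and TruncateAt below can relocate and trim chunk data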
- file, err := os.OpenFile(path.Join(dir, fmt.Sprintf("shard_%03d", i)), os.O_RDONLY, 0666)
+ file, err := os.OpenFile(path.Join(dir, fmt.Sprintf("shard_%03d", i)), os.O_RDWR, 0666)
if errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("index %d: %w", i, ErrShardNotFound)
}
@@ -86,6 +86,27 @@ func (r *Recovery) Read(ctx context.Context, loc Location, buf []byte) error {
return err
}

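// Move copies the chunk data of the "from" location into the slot of the "to" location across the shard files.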
func (r *Recovery) Move(ctx context.Context, from Location, to Location) error {
r.mtx.Lock()
defer r.mtx.Unlock()

chData := make([]byte, from.Length)
_, err := r.shardFiles[from.Shard].ReadAt(chData, int64(from.Slot)*int64(r.datasize))
if err != nil {
return err
}

_, err = r.shardFiles[to.Shard].WriteAt(chData, int64(to.Slot)*int64(r.datasize))
return err
}

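// TruncateAt truncates the given shard's file at the slot boundary, discarding all data from that slot onwards.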
func (r *Recovery) TruncateAt(ctx context.Context, shard uint8, slot uint32) error {
r.mtx.Lock()
defer r.mtx.Unlock()

return r.shardFiles[shard].Truncate(int64(slot) * int64(r.datasize))
}

// Save saves all free slots files of the recovery (without closing).
func (r *Recovery) Save() error {
r.mtx.Lock()
192 changes: 192 additions & 0 deletions pkg/storer/compact.go
@@ -0,0 +1,192 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package storer

import (
"context"
"errors"
"fmt"
"path"
"sort"
"time"

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/sync/errgroup"
)

// Compact minimizes sharky disk usage by relocating chunks from the highest used slots
// to the first available free slots, using the current sharky locations from the storer.
func Compact(ctx context.Context, basePath string, opts *Options, validate bool) error {

logger := opts.Logger

store, err := initStore(basePath, opts)
if err != nil {
return fmt.Errorf("failed creating levelDB index store: %w", err)
}
defer func() {
if err := store.Close(); err != nil {
logger.Error(err, "failed closing store")
}
}()

sharkyRecover, err := sharky.NewRecovery(path.Join(basePath, sharkyPath), sharkyNoOfShards, swarm.SocMaxChunkSize)
if err != nil {
return err
}
defer func() {
if err := sharkyRecover.Close(); err != nil {
logger.Error(err, "failed closing sharky recovery")
}
}()

if validate {
logger.Info("performing chunk validation before compaction")
validationWork(logger, store, sharkyRecover)
}

logger.Info("starting compaction")

n := time.Now()

for shard := 0; shard < sharkyNoOfShards; shard++ {

select {
case <-ctx.Done():
return errors.Join(ctx.Err(), sharkyRecover.Save())
default:
}

items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000)
// we deliberately choose to iterate the whole store again for each shard
// so that we do not store all the items in memory (for operators with huge localstores)
_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
if item.Location.Shard == uint8(shard) {
items = append(items, item)
}
return nil
})

sort.Slice(items, func(i, j int) bool {
return items[i].Location.Slot < items[j].Location.Slot
})

if len(items) == 0 {
continue // nothing stored in this shard
}

lastUsedSlot := items[len(items)-1].Location.Slot
slots := make([]*chunkstore.RetrievalIndexItem, lastUsedSlot+1) // marks free and used slots
for _, l := range items {
slots[l.Location.Slot] = l
}

// start begins at the zero slot. The loop below will increment the position of start until a free slot is found.
// end points to the last slot, and the loop will decrement the position of end until a used slot is found.
// Once start and end point to free and used slots, respectively, the swap of the chunk location will occur.
start := uint32(0)
end := lastUsedSlot

for start < end {
if slots[start] == nil { // free
if slots[end] != nil { // used
from := slots[end]
to := sharky.Location{Slot: start, Length: from.Location.Length, Shard: from.Location.Shard}
if err := sharkyRecover.Move(ctx, from.Location, to); err != nil {
return fmt.Errorf("sharky move: %w", err)
}
if err := sharkyRecover.Add(to); err != nil {
return fmt.Errorf("sharky add: %w", err)
}

from.Location = to
if err := store.Put(from); err != nil {
return fmt.Errorf("store put: %w", err)
}

start++
end--
} else {
end-- // keep moving to the left until a used slot is found
}
} else {
start++ // keep moving to the right until a free slot is found
}
}

logger.Info("shard truncated", "shard", fmt.Sprintf("%d/%d", shard, sharkyNoOfShards-1), "slot", end)

if err := sharkyRecover.TruncateAt(context.Background(), uint8(shard), end+1); err != nil {
return fmt.Errorf("sharky truncate: %w", err)
}
}

if err := sharkyRecover.Save(); err != nil {
return fmt.Errorf("sharky save: %w", err)
}

logger.Info("compaction finished", "duration", time.Since(n))

if validate {
logger.Info("performing chunk validation after compaction")
validationWork(logger, store, sharkyRecover)
}

return nil
}
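
An editor's aside: the start/end sweep above can be distilled into a small self-contained sketch. compactSlots and its move callback are hypothetical names introduced for illustration, with a plain slice standing in for a shard's slots; the pointer movement and the returned truncation boundary mirror the loop in Compact above:

package main

import "fmt"

// compactSlots mirrors Compact's sweep: start walks right looking for free
// slots, end walks left looking for used slots, and each (free, used) pair
// triggers a move. It returns the boundary at which the shard can be truncated.
func compactSlots(slots []*int, move func(from, to int)) int {
	start, end := 0, len(slots)-1
	for start < end {
		if slots[start] == nil { // free
			if slots[end] != nil { // used
				move(end, start)
				slots[start], slots[end] = slots[end], nil
				start++
				end--
			} else {
				end-- // keep moving left until a used slot is found
			}
		} else {
			start++ // keep moving right until a free slot is found
		}
	}
	return end + 1
}

func main() {
	v := func(n int) *int { return &n }
	slots := []*int{v(1), nil, v(3), nil, v(5)}
	boundary := compactSlots(slots, func(from, to int) {
		fmt.Printf("move slot %d -> %d\n", from, to)
	})
	fmt.Println("truncate at slot", boundary) // prints: truncate at slot 4
}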

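// validationWork reads every chunk referenced by the retrieval index from
// sharky and logs any chunk that is neither a valid CAC nor a valid SOC chunk.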
func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recovery) {

n := time.Now()
defer func() {
logger.Info("validation finished", "duration", time.Since(n))
}()

iterateItemsC := make(chan *chunkstore.RetrievalIndexItem)

validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error {
err := sharky.Read(context.Background(), item.Location, buf)
if err != nil {
return err
}

ch := swarm.NewChunk(item.Address, buf)
if !cac.Valid(ch) && !soc.Valid(ch) {
return errors.New("invalid chunk")
}

return nil
}

eg := errgroup.Group{}

for i := 0; i < 8; i++ {
eg.Go(func() error {
buf := make([]byte, swarm.SocMaxChunkSize)
for item := range iterateItemsC {
if err := validChunk(item, buf[:item.Location.Length]); err != nil {
logger.Info("invalid chunk", "address", item.Address, "error", err)
}
}
return nil
})
}

count := 0
_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
iterateItemsC <- item
count++
if count%100_000 == 0 {
logger.Info("..still validating chunks", "count", count)
}
return nil
})

close(iterateItemsC)

_ = eg.Wait()
}