feat(compact): log retrieval item data of invalid chunks #4354

Merged 5 commits on Sep 29, 2023

2 changes: 1 addition & 1 deletion cmd/bee/cmd/db.go
@@ -131,7 +131,7 @@ func dbCompactCmd(cmd *cobra.Command) {
}

logger.Warning("Compaction is a destructive process. If the process is stopped for any reason, the localstore may become corrupted.")
logger.Warning("It is highly advised to perform the compaction on a copy of the localtore.")
logger.Warning("It is highly advised to perform the compaction on a copy of the localstore.")
logger.Warning("After compaction finishes, the data directory may be replaced with the compacted version.")
logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...")
time.Sleep(10 * time.Second)
6 changes: 3 additions & 3 deletions pkg/cac/cac.go
@@ -57,7 +57,7 @@ func validateDataLength(dataLength int) error {

// newWithSpan creates a new chunk prepending the given span to the data.
func newWithSpan(data, span []byte) (swarm.Chunk, error) {
- hash, err := doHash(data, span)
+ hash, err := DoHash(data, span)
if err != nil {
return nil, err
}
@@ -77,12 +77,12 @@ func Valid(c swarm.Chunk) bool {
return false
}

- hash, _ := doHash(data[swarm.SpanSize:], data[:swarm.SpanSize])
+ hash, _ := DoHash(data[swarm.SpanSize:], data[:swarm.SpanSize])

return bytes.Equal(hash, c.Address().Bytes())
}

- func doHash(data, span []byte) ([]byte, error) {
+ func DoHash(data, span []byte) ([]byte, error) {
hasher := bmtpool.Get()
defer bmtpool.Put(hasher)

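The only behavioral change in this file is visibility: the unexported doHash becomes DoHash, so the compaction validator in pkg/storer can recompute a chunk's content address from raw slot data. A minimal sketch of that round trip, assuming the bee module's cac and swarm packages are importable as shown (names match the diff above):

```go
package main

import (
	"fmt"

	"github.com/ethersphere/bee/pkg/cac"
	"github.com/ethersphere/bee/pkg/swarm"
)

func main() {
	// Build a valid content-addressed chunk, then recompute its
	// address from the span-prefixed payload via the exported DoHash.
	ch, err := cac.New([]byte("hello swarm"))
	if err != nil {
		panic(err)
	}

	data := ch.Data() // 8-byte little-endian span followed by the payload
	h, err := cac.DoHash(data[swarm.SpanSize:], data[:swarm.SpanSize])
	if err != nil {
		panic(err)
	}

	// The recomputed BMT hash equals the chunk's address, so Valid holds.
	fmt.Println(swarm.NewAddress(h).Equal(ch.Address())) // true
}
```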
9 changes: 1 addition & 8 deletions pkg/sharky/recovery.go
@@ -75,14 +75,7 @@ func (r *Recovery) Add(loc Location) error {
func (r *Recovery) Read(ctx context.Context, loc Location, buf []byte) error {
r.mtx.Lock()
defer r.mtx.Unlock()

- shFile := r.shardFiles[loc.Shard]
- if stat, err := shFile.Stat(); err != nil {
- return err
- } else if stat.Size() < int64(loc.Slot)*int64(r.datasize) {
- return errors.New("slot not found")
- }
- _, err := shFile.ReadAt(buf, int64(loc.Slot)*int64(r.datasize))
+ _, err := r.shardFiles[loc.Shard].ReadAt(buf, int64(loc.Slot)*int64(r.datasize))
return err
}

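The dropped Stat guard was redundant: os.File.ReadAt already returns io.EOF (with a short read count) when the requested range lies past the end of the file, so an out-of-range slot still surfaces as an error to the caller. A standalone illustration of that contract:

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "sharky-demo")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())

	// The file now holds 100 bytes.
	if _, err := f.Write(make([]byte, 100)); err != nil {
		panic(err)
	}

	buf := make([]byte, 50)
	_, err = f.ReadAt(buf, 200)         // offset beyond the end of the file
	fmt.Println(errors.Is(err, io.EOF)) // true: the caller still gets an error
}
```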
5 changes: 5 additions & 0 deletions pkg/sharky/shard.go
@@ -7,6 +7,7 @@ package sharky
import (
"context"
"encoding/binary"
"fmt"
"io"
)

@@ -20,6 +21,10 @@ type Location struct {
Length uint16
}

+ func (l Location) String() string {
+ return fmt.Sprintf("shard: %d, slot: %d, length: %d", l.Shard, l.Slot, l.Length)
+ }

// MarshalBinary returns byte representation of location
func (l *Location) MarshalBinary() ([]byte, error) {
b := make([]byte, LocationSize)
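Because the receiver is a value, any Location now satisfies fmt.Stringer, which is what lets the compaction code below pass a location straight to the structured logger. For example:

```go
package main

import (
	"fmt"

	"github.com/ethersphere/bee/pkg/sharky"
)

func main() {
	loc := sharky.Location{Shard: 2, Slot: 17, Length: 4096}
	fmt.Println(loc) // shard: 2, slot: 17, length: 4096
}
```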
55 changes: 42 additions & 13 deletions pkg/storer/compact.go
@@ -91,15 +91,20 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)
start := uint32(0)
end := lastUsedSlot

+ batch, err := store.Batch(ctx)
+ if err != nil {
+ return err
+ }

for start < end {

- if slots[end] == nil {
- end-- // walk to the left until a used slot found
+ if slots[start] != nil {
+ start++ // walk to the right until a free slot is found
continue
}

- if slots[start] != nil {
- start++ // walk to the right until a free slot is found
+ if slots[end] == nil {
+ end-- // walk to the left until a used slot found
continue
}

Expand All @@ -113,14 +118,18 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)
}

from.Location = to
- if err := store.Put(from); err != nil {
+ if err := batch.Put(from); err != nil {
return fmt.Errorf("store put: %w", err)
}

start++
end--
}

+ if err := batch.Commit(); err != nil {
+ return err
+ }

logger.Info("shard truncated", "shard", fmt.Sprintf("%d/%d", shard, sharkyNoOfShards-1), "slot", end)

if err := sharkyRecover.TruncateAt(context.Background(), uint8(shard), end+1); err != nil {
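Two things change in this loop. First, the pointer checks are reordered so each iteration advances exactly one pointer before attempting a move: start walks right to the next free slot, end walks left to the next used slot, and the used entry is relocated into the free slot until the pointers meet. Second, the index updates are staged in a batch and committed once per shard instead of issuing one Put per moved chunk. A self-contained sketch of the two-pointer invariant over a plain slice (hypothetical helper names, not bee APIs):

```go
package main

import "fmt"

func ptr(s string) *string { return &s }

func main() {
	// nil marks a free slot; non-nil marks a used one.
	slots := []*string{ptr("a"), nil, ptr("b"), nil, ptr("c")}

	start, end := 0, len(slots)-1
	for start < end {
		if slots[start] != nil {
			start++ // walk to the right until a free slot is found
			continue
		}
		if slots[end] == nil {
			end-- // walk to the left until a used slot is found
			continue
		}
		// Relocate the rightmost used entry into the leftmost free slot.
		slots[start], slots[end] = slots[end], nil
		start++
		end--
	}

	// Used slots are now packed at the front, so the tail can be
	// truncated after the last used slot, as TruncateAt does above.
	for i, s := range slots {
		if s != nil {
			fmt.Printf("slot %d: %s\n", i, *s)
		}
	}
}
```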
@@ -151,18 +160,40 @@ func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recov

iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem)

- validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error {
+ validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) {
err := sharky.Read(context.Background(), item.Location, buf)
if err != nil {
- return err
+ logger.Warning("invalid chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0), "location", item.Location, "error", err)
+ return
}

ch := swarm.NewChunk(item.Address, buf)
if !cac.Valid(ch) && !soc.Valid(ch) {
- return errors.New("invalid chunk")
- }

- return nil
+ logger.Info("invalid cac/soc chunk ", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0))
+
+ h, err := cac.DoHash(buf[swarm.SpanSize:], buf[:swarm.SpanSize])
+ if err != nil {
+ logger.Error(err, "cac hash")
+ return
+ }
+
+ computedAddr := swarm.NewAddress(h)
+
+ if !cac.Valid(swarm.NewChunk(computedAddr, buf)) {
+ logger.Info("computed chunk is also an invalid cac")
+ return
+ }
+
+ shardedEntry := chunkstore.RetrievalIndexItem{Address: computedAddr}
+ err = store.Get(&shardedEntry)
+ if err != nil {
+ logger.Info("no shared entry found")
+ return
+ }
+
+ logger.Info("retrieved chunk with shared slot", "shared_address", shardedEntry.Address, "shared_timestamp", time.Unix(int64(shardedEntry.Timestamp), 0))
+ }
}

var wg sync.WaitGroup
@@ -173,9 +204,7 @@ func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recov
defer wg.Done()
buf := make([]byte, swarm.SocMaxChunkSize)
for item := range iteratateItemsC {
- if err := validChunk(item, buf[:item.Location.Length]); err != nil {
- logger.Info("invalid chunk", "address", item.Address, "error", err)
- }
+ validChunk(item, buf[:item.Location.Length])
}
}()
}
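validChunk no longer returns an error for the caller to log; it logs the full retrieval-index data itself (address, timestamp, sharky location) and, when the payload hashes to a different but valid content address already present in the index, reports the slot as shared between two entries. The items are fanned out to a pool of worker goroutines, each reusing a single SocMaxChunkSize buffer. A minimal sketch of that fan-out pattern (hypothetical item type and buffer size, not the bee types):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	items := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < runtime.NumCPU(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buf := make([]byte, 4096) // one reusable buffer per worker
			for item := range items {
				// A real validator would slice buf to the item's length
				// and check the chunk, as validChunk does above.
				fmt.Println("validated item", item, "buf cap", cap(buf))
			}
		}()
	}

	for i := 0; i < 8; i++ {
		items <- i
	}
	close(items) // lets the workers' range loops terminate
	wg.Wait()
}
```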