diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go
index e5ae3ae0635..ffb17771172 100644
--- a/cmd/bee/cmd/db.go
+++ b/cmd/bee/cmd/db.go
@@ -131,7 +131,7 @@ func dbCompactCmd(cmd *cobra.Command) {
 			}
 
 			logger.Warning("Compaction is a destructive process. If the process is stopped for any reason, the localstore may become corrupted.")
-			logger.Warning("It is highly advised to perform the compaction on a copy of the localtore.")
+			logger.Warning("It is highly advised to perform the compaction on a copy of the localstore.")
 			logger.Warning("After compaction finishes, the data directory may be replaced with the compacted version.")
 			logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...")
 			time.Sleep(10 * time.Second)
diff --git a/pkg/cac/cac.go b/pkg/cac/cac.go
index 9e1f7ef321b..44af58d573b 100644
--- a/pkg/cac/cac.go
+++ b/pkg/cac/cac.go
@@ -57,7 +57,7 @@ func validateDataLength(dataLength int) error {
 
 // newWithSpan creates a new chunk prepending the given span to the data.
 func newWithSpan(data, span []byte) (swarm.Chunk, error) {
-	hash, err := doHash(data, span)
+	hash, err := DoHash(data, span)
 	if err != nil {
 		return nil, err
 	}
@@ -77,12 +77,12 @@ func Valid(c swarm.Chunk) bool {
 		return false
 	}
 
-	hash, _ := doHash(data[swarm.SpanSize:], data[:swarm.SpanSize])
+	hash, _ := DoHash(data[swarm.SpanSize:], data[:swarm.SpanSize])
 	return bytes.Equal(hash, c.Address().Bytes())
 }
 
-func doHash(data, span []byte) ([]byte, error) {
+func DoHash(data, span []byte) ([]byte, error) {
 	hasher := bmtpool.Get()
 	defer bmtpool.Put(hasher)
 
diff --git a/pkg/sharky/recovery.go b/pkg/sharky/recovery.go
index 5a223e4c233..2b19ee93935 100644
--- a/pkg/sharky/recovery.go
+++ b/pkg/sharky/recovery.go
@@ -75,14 +75,7 @@ func (r *Recovery) Add(loc Location) error {
 func (r *Recovery) Read(ctx context.Context, loc Location, buf []byte) error {
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
-
-	shFile := r.shardFiles[loc.Shard]
-	if stat, err := shFile.Stat(); err != nil {
-		return err
-	} else if stat.Size() < int64(loc.Slot)*int64(r.datasize) {
-		return errors.New("slot not found")
-	}
-	_, err := shFile.ReadAt(buf, int64(loc.Slot)*int64(r.datasize))
+	_, err := r.shardFiles[loc.Shard].ReadAt(buf, int64(loc.Slot)*int64(r.datasize))
 	return err
 }
 
diff --git a/pkg/sharky/shard.go b/pkg/sharky/shard.go
index e0b0909cc34..edb2faa8fee 100644
--- a/pkg/sharky/shard.go
+++ b/pkg/sharky/shard.go
@@ -7,6 +7,7 @@ package sharky
 import (
 	"context"
 	"encoding/binary"
+	"fmt"
 	"io"
 )
 
@@ -20,6 +21,10 @@ type Location struct {
 	Length uint16
 }
 
+func (l Location) String() string {
+	return fmt.Sprintf("shard: %d, slot: %d, length: %d", l.Shard, l.Slot, l.Length)
+}
+
 // MarshalBinary returns byte representation of location
 func (l *Location) MarshalBinary() ([]byte, error) {
 	b := make([]byte, LocationSize)
diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go
index 4a21ed71c76..c9a2846c21b 100644
--- a/pkg/storer/compact.go
+++ b/pkg/storer/compact.go
@@ -91,15 +91,20 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)
 		start := uint32(0)
 		end := lastUsedSlot
 
+		batch, err := store.Batch(ctx)
+		if err != nil {
+			return err
+		}
+
 		for start < end {
 
-			if slots[end] == nil {
-				end-- // walk to the left until a used slot found
+			if slots[start] != nil {
+				start++ // walk to the right until a free slot is found
 				continue
 			}
 
-			if slots[start] != nil {
-				start++ // walk to the right until a free slot is found
+			if slots[end] == nil {
+				end-- // walk to the left until a used slot found
 				continue
 			}
 
@@ -113,7 +118,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)
 			}
 
 			from.Location = to
-			if err := store.Put(from); err != nil {
+			if err := batch.Put(from); err != nil {
 				return fmt.Errorf("store put: %w", err)
 			}
 
@@ -121,6 +126,10 @@
 			end--
 		}
 
+		if err := batch.Commit(); err != nil {
+			return err
+		}
+
 		logger.Info("shard truncated", "shard", fmt.Sprintf("%d/%d", shard, sharkyNoOfShards-1), "slot", end)
 
 		if err := sharkyRecover.TruncateAt(context.Background(), uint8(shard), end+1); err != nil {
@@ -151,18 +160,40 @@ func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recov
 
 	iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem)
 
-	validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error {
+	validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) {
 		err := sharky.Read(context.Background(), item.Location, buf)
 		if err != nil {
-			return err
+			logger.Warning("invalid chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0), "location", item.Location, "error", err)
+			return
 		}
 
 		ch := swarm.NewChunk(item.Address, buf)
 		if !cac.Valid(ch) && !soc.Valid(ch) {
-			return errors.New("invalid chunk")
-		}
-		return nil
+			logger.Info("invalid cac/soc chunk ", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0))
+
+			h, err := cac.DoHash(buf[swarm.SpanSize:], buf[:swarm.SpanSize])
+			if err != nil {
+				logger.Error(err, "cac hash")
+				return
+			}
+
+			computedAddr := swarm.NewAddress(h)
+
+			if !cac.Valid(swarm.NewChunk(computedAddr, buf)) {
+				logger.Info("computed chunk is also an invalid cac")
+				return
+			}
+
+			shardedEntry := chunkstore.RetrievalIndexItem{Address: computedAddr}
+			err = store.Get(&shardedEntry)
+			if err != nil {
+				logger.Info("no shared entry found")
+				return
+			}
+
+			logger.Info("retrieved chunk with shared slot", "shared_address", shardedEntry.Address, "shared_timestamp", time.Unix(int64(shardedEntry.Timestamp), 0))
+		}
 	}
 
 	var wg sync.WaitGroup
 
@@ -173,9 +204,7 @@ func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recov
 			defer wg.Done()
 			buf := make([]byte, swarm.SocMaxChunkSize)
 			for item := range iteratateItemsC {
-				if err := validChunk(item, buf[:item.Location.Length]); err != nil {
-					logger.Info("invalid chunk", "address", item.Address, "error", err)
-				}
+				validChunk(item, buf[:item.Location.Length])
 			}
 		}()
 	}
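
Note on the pkg/storer/compact.go hunks: the rewritten loop is a two-pointer compaction walk over a shard's slots. start moves right past used slots, end moves left past free ones, and whenever the pointers rest on a free/used pair, the chunk at end is relocated to start. Below is a minimal standalone sketch of that walk; the slots slice, the move bookkeeping, and main are illustrative stand-ins for the storer's internals, not the actual types.

package main

import "fmt"

// compactWalk packs used slots toward the front of a shard using the same
// two-pointer walk as the patched loop. slots[i] == nil marks a free slot.
// It returns the (from, to) moves performed, mirroring the from/to
// relocation done via sharky in the real code.
func compactWalk(slots [][]byte, lastUsedSlot uint32) (moves [][2]uint32) {
	start, end := uint32(0), lastUsedSlot
	for start < end {
		if slots[start] != nil {
			start++ // walk to the right until a free slot is found
			continue
		}
		if slots[end] == nil {
			end-- // walk to the left until a used slot is found
			continue
		}
		// slots[start] is free and slots[end] is used: move the chunk left.
		moves = append(moves, [2]uint32{end, start})
		slots[start], slots[end] = slots[end], nil
		start++
		end--
	}
	return moves
}

func main() {
	slots := [][]byte{[]byte("a"), nil, []byte("b"), nil, []byte("c")}
	fmt.Println(compactWalk(slots, 4)) // [[4 1]]: "c" moves from slot 4 to slot 1
}

Swapping the order of the two checks (the patch now tests slots[start] before slots[end]) does not change the walk's result; the substantive change in these hunks is that index updates go through batch.Put and land in a single batch.Commit per shard before the shard file is truncated, instead of one store.Put per moved chunk.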