Skip to content

Commit

Permalink
fix: reset reserve only when neighborhood hopping (#4807)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Sep 11, 2024
1 parent 2d50d14 commit ab76c0b
Show file tree
Hide file tree
Showing 13 changed files with 339 additions and 262 deletions.
23 changes: 15 additions & 8 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package node
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -277,7 +278,7 @@ func NewBee(
}
}

var changedOverlay bool
var changedOverlay, resetReserve bool
if targetNeighborhood != "" {
neighborhood, err := swarm.ParseBitStrAddress(targetNeighborhood)
if err != nil {
Expand All @@ -293,21 +294,19 @@ func NewBee(
}

if nonceExists {
logger.Info("Override nonce %x to %x and clean state for neighborhood %s", nonce, newNonce, targetNeighborhood)
logger.Info("Override nonce and clean state for neighborhood", "old_none", hex.EncodeToString(nonce), "new_nonce", hex.EncodeToString(newNonce))
logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...")
time.Sleep(10 * time.Second)

dirsToNuke := []string{ioutil.DataPathLocalstore, ioutil.DataPathKademlia}
for _, dir := range dirsToNuke {
err := ioutil.RemoveContent(filepath.Join(o.DataDir, dir))
if err != nil {
return nil, fmt.Errorf("delete %s: %w", dir, err)
}
err := ioutil.RemoveContent(filepath.Join(o.DataDir, ioutil.DataPathKademlia))
if err != nil {
return nil, fmt.Errorf("delete %s: %w", ioutil.DataPathKademlia, err)
}

if err := stateStore.ClearForHopping(); err != nil {
return nil, fmt.Errorf("clearing stateStore %w", err)
}
resetReserve = true
}

swarmAddress = newSwarmAddress
Expand Down Expand Up @@ -736,6 +735,14 @@ func NewBee(
b.localstoreCloser = localStore
evictFn = func(id []byte) error { return localStore.EvictBatch(context.Background(), id) }

if resetReserve {
logger.Warning("resetting the reserve")
err := localStore.ResetReserve(ctx)
if err != nil {
return nil, fmt.Errorf("reset reserve: %w", err)
}
}

actLogic := accesscontrol.NewLogic(session)
accesscontrol := accesscontrol.NewController(actLogic)
b.accesscontrolCloser = accesscontrol
Expand Down
126 changes: 65 additions & 61 deletions pkg/storer/internal/chunkstamp/chunkstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
)

var (
// errMarshalInvalidChunkStampItemNamespace is returned during marshaling if the namespace is not set.
errMarshalInvalidChunkStampItemNamespace = errors.New("marshal chunkstamp.Item: invalid namespace")
// errMarshalInvalidChunkStampItemScope is returned during marshaling if the scope is not set.
errMarshalInvalidChunkStampItemScope = errors.New("marshal chunkstamp.Item: invalid scope")
// errMarshalInvalidChunkStampAddress is returned during marshaling if the address is zero.
errMarshalInvalidChunkStampItemAddress = errors.New("marshal chunkstamp.item: invalid address")
// errUnmarshalInvalidChunkStampAddress is returned during unmarshaling if the address is not set.
Expand All @@ -29,55 +29,59 @@ var (
errUnmarshalInvalidChunkStampItemSize = errors.New("unmarshal chunkstamp.item: invalid size")
)

var _ storage.Item = (*item)(nil)
var _ storage.Item = (*Item)(nil)

// item is the index used to represent a stamp for a chunk.
// Item is the index used to represent a stamp for a chunk.
//
// Going ahead we will support multiple stamps on chunks. This item will allow
// Going ahead we will support multiple stamps on chunks. This Item will allow
// mapping multiple stamps to a single address. For this reason, the address is
// part of the Namespace and can be used to iterate on all the stamps for this
// address.
type item struct {
namespace []byte // The namespace of other related item.
address swarm.Address
stamp swarm.Stamp
type Item struct {
scope []byte // The scope of other related item.
address swarm.Address
stamp swarm.Stamp
}

// ID implements the storage.Item interface.
func (i *item) ID() string {
func (i *Item) ID() string {
return storageutil.JoinFields(string(i.stamp.BatchID()), string(i.stamp.Index()))
}

// Namespace implements the storage.Item interface.
func (i *item) Namespace() string {
return storageutil.JoinFields("chunkStamp", string(i.namespace), i.address.ByteString())
func (i *Item) Namespace() string {
return storageutil.JoinFields("chunkStamp", string(i.scope), i.address.ByteString())
}

func (i *Item) SetScope(ns []byte) {
i.scope = ns
}

// Marshal implements the storage.Item interface.
// address is not part of the payload which is stored, as address is part of the
// prefix, hence already known before querying this object. This will be reused
// during unmarshaling.
func (i *item) Marshal() ([]byte, error) {
func (i *Item) Marshal() ([]byte, error) {
// The address is not part of the payload, but it is used to create the
// namespace so it is better if we check that the address is correctly
// scope so it is better if we check that the address is correctly
// set here before it is stored in the underlying storage.

switch {
case len(i.namespace) == 0:
return nil, errMarshalInvalidChunkStampItemNamespace
case len(i.scope) == 0:
return nil, errMarshalInvalidChunkStampItemScope
case i.address.IsZero():
return nil, errMarshalInvalidChunkStampItemAddress
case i.stamp == nil:
return nil, errMarshalInvalidChunkStampItemStamp
}

buf := make([]byte, 8+len(i.namespace)+postage.StampSize)
buf := make([]byte, 8+len(i.scope)+postage.StampSize)

l := 0
binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.namespace)))
binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.scope)))
l += 8
copy(buf[l:l+len(i.namespace)], i.namespace)
l += len(i.namespace)
copy(buf[l:l+len(i.scope)], i.scope)
l += len(i.scope)
data, err := i.stamp.MarshalBinary()
if err != nil {
return nil, fmt.Errorf("unable to marshal chunkstamp.item: %w", err)
Expand All @@ -88,7 +92,7 @@ func (i *item) Marshal() ([]byte, error) {
}

// Unmarshal implements the storage.Item interface.
func (i *item) Unmarshal(bytes []byte) error {
func (i *Item) Unmarshal(bytes []byte) error {
if len(bytes) < 8 {
return errUnmarshalInvalidChunkStampItemSize
}
Expand All @@ -102,9 +106,9 @@ func (i *item) Unmarshal(bytes []byte) error {
return errUnmarshalInvalidChunkStampItemAddress
}

ni := &item{address: i.address.Clone()}
ni := &Item{address: i.address.Clone()}
l := 8
ni.namespace = append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...)
ni.scope = append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...)
l += nsLen
stamp := new(postage.Stamp)
if err := stamp.UnmarshalBinary(bytes[l : l+postage.StampSize]); err != nil {
Expand All @@ -119,13 +123,13 @@ func (i *item) Unmarshal(bytes []byte) error {
}

// Clone implements the storage.Item interface.
func (i *item) Clone() storage.Item {
func (i *Item) Clone() storage.Item {
if i == nil {
return nil
}
clone := &item{
namespace: append([]byte(nil), i.namespace...),
address: i.address.Clone(),
clone := &Item{
scope: append([]byte(nil), i.scope...),
address: i.address.Clone(),
}
if i.stamp != nil {
clone.stamp = i.stamp.Clone()
Expand All @@ -134,31 +138,31 @@ func (i *item) Clone() storage.Item {
}

// String implements the storage.Item interface.
func (i item) String() string {
func (i Item) String() string {
return storageutil.JoinFields(i.Namespace(), i.ID())
}

// Load returns first found swarm.Stamp related to the given address.
func Load(s storage.Reader, namespace string, addr swarm.Address) (swarm.Stamp, error) {
return LoadWithBatchID(s, namespace, addr, nil)
func Load(s storage.Reader, scope string, addr swarm.Address) (swarm.Stamp, error) {
return LoadWithBatchID(s, scope, addr, nil)
}

// LoadWithBatchID returns swarm.Stamp related to the given address and batchID.
func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) {
func LoadWithBatchID(s storage.Reader, scope string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) {
var stamp swarm.Stamp

found := false
err := s.Iterate(
storage.Query{
Factory: func() storage.Item {
return &item{
namespace: []byte(namespace),
address: addr,
return &Item{
scope: []byte(scope),
address: addr,
}
},
},
func(res storage.Result) (bool, error) {
item := res.Entry.(*item)
item := res.Entry.(*Item)
if batchID == nil || bytes.Equal(batchID, item.stamp.BatchID()) {
stamp = item.stamp
found = true
Expand All @@ -178,12 +182,12 @@ func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, bat
}

// Store creates new or updated an existing stamp index
// record related to the given namespace and chunk.
func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error {
item := &item{
namespace: []byte(namespace),
address: chunk.Address(),
stamp: chunk.Stamp(),
// record related to the given scope and chunk.
func Store(s storage.IndexStore, scope string, chunk swarm.Chunk) error {
item := &Item{
scope: []byte(scope),
address: chunk.Address(),
stamp: chunk.Stamp(),
}
if err := s.Put(item); err != nil {
return fmt.Errorf("unable to put chunkstamp.item %s: %w", item, err)
Expand All @@ -192,19 +196,19 @@ func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error {
}

// DeleteAll removes all swarm.Stamp related to the given address.
func DeleteAll(s storage.IndexStore, namespace string, addr swarm.Address) error {
func DeleteAll(s storage.IndexStore, scope string, addr swarm.Address) error {
var stamps []swarm.Stamp
err := s.Iterate(
storage.Query{
Factory: func() storage.Item {
return &item{
namespace: []byte(namespace),
address: addr,
return &Item{
scope: []byte(scope),
address: addr,
}
},
},
func(res storage.Result) (bool, error) {
stamps = append(stamps, res.Entry.(*item).stamp)
stamps = append(stamps, res.Entry.(*Item).stamp)
return false, nil
},
)
Expand All @@ -216,41 +220,41 @@ func DeleteAll(s storage.IndexStore, namespace string, addr swarm.Address) error
for _, stamp := range stamps {
errs = errors.Join(
errs,
s.Delete(&item{
namespace: []byte(namespace),
address: addr,
stamp: stamp,
s.Delete(&Item{
scope: []byte(scope),
address: addr,
stamp: stamp,
}),
)
}
return errs
}

// Delete removes a stamp associated with an chunk and batchID.
func Delete(s storage.IndexStore, namespace string, addr swarm.Address, batchId []byte) error {
stamp, err := LoadWithBatchID(s, namespace, addr, batchId)
func Delete(s storage.IndexStore, scope string, addr swarm.Address, batchId []byte) error {
stamp, err := LoadWithBatchID(s, scope, addr, batchId)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil
}
return err
}
return s.Delete(&item{
namespace: []byte(namespace),
address: addr,
stamp: stamp,
return s.Delete(&Item{
scope: []byte(scope),
address: addr,
stamp: stamp,
})
}

func DeleteWithStamp(
writer storage.Writer,
namespace string,
scope string,
addr swarm.Address,
stamp swarm.Stamp,
) error {
return writer.Delete(&item{
namespace: []byte(namespace),
address: addr,
stamp: stamp,
return writer.Delete(&Item{
scope: []byte(scope),
address: addr,
stamp: stamp,
})
}
6 changes: 2 additions & 4 deletions pkg/storer/internal/chunkstamp/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ package chunkstamp

import "github.com/ethersphere/bee/v2/pkg/swarm"

type Item = item

func (i *Item) WithNamespace(ns string) *Item {
i.namespace = []byte(ns)
i.scope = []byte(ns)
return i
}

Expand All @@ -24,7 +22,7 @@ func (i *Item) WithStamp(stamp swarm.Stamp) *Item {
}

var (
ErrMarshalInvalidChunkStampItemNamespace = errMarshalInvalidChunkStampItemNamespace
ErrMarshalInvalidChunkStampItemNamespace = errMarshalInvalidChunkStampItemScope
ErrMarshalInvalidChunkStampItemAddress = errMarshalInvalidChunkStampItemAddress
ErrUnmarshalInvalidChunkStampItemAddress = errUnmarshalInvalidChunkStampItemAddress
ErrMarshalInvalidChunkStampItemStamp = errMarshalInvalidChunkStampItemStamp
Expand Down
Loading

0 comments on commit ab76c0b

Please sign in to comment.