
Commit

fix: comments

istae committed May 8, 2024
1 parent 8f0f905 commit 709e2b7
Showing 21 changed files with 196 additions and 187 deletions.
1 change: 1 addition & 0 deletions cmd/bee/cmd/db.go
@@ -257,6 +257,7 @@ func dbRepairReserve(cmd *cobra.Command) {
logger.Warning("Repair will recreate the reserve entries based on the chunk availability in the chunkstore. The epoch time and bin IDs will be reset.")
logger.Warning("The pullsync peer sync intervals are reset so on the next run, the node will perform historical syncing.")
logger.Warning("This is a destructive process. If the process is stopped for any reason, the reserve may become corrupted.")
logger.Warning("To prevent permanent loss of data, data should be backed up before running the cmd.")
logger.Warning("You have another 10 seconds to change your mind and kill this process with CTRL-C...")
time.Sleep(10 * time.Second)
logger.Warning("proceeding with repair...")
2 changes: 1 addition & 1 deletion pkg/node/node.go
@@ -185,7 +185,7 @@ const (
minPaymentThreshold = 2 * refreshRate // minimal accepted payment threshold of full nodes
maxPaymentThreshold = 24 * refreshRate // maximal accepted payment threshold of full nodes
mainnetNetworkID = uint64(1) //
- ReserveCapacity = 4194304 // 2^22 chunks
+ ReserveCapacity = 4_194_304 // 2^22 chunks
reserveWakeUpDuration = 15 * time.Minute // time to wait before waking up reserveWorker
reserveTreshold = ReserveCapacity * 5 / 10
reserveMinimumRadius = 0
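The only change here is the digit separator, which Go treats as purely cosmetic. A minimal standalone sketch to sanity-check the "2^22 chunks" comment and what that capacity implies on disk; the 4 KiB chunk size is an assumption based on the canonical Swarm chunk, not something this hunk states:

package main

import "fmt"

func main() {
	const ReserveCapacity = 4_194_304 // same untyped constant as 4194304

	fmt.Println(ReserveCapacity == 1<<22) // true: the "2^22 chunks" comment checks out

	// Assuming 4 KiB (4096-byte) chunks, a full reserve holds 2^22 * 2^12 = 2^34 bytes.
	const assumedChunkSize = 4096
	fmt.Println(ReserveCapacity * assumedChunkSize >> 30) // 16 (GiB)
}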
6 changes: 5 additions & 1 deletion pkg/node/statestore.go
@@ -32,7 +32,11 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st
return nil, nil, err
}

- caching := cache.MustWrap(ldb, int(cacheCapacity))
+ caching, err := cache.Wrap(ldb, int(cacheCapacity))
+ if err != nil {
+ return nil, nil, err
+ }

stateStore, err := storeadapter.NewStateStorerAdapter(caching)

return stateStore, caching, err
2 changes: 1 addition & 1 deletion pkg/puller/puller.go
@@ -39,7 +39,7 @@ const (
IntervalPrefix = "sync_interval"
recalcPeersDur = time.Minute * 5

- maxChunksPerSecond = 1000
+ maxChunksPerSecond = 1000 // roughly 4 MB/s
)

type Options struct {
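The new comment compresses a small calculation: at the canonical 4096-byte Swarm chunk size (an assumption; the size does not appear in this hunk), 1000 chunks per second works out to roughly 4 MB/s. A minimal sketch of the arithmetic:

package main

import "fmt"

func main() {
	const maxChunksPerSecond = 1000
	const assumedChunkSize = 4096 // bytes per chunk, assumed

	// 1000 chunks/s * 4096 B/chunk = 4,096,000 B/s ≈ 4.1 MB/s (≈ 3.9 MiB/s).
	bps := maxChunksPerSecond * assumedChunkSize
	fmt.Printf("%d B/s = %.1f MB/s\n", bps, float64(bps)/1e6)
}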
9 changes: 0 additions & 9 deletions pkg/storage/cache/cache.go
@@ -38,15 +38,6 @@ func Wrap(store storage.IndexStore, capacity int) (*Cache, error) {
return &Cache{store, lru, newMetrics()}, nil
}

- // MustWrap is like Wrap but panics on error.
- func MustWrap(store storage.IndexStore, capacity int) *Cache {
- c, err := Wrap(store, capacity)
- if err != nil {
- panic(err)
- }
- return c
- }

// add caches given item.
func (c *Cache) add(i storage.Item) {
b, err := i.Marshal()
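This removal pairs with the statestore.go change above: the one construction-can-fail path now reports its error instead of panicking. As a design note, Must* wrappers are conventional Go for places where failure is a programming error (tests, package-level variable initialization), but in a startup path like InitStateStore a panic throws away context the caller could log or act on. A minimal sketch of the two conventions, using hypothetical stand-in types rather than the repo's own:

package main

import (
	"errors"
	"fmt"
)

type cache struct{ capacity int }

// wrap mirrors cache.Wrap: validate, and return an error the caller can act on.
func wrap(capacity int) (*cache, error) {
	if capacity <= 0 {
		return nil, errors.New("capacity must be positive")
	}
	return &cache{capacity}, nil
}

// mustWrap mirrors the removed helper: it converts the error into a panic,
// which is fine in tests but hostile in a long-lived node's startup path.
func mustWrap(capacity int) *cache {
	c, err := wrap(capacity)
	if err != nil {
		panic(err)
	}
	return c
}

func main() {
	if _, err := wrap(0); err != nil {
		fmt.Println("caller handles:", err) // what InitStateStore now does
	}
	fmt.Println(mustWrap(8).capacity) // 8
}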
17 changes: 11 additions & 6 deletions pkg/storer/internal/cache/cache.go
@@ -253,12 +253,17 @@ func (c *Cache) ShallowCopy(

defer func() {
if err != nil {
- _ = store.Run(context.Background(), func(s transaction.Store) error {
- for _, entry := range entries {
- err = errors.Join(s.ChunkStore().Delete(context.Background(), entry.Address))
- }
- return nil
- })
+ err = errors.Join(err,
+ store.Run(context.Background(), func(s transaction.Store) error {
+ for _, entry := range entries {
+ dErr := s.ChunkStore().Delete(context.Background(), entry.Address)
+ if dErr != nil {
+ return dErr
+ }
+ }
+ return nil
+ }),
+ )
}
}()

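The rewrite fixes two problems at once: the old loop reassigned err on every iteration (errors.Join with a single argument simply returns that argument, so earlier delete failures and the original err were lost), and the result of store.Run was discarded. The new code keeps the original failure and attaches any cleanup failure to it. The underlying idiom in isolation, as a minimal sketch (errors.Join needs Go 1.20+):

package main

import (
	"errors"
	"fmt"
)

func cleanup() error { return errors.New("cleanup failed") }

// operate shows the deferred-cleanup idiom used by ShallowCopy: when the
// operation fails, run best-effort cleanup and join its error onto the
// original error instead of overwriting it.
func operate() (err error) {
	defer func() {
		if err != nil {
			err = errors.Join(err, cleanup())
		}
	}()
	return errors.New("operation failed")
}

func main() {
	fmt.Println(operate()) // prints "operation failed" and "cleanup failed"
}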
112 changes: 106 additions & 6 deletions pkg/storer/internal/cache/cache_test.go
@@ -15,9 +15,10 @@ import (
"time"

storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
"github.com/ethersphere/bee/v2/pkg/storage/inmemstore"
"github.com/ethersphere/bee/v2/pkg/storage/storagetest"
chunktest "github.com/ethersphere/bee/v2/pkg/storage/testing"
"github.com/ethersphere/bee/v2/pkg/storer/internal"
"github.com/ethersphere/bee/v2/pkg/storer/internal/cache"
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"
"github.com/ethersphere/bee/v2/pkg/swarm"
@@ -87,11 +88,6 @@ func TestCacheEntryItem(t *testing.T) {
}
}

- func newTestStorage(t *testing.T) transaction.Storage {
- t.Helper()
- return internal.NewInmemStorage()
- }

type timeProvider struct {
t int64
mtx sync.Mutex
@@ -248,6 +244,59 @@ func TestCache(t *testing.T) {
verifyCacheState(t, st.IndexStore(), c, state.Head, state.Tail, state.Size)
}
})

+ t.Run("handle error", func(t *testing.T) {
+ t.Parallel()
+
+ st := newTestStorage(t)
+ c, err := cache.New(context.TODO(), st.IndexStore(), 10)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ chunks := chunktest.GenerateTestRandomChunks(5)
+
+ for _, ch := range chunks {
+ err := c.Putter(st).Put(context.TODO(), ch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+ // return error for state update, which occurs at the end of Get/Put operations
+ retErr := errors.New("dummy error")
+
+ st.indexStore.putFn = func(i storage.Item) error {
+ if i.Namespace() == "cacheOrderIndex" {
+ return retErr
+ }
+
+ return st.indexStore.IndexStore.Put(i)
+ }
+
+ // on error, the cache expects the overarching transaction to clean itself up
+ // and undo any store updates. So here we only want to ensure the state is
+ // reverted to the correct one.
+ t.Run("put error handling", func(t *testing.T) {
+ newChunk := chunktest.GenerateTestRandomChunk()
+ err := c.Putter(st).Put(context.TODO(), newChunk)
+ if !errors.Is(err, retErr) {
+ t.Fatalf("expected error %v during put, found %v", retErr, err)
+ }
+
+ // state should be preserved on failure
+ verifyCacheState(t, st.IndexStore(), c, chunks[0].Address(), chunks[4].Address(), 5)
+ })
+
+ t.Run("get error handling", func(t *testing.T) {
+ _, err := c.Getter(st).Get(context.TODO(), chunks[2].Address())
+ if !errors.Is(err, retErr) {
+ t.Fatalf("expected error %v during get, found %v", retErr, err)
+ }
+
+ // state should be preserved on failure
+ verifyCacheState(t, st.IndexStore(), c, chunks[0].Address(), chunks[4].Address(), 5)
+ })
+ })
})
}

@@ -529,3 +578,54 @@ func verifyChunksExist(
}
}
}

+ type inmemStorage struct {
+ indexStore *customIndexStore
+ chunkStore storage.ChunkStore
+ }
+
+ func newTestStorage(t *testing.T) *inmemStorage {
+ t.Helper()
+
+ ts := &inmemStorage{
+ indexStore: &customIndexStore{inmemstore.New(), nil},
+ chunkStore: inmemchunkstore.New(),
+ }
+
+ return ts
+ }
+
+ type customIndexStore struct {
+ storage.IndexStore
+ putFn func(storage.Item) error
+ }
+
+ func (s *customIndexStore) Put(i storage.Item) error {
+ if s.putFn != nil {
+ return s.putFn(i)
+ }
+ return s.IndexStore.Put(i)
+ }
+
+ func (t *inmemStorage) NewTransaction(ctx context.Context) (transaction.Transaction, func()) {
+ return &inmemTrx{t.indexStore, t.chunkStore}, func() {}
+ }
+
+ type inmemTrx struct {
+ indexStore storage.IndexStore
+ chunkStore storage.ChunkStore
+ }
+
+ func (t *inmemStorage) IndexStore() storage.Reader { return t.indexStore }
+ func (t *inmemStorage) ChunkStore() storage.ReadOnlyChunkStore { return t.chunkStore }
+
+ func (t *inmemTrx) IndexStore() storage.IndexStore { return t.indexStore }
+ func (t *inmemTrx) ChunkStore() storage.ChunkStore { return t.chunkStore }
+ func (t *inmemTrx) Commit() error { return nil }
+
+ func (t *inmemStorage) Close() error { return nil }
+ func (t *inmemStorage) Run(ctx context.Context, f func(s transaction.Store) error) error {
+ trx, done := t.NewTransaction(ctx)
+ defer done()
+ return f(trx)
+ }
8 changes: 6 additions & 2 deletions pkg/storer/internal/chunkstore/chunkstore.go
@@ -162,8 +162,10 @@ type IterateResult struct {
func IterateLocations(
ctx context.Context,
st storage.Reader,
- locationResultC chan<- LocationResult,
- ) {
+ ) <-chan LocationResult {
+
+ locationResultC := make(chan LocationResult)
+
go func() {
defer close(locationResultC)

@@ -190,6 +192,8 @@ func IterateLocations(
}
}
}()

+ return locationResultC
}

// Iterate iterates over the entire retrieval index with a callback.
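Returning the channel moves ownership to the producer: IterateLocations now creates, writes to, and closes the channel in its own goroutine, so callers can no longer pass a mis-sized buffer or leak a never-closed channel, and early termination remains the context's job. A hypothetical caller, sketched with only names visible in this diff (chunkstore lives under internal/, so this compiles only inside the module):

import (
	"context"

	storage "github.com/ethersphere/bee/v2/pkg/storage"
	"github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore"
)

// countLocations drains the iterator; the range loop ends when the producer
// goroutine closes the channel (all entries read, an error sent, or ctx done).
func countLocations(ctx context.Context, st storage.Reader) (int, error) {
	n := 0
	for res := range chunkstore.IterateLocations(ctx, st) {
		if res.Err != nil {
			return n, res.Err
		}
		n++
	}
	return n, nil
}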
6 changes: 2 additions & 4 deletions pkg/storer/internal/chunkstore/chunkstore_test.go
@@ -362,8 +362,7 @@ func TestIterateLocations(t *testing.T) {
}

readCount := 0
- respC := make(chan chunkstore.LocationResult, chunksCount)
- chunkstore.IterateLocations(ctx, st.IndexStore(), respC)
+ respC := chunkstore.IterateLocations(ctx, st.IndexStore())

for resp := range respC {
assert.NoError(t, resp.Err)
@@ -396,8 +395,7 @@ func TestIterateLocations_Stop(t *testing.T) {
}

readCount := 0
- respC := make(chan chunkstore.LocationResult)
- chunkstore.IterateLocations(ctx, st.IndexStore(), respC)
+ respC := chunkstore.IterateLocations(ctx, st.IndexStore())

for resp := range respC {
if resp.Err != nil {
24 changes: 12 additions & 12 deletions pkg/storer/internal/reserve/items.go
@@ -102,11 +102,11 @@ func (b *BatchRadiusItem) Unmarshal(buf []byte) error {
// ChunkBinItem allows for iterating on ranges of bin and binIDs for chunks.
// BinIDs come in handy when syncing the reserve contents with other peers.
type ChunkBinItem struct {
- Bin uint8
- BinID uint64
- Address swarm.Address
- BatchID []byte
- Type swarm.ChunkType
+ Bin uint8
+ BinID uint64
+ Address swarm.Address
+ BatchID []byte
+ ChunkType swarm.ChunkType
}

func (c *ChunkBinItem) Namespace() string {
@@ -133,11 +133,11 @@ func (c *ChunkBinItem) Clone() storage.Item {
return nil
}
return &ChunkBinItem{
- Bin: c.Bin,
- BinID: c.BinID,
- Address: c.Address.Clone(),
- BatchID: copyBytes(c.BatchID),
- Type: c.Type,
+ Bin: c.Bin,
+ BinID: c.BinID,
+ Address: c.Address.Clone(),
+ BatchID: copyBytes(c.BatchID),
+ ChunkType: c.ChunkType,
}
}

@@ -164,7 +164,7 @@ func (c *ChunkBinItem) Marshal() ([]byte, error) {
copy(buf[i:i+swarm.HashSize], c.BatchID)
i += swarm.HashSize

- buf[i] = uint8(c.Type)
+ buf[i] = uint8(c.ChunkType)

return buf, nil
}
@@ -188,7 +188,7 @@ func (c *ChunkBinItem) Unmarshal(buf []byte) error {
c.BatchID = copyBytes(buf[i : i+swarm.HashSize])
i += swarm.HashSize

- c.Type = swarm.ChunkType(buf[i])
+ c.ChunkType = swarm.ChunkType(buf[i])

return nil
}
10 changes: 5 additions & 5 deletions pkg/storer/internal/reserve/reserve.go
@@ -169,11 +169,11 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
}

err = s.IndexStore().Put(&ChunkBinItem{
- Bin: bin,
- BinID: binID,
- Address: chunk.Address(),
- BatchID: chunk.Stamp().BatchID(),
- Type: storage.ChunkType(chunk),
+ Bin: bin,
+ BinID: binID,
+ Address: chunk.Address(),
+ BatchID: chunk.Stamp().BatchID(),
+ ChunkType: storage.ChunkType(chunk),
})
if err != nil {
return err
6 changes: 3 additions & 3 deletions pkg/storer/internal/reserve/reserve_test.go
@@ -111,12 +111,12 @@ func TestReserveChunkType(t *testing.T) {
Factory: func() storage.Item { return &reserve.ChunkBinItem{} },
}, func(res storage.Result) (bool, error) {
item := res.Entry.(*reserve.ChunkBinItem)
- if item.Type == swarm.ChunkTypeContentAddressed {
+ if item.ChunkType == swarm.ChunkTypeContentAddressed {
storedChunksCA--
- } else if item.Type == swarm.ChunkTypeSingleOwner {
+ } else if item.ChunkType == swarm.ChunkTypeSingleOwner {
storedChunksSO--
} else {
- t.Fatalf("unexpected chunk type: %d", item.Type)
+ t.Fatalf("unexpected chunk type: %d", item.ChunkType)
}
return false, nil
})