Commit 6ff67c1

fix: rebase

istae committed Feb 5, 2024
1 parent 536034b

Showing 8 changed files with 99 additions and 107 deletions.

2 changes: 1 addition & 1 deletion pkg/storer/debug.go

@@ -84,7 +84,7 @@ func (db *DB) DebugInfo(ctx context.Context) (Info, error) {
 		synced uint64
 	)
 	eg.Go(func() error {
-		return upload.IterateAllTagItems(db.repo.IndexStore(), func(ti *upload.TagItem) (bool, error) {
+		return upload.IterateAllTagItems(db.storage.ReadOnly().IndexStore(), func(ti *upload.TagItem) (bool, error) {
 			select {
 			case <-ctx.Done():
 				return true, ctx.Err()

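Note: reads in the debug path now go through the storage wrapper's read-only view instead of db.repo. A minimal sketch of a caller under that API — transaction.Storage, ReadOnly() and the upload helpers are taken from this commit, while the example package and countSyncedTags are illustrative only:

package example

import (
	"context"

	"github.com/ethersphere/bee/pkg/storer/internal/transaction"
	"github.com/ethersphere/bee/pkg/storer/internal/upload"
)

// countSyncedTags totals the Synced counter across all tags. Pure reads
// need no transaction: ReadOnly() hands back a view whose IndexStore()
// satisfies the storage.Reader that IterateAllTagItems now expects.
func countSyncedTags(ctx context.Context, st transaction.Storage) (uint64, error) {
	var synced uint64
	err := upload.IterateAllTagItems(st.ReadOnly().IndexStore(), func(ti *upload.TagItem) (bool, error) {
		select {
		case <-ctx.Done():
			return true, ctx.Err() // stop iterating once the context is cancelled
		default:
		}
		synced += ti.Synced
		return false, nil
	})
	return synced, err
}
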
36 changes: 12 additions & 24 deletions pkg/storer/internal/cache/cache.go

@@ -196,23 +196,24 @@ func (c *Cache) ShallowCopy(
 
 	entries := make([]*cacheEntry, 0, len(addrs))
 
-	entriesToAdd := make([]*cacheEntry, 0, len(addrs))
-
 	defer func() {
 		if err != nil {
-			for _, entry := range entries {
-				err = errors.Join(store.ChunkStore().Delete(context.Background(), entry.Address))
-			}
+			_ = store.Run(context.Background(), func(s transaction.Store) error {
+				for _, entry := range entries {
+					err = errors.Join(s.ChunkStore().Delete(context.Background(), entry.Address))
+				}
+				return nil
+			})
 		}
 	}()
 
 	for _, addr := range addrs {
 		entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()}
-		if has, err := store.IndexStore().Has(entry); err == nil && has {
+		if has, err := store.ReadOnly().IndexStore().Has(entry); err == nil && has {
 			// Since the caller has previously referenced the chunk (+1 refCnt), and if the chunk is already referenced
 			// by the cache store (+1 refCnt), then we must decrement the refCnt by one ( -1 refCnt to bring the total to +1).
 			// See https://github.com/ethersphere/bee/issues/4530.
-			_ = store.ChunkStore().Delete(ctx, addr)
+			_ = store.Run(ctx, func(s transaction.Store) error { return s.ChunkStore().Delete(ctx, addr) })
			continue
 		}
 		entries = append(entries, entry)
@@ -225,23 +226,13 @@ func (c *Cache) ShallowCopy(
 	//consider only the amount that can fit, the rest should be deleted from the chunkstore.
 	if len(entries) > c.capacity {
 		for _, addr := range entries[:len(entries)-c.capacity] {
-			_ = store.ChunkStore().Delete(ctx, addr.Address)
+			_ = store.Run(ctx, func(s transaction.Store) error { return s.ChunkStore().Delete(ctx, addr.Address) })
 		}
 		entries = entries[len(entries)-c.capacity:]
 	}
 
-	batch, err := store.IndexStore().Batch(ctx)
-	if err != nil {
-		return fmt.Errorf("failed creating batch: %w", err)
-	}
-
-	for _, entry := range entries {
-		err = batch.Put(entry)
-		if err != nil {
-			return fmt.Errorf("failed adding entry %s: %w", entry, err)
-		}
-
-	for _, entry := range entriesToAdd {
+	err = store.Run(ctx, func(s transaction.Store) error {
+		for _, entry := range entries {
+			err = s.IndexStore().Put(entry)
+			if err != nil {
+				return fmt.Errorf("failed adding entry %s: %w", entry, err)
+			}
@@ -254,15 +245,12 @@ func (c *Cache) ShallowCopy(
 				return fmt.Errorf("failed adding cache order index: %w", err)
 			}
 		}
 
 		return nil
 	})
 	if err == nil {
-		c.size.Add(int64(len(entriesToAdd)))
+		c.size.Add(int64(len(entries)))
 	}
 
-	c.size.Add(int64(len(entries)))
-
 	return nil
 }

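Note: every mutation in ShallowCopy is now funneled through store.Run, replacing the hand-rolled IndexStore().Batch() plumbing; the closure receives a transaction.Store, and its writes are presumably applied as a unit only when the closure returns nil. A minimal sketch of the pattern — deleteChunks and its package are illustrative, not part of the commit:

package example

import (
	"context"

	"github.com/ethersphere/bee/pkg/storer/internal/transaction"
	"github.com/ethersphere/bee/pkg/swarm"
)

// deleteChunks removes a set of chunks inside a single Run call. If the
// closure returns an error, the enclosed deletes are not committed.
func deleteChunks(ctx context.Context, store transaction.Storage, addrs ...swarm.Address) error {
	return store.Run(ctx, func(s transaction.Store) error {
		for _, addr := range addrs {
			// Delete decrements the chunk's reference count; the payload is
			// only dropped when the count reaches zero (see the refCnt
			// comment in ShallowCopy above).
			if err := s.ChunkStore().Delete(ctx, addr); err != nil {
				return err
			}
		}
		return nil
	})
}

Chunk deletes are reference-counted, which is why ShallowCopy deletes an already-cached chunk to rebalance its refCnt rather than to drop data.
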
15 changes: 9 additions & 6 deletions pkg/storer/internal/cache/cache_test.go

@@ -416,7 +416,7 @@ func TestShallowCopyAlreadyCached(t *testing.T) {
 	t.Parallel()
 
 	st := newTestStorage(t)
-	c, err := cache.New(context.Background(), st, 1000)
+	c, err := cache.New(context.Background(), st.ReadOnly().IndexStore(), 1000)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -426,7 +426,10 @@ func TestShallowCopyAlreadyCached(t *testing.T) {
 
 	for _, ch := range chunks {
 		// add the chunks to chunkstore. This simulates the reserve already populating the chunkstore with chunks.
-		err := st.ChunkStore().Put(context.Background(), ch)
+
+		err := st.Run(context.Background(), func(s transaction.Store) error {
+			return s.ChunkStore().Put(context.Background(), ch)
+		})
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -444,14 +447,14 @@ func TestShallowCopyAlreadyCached(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	verifyChunksExist(t, st.ChunkStore(), chunks...)
+	verifyChunksExist(t, st.ReadOnly().ChunkStore(), chunks...)
 
-	err = c.RemoveOldest(context.Background(), st, st.ChunkStore(), 10)
+	err = c.RemoveOldest(context.Background(), st, 10)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	verifyChunksDeleted(t, st.ChunkStore(), chunks...)
+	verifyChunksDeleted(t, st.ReadOnly().ChunkStore(), chunks...)
 }
@@ -525,7 +528,7 @@ func verifyChunksDeleted(
 
 func verifyChunksExist(
 	t *testing.T,
-	chStore storage.ChunkStore,
+	chStore storage.ReadOnlyChunkStore,
 	chs ...swarm.Chunk,
 ) {
 	t.Helper()

6 changes: 3 additions & 3 deletions pkg/storer/internal/upload/uploadstore.go

@@ -679,7 +679,7 @@ func Report(
 		return fmt.Errorf("failed deleting chunk %s: %w", chunk.Address(), err)
 	}
 
-	err = batch.Delete(ui)
+	err = indexStore.Delete(ui)
 	if err != nil {
 		return fmt.Errorf("failed deleting uploadItem %s: %w", ui, err)
 	}
@@ -815,7 +815,7 @@ func DeleteTag(st storage.Writer, tagID uint64) error {
 	return nil
 }
 
-func IterateAll(st storage.Store, iterateFn func(item storage.Item) (bool, error)) error {
+func IterateAll(st storage.Reader, iterateFn func(item storage.Item) (bool, error)) error {
 	return st.Iterate(
 		storage.Query{
 			Factory: func() storage.Item { return new(uploadItem) },
@@ -830,7 +830,7 @@ func IterateAll(st storage.Store, iterateFn func(item storage.Item) (bool, error
 	)
 }
 
-func IterateAllTagItems(st storage.Store, cb func(ti *TagItem) (bool, error)) error {
+func IterateAllTagItems(st storage.Reader, cb func(ti *TagItem) (bool, error)) error {
 	return st.Iterate(
 		storage.Query{
 			Factory: func() storage.Item {

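Note: IterateAll and IterateAllTagItems now accept storage.Reader rather than the writable storage.Store, so the read-only index store from the transactional wrapper can be passed straight in. A sketch of a caller under the new signature — the example package and countUploadItems are illustrative:

package example

import (
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/storer/internal/transaction"
	"github.com/ethersphere/bee/pkg/storer/internal/upload"
)

// countUploadItems scans the upload index without ever holding a writable
// store; the narrowed parameter documents that iteration only reads.
func countUploadItems(st transaction.Storage) (int, error) {
	count := 0
	err := upload.IterateAll(st.ReadOnly().IndexStore(), func(item storage.Item) (bool, error) {
		count++
		return false, nil // false = keep iterating
	})
	return count, err
}
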
10 changes: 5 additions & 5 deletions pkg/storer/internal/upload/uploadstore_test.go

@@ -527,7 +527,7 @@ func TestChunkPutter(t *testing.T) {
 
 	t.Run("iterate all", func(t *testing.T) {
 		count := 0
-		err := ts.IndexStore().Iterate(
+		err := ts.ReadOnly().IndexStore().Iterate(
 			storage.Query{
 				Factory: func() storage.Item { return new(upload.UploadItem) },
 			},
@@ -538,7 +538,7 @@
 				if synced {
 					t.Fatal("expected synced to be false")
 				}
-				has, err := ts.ChunkStore().Has(context.Background(), address)
+				has, err := ts.ReadOnly().ChunkStore().Has(context.Background(), address)
 				if err != nil {
 					t.Fatalf("unexpected error in Has(...): %v", err)
 				}
@@ -589,7 +589,7 @@
 
 	t.Run("iterate all tag items", func(t *testing.T) {
 		var tagItemsCount, uploaded, synced uint64
-		err := upload.IterateAllTagItems(ts.IndexStore(), func(ti *upload.TagItem) (bool, error) {
+		err := upload.IterateAllTagItems(ts.ReadOnly().IndexStore(), func(ti *upload.TagItem) (bool, error) {
 			uploaded += ti.Split
 			synced += ti.Synced
 			tagItemsCount++
@@ -764,7 +764,7 @@ func TestChunkReporter(t *testing.T) {
 			Address: chunk.Address(),
 			BatchID: chunk.Stamp().BatchID(),
 		}
-		has, err := ts.IndexStore().Has(ui)
+		has, err := ts.ReadOnly().IndexStore().Has(ui)
 		if err != nil {
 			t.Fatalf("unexpected error: %v", err)
 		}
@@ -777,7 +777,7 @@
 			Address: chunk.Address(),
 			BatchID: chunk.Stamp().BatchID(),
 		}
-		has, err = ts.IndexStore().Has(pi)
+		has, err = ts.ReadOnly().IndexStore().Has(pi)
 		if err != nil {
 			t.Fatalf("Has(...): unexpected error: %v", err)
 		}

4 changes: 1 addition & 3 deletions pkg/storer/migration/step_04_test.go

@@ -35,9 +35,7 @@ func Test_Step_04(t *testing.T) {
 	sharkyDir := t.TempDir()
 	sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize)
 	assert.NoError(t, err)
-
 	store := inmemstore.New()
-
 	storage := transaction.NewStorage(sharkyStore, store)
 
 	stepFn := localmigration.Step_04(sharkyDir, 1, storage)
@@ -58,7 +56,7 @@ func Test_Step_04(t *testing.T) {
 		assert.NoError(t, err)
 	}
 
-	err = sharkyStore.Close()
+	err = storage.Close()
 	assert.NoError(t, err)
 
 	assert.NoError(t, stepFn())

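Note: the test now closes the transactional wrapper rather than the sharky store it wraps, which implies the wrapper's Close() is responsible for the stores it owns. A generic sketch of that ownership rule — storageWrapper and its fields are illustrative stand-ins, not the commit's types:

package example

import (
	"errors"
	"io"
)

// storageWrapper composes the underlying stores, so it alone closes them;
// callers close only the wrapper, as the test above now does.
type storageWrapper struct {
	chunks io.Closer // e.g. the sharky store
	index  io.Closer // e.g. the index store
}

func (s *storageWrapper) Close() error {
	// errors.Join mirrors the error-aggregation style used elsewhere in this commit.
	return errors.Join(s.chunks.Close(), s.index.Close())
}
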
30 changes: 15 additions & 15 deletions pkg/storer/migration/step_05.go

@@ -5,24 +5,28 @@
 package migration
 
 import (
+	"context"
 	"fmt"
 	"os"
 
 	"github.com/ethersphere/bee/pkg/log"
 	"github.com/ethersphere/bee/pkg/storage"
+	"github.com/ethersphere/bee/pkg/storer/internal/transaction"
 	"github.com/ethersphere/bee/pkg/storer/internal/upload"
 )
 
 // step_05 is a migration step that removes all upload items from the store.
-func step_05(st storage.BatchedStore) error {
+func step_05(st transaction.Storage) error {
 	logger := log.NewLogger("migration-step-05", log.WithSink(os.Stdout))
 	logger.Info("start removing upload items")
 
 	itemC := make(chan storage.Item)
 	errC := make(chan error)
 	go func() {
 		for item := range itemC {
-			err := st.Delete(item)
+			err := st.Run(context.Background(), func(s transaction.Store) error {
+				return s.IndexStore().Delete(item)
+			})
 			if err != nil {
 				errC <- fmt.Errorf("delete upload item: %w", err)
 				return
@@ -31,23 +35,19 @@ func step_05(st storage.BatchedStore) error {
 		close(errC)
 	}()
 
-	go func() {
-		defer close(itemC)
-		err := upload.IterateAll(st, func(u storage.Item) (bool, error) {
-			itemC <- u
-			return false, nil
-		})
-		if err != nil {
-			errC <- fmt.Errorf("iterate upload items: %w", err)
-			return
-		}
-	}()
-
-	err := <-errC
+	err := upload.IterateAll(st.ReadOnly().IndexStore(), func(u storage.Item) (bool, error) {
+		select {
+		case itemC <- u:
+		case err := <-errC:
+			return true, err
+		}
+		return false, nil
+	})
+	close(itemC)
 	if err != nil {
 		return err
 	}
 
 	logger.Info("finished removing upload items")
-	return nil
+	return <-errC
 }

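Note: besides the storage.BatchedStore → transaction.Storage migration, this rewrite fixes the shape of the pipeline. Previously the iteration ran in a second goroutine that sent to itemC unconditionally, so a failed delete (the consumer exits after writing errC) left that goroutine blocked on itemC forever — a leak. Now iteration happens on the calling goroutine and each send selects against errC, so a consumer failure unblocks the producer immediately. A self-contained sketch of the pattern — drain, process, and the int items are illustrative stand-ins:

package example

import "fmt"

// drain mirrors the shape of step_05: a consumer goroutine processes items
// and reports the first failure on errC; the producer selects on errC while
// sending, so it cannot block on itemC after the consumer has given up.
func drain(items []int, process func(int) error) error {
	itemC := make(chan int)
	errC := make(chan error)

	go func() {
		for item := range itemC {
			if err := process(item); err != nil {
				errC <- fmt.Errorf("process item: %w", err)
				return
			}
		}
		close(errC) // clean completion: range ended because itemC was closed
	}()

	for _, it := range items {
		select {
		case itemC <- it:
		case err := <-errC:
			return err // consumer failed; stop producing
		}
	}
	close(itemC)
	return <-errC // nil after a clean close, or the first failure
}
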