Skip to content

Commit

Permalink
fix: uploadstore (#4434)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Oct 30, 2023
1 parent 6cac44c commit b0836ff
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 236 deletions.
5 changes: 0 additions & 5 deletions pkg/storage/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)

var (
// ErrNoStampsForChunk is returned when chunk was found but there were no associated stamps.
ErrNoStampsForChunk = fmt.Errorf("no stamp for existing chunk: %w", ErrNotFound)
)

// Getter is the interface that wraps the basic Get method.
type Getter interface {
// Get a chunk by its swarm.Address. Returns the chunk associated with
Expand Down
9 changes: 2 additions & 7 deletions pkg/storer/internal/chunkstamp/chunkstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func Load(s storage.Reader, namespace string, addr swarm.Address) (swarm.Stamp,
func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) {
var stamp swarm.Stamp

cnt := 0
found := false
err := s.Iterate(
storage.Query{
Expand All @@ -161,7 +160,6 @@ func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, bat
},
},
func(res storage.Result) (bool, error) {
cnt++
item := res.Entry.(*item)
if batchID == nil || bytes.Equal(batchID, item.stamp.BatchID()) {
stamp = item.stamp
Expand All @@ -174,9 +172,6 @@ func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, bat
if err != nil {
return nil, err
}
if cnt == 0 {
return nil, storage.ErrNoStampsForChunk
}
if !found {
return nil, fmt.Errorf("stamp not found for batchID %x: %w", batchID, storage.ErrNotFound)
}
Expand Down Expand Up @@ -234,15 +229,15 @@ func DeleteAll(s storage.Store, namespace string, addr swarm.Address) error {
}

// Delete removes a stamp associated with an chunk and batchID.
func Delete(s storage.Store, namespace string, addr swarm.Address, batchId []byte) error {
func Delete(s storage.Store, batch storage.Writer, namespace string, addr swarm.Address, batchId []byte) error {
stamp, err := LoadWithBatchID(s, namespace, addr, batchId)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil
}
return err
}
return s.Delete(&item{
return batch.Delete(&item{
namespace: []byte(namespace),
address: addr,
stamp: stamp,
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/internal/chunkstamp/chunkstamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestStoreLoadDelete(t *testing.T) {

t.Run("delete stored stamp", func(t *testing.T) {
if i%2 == 0 {
if err := chunkstamp.Delete(ts.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID()); err != nil {
if err := chunkstamp.Delete(ts.IndexStore(), ts.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID()); err != nil {
t.Fatalf("Delete(...): unexpected error: %v", err)
}
} else {
Expand Down
47 changes: 16 additions & 31 deletions pkg/storer/internal/upload/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"time"

"github.com/ethersphere/bee/pkg/encryption"
Expand Down Expand Up @@ -372,7 +371,6 @@ var (

type uploadPutter struct {
tagID uint64
mtx sync.Mutex
split uint64
seen uint64
closed bool
Expand Down Expand Up @@ -404,9 +402,6 @@ func NewPutter(s internal.Storage, tagID uint64) (internal.PutterCloserWithRefer
// - pushItem entry to make it available for PushSubscriber
// - add chunk to the chunkstore till it is synced
func (u *uploadPutter) Put(ctx context.Context, s internal.Storage, writer storage.Writer, chunk swarm.Chunk) error {
u.mtx.Lock()
defer u.mtx.Unlock()

if u.closed {
return errPutterAlreadyClosed
}
Expand Down Expand Up @@ -482,9 +477,6 @@ func (u *uploadPutter) Put(ctx context.Context, s internal.Storage, writer stora
// the tags. It will update the tag. This will be filled with the Split and Seen count
// by the Putter.
func (u *uploadPutter) Close(s internal.Storage, writer storage.Writer, addr swarm.Address) error {
u.mtx.Lock()
defer u.mtx.Unlock()

if u.closed {
return nil
}
Expand Down Expand Up @@ -518,9 +510,6 @@ func (u *uploadPutter) Close(s internal.Storage, writer storage.Writer, addr swa
}

func (u *uploadPutter) Cleanup(tx internal.TxExecutor) error {
u.mtx.Lock()
defer u.mtx.Unlock()

if u.closed {
return nil
}
Expand Down Expand Up @@ -645,19 +634,10 @@ func CleanupDirty(tx internal.TxExecutor) error {
return nil
}

type pushReporter struct {
s internal.Storage
}

// NewPushReporter returns a new storage.PushReporter which can be used by the
// pusher component to report chunk state information.
func NewPushReporter(s internal.Storage) storage.PushReporter {
return &pushReporter{s: s}
}

// Report is the implementation of the PushReporter interface.
func (p *pushReporter) Report(
func Report(
ctx context.Context,
s internal.Storage,
chunk swarm.Chunk,
state storage.ChunkState,
) error {
Expand All @@ -666,7 +646,7 @@ func (p *pushReporter) Report(
BatchID: chunk.Stamp().BatchID(),
}

err := p.s.IndexStore().Get(ui)
err := s.IndexStore().Get(ui)
if err != nil {
return fmt.Errorf("failed to read uploadItem %s: %w", ui, err)
}
Expand All @@ -675,7 +655,7 @@ func (p *pushReporter) Report(
TagID: ui.TagID,
}

err = p.s.IndexStore().Get(ti)
err = s.IndexStore().Get(ti)
if err != nil {
return fmt.Errorf("failed getting tag: %w", err)
}
Expand All @@ -693,13 +673,18 @@ func (p *pushReporter) Report(
break
}

err = p.s.IndexStore().Put(ti)
batch, err := s.IndexStore().Batch(ctx)
if err != nil {
return err
}

err = batch.Put(ti)
if err != nil {
return fmt.Errorf("failed updating tag: %w", err)
}

if state == storage.ChunkSent {
return nil
return batch.Commit()
}

// Once the chunk is stored/synced/failed to sync, it is deleted from the upload store as
Expand All @@ -711,28 +696,28 @@ func (p *pushReporter) Report(
BatchID: chunk.Stamp().BatchID(),
}

err = p.s.IndexStore().Delete(pi)
err = batch.Delete(pi)
if err != nil {
return fmt.Errorf("failed deleting pushItem %s: %w", pi, err)
}

err = chunkstamp.Delete(p.s.IndexStore(), chunkStampNamespace, pi.Address, pi.BatchID)
err = chunkstamp.Delete(s.IndexStore(), batch, chunkStampNamespace, pi.Address, pi.BatchID)
if err != nil {
return fmt.Errorf("failed deleting chunk stamp %x: %w", pi.BatchID, err)
}

err = p.s.ChunkStore().Delete(ctx, chunk.Address())
err = s.ChunkStore().Delete(ctx, chunk.Address())
if err != nil {
return fmt.Errorf("failed deleting chunk %s: %w", chunk.Address(), err)
}

ui.Synced = now().UnixNano()
err = p.s.IndexStore().Put(ui)
err = batch.Put(ui)
if err != nil {
return fmt.Errorf("failed updating uploadItem %s: %w", ui, err)
}

return nil
return batch.Commit()
}

var (
Expand Down
10 changes: 4 additions & 6 deletions pkg/storer/internal/upload/uploadstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,6 @@ func TestChunkReporter(t *testing.T) {
t.Fatalf("failed creating putter: %v", err)
}

reporter := upload.NewPushReporter(ts)

for idx, chunk := range chunktest.GenerateTestRandomChunks(10) {
t.Run(fmt.Sprintf("chunk %s", chunk.Address()), func(t *testing.T) {
err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk)
Expand All @@ -646,15 +644,15 @@ func TestChunkReporter(t *testing.T) {
}

t.Run("mark sent", func(t *testing.T) {
err := reporter.Report(context.Background(), chunk, storage.ChunkSent)
err := upload.Report(context.Background(), ts, chunk, storage.ChunkSent)
if err != nil {
t.Fatalf("Report(...): unexpected error: %v", err)
}
})

if idx < 4 {
t.Run("mark stored", func(t *testing.T) {
err := reporter.Report(context.Background(), chunk, storage.ChunkStored)
err := upload.Report(context.Background(), ts, chunk, storage.ChunkStored)
if err != nil {
t.Fatalf("Report(...): unexpected error: %v", err)
}
Expand All @@ -663,7 +661,7 @@ func TestChunkReporter(t *testing.T) {

if idx >= 4 && idx < 8 {
t.Run("mark synced", func(t *testing.T) {
err := reporter.Report(context.Background(), chunk, storage.ChunkSynced)
err := upload.Report(context.Background(), ts, chunk, storage.ChunkSynced)
if err != nil {
t.Fatalf("Report(...): unexpected error: %v", err)
}
Expand All @@ -672,7 +670,7 @@ func TestChunkReporter(t *testing.T) {

if idx >= 8 {
t.Run("mark could not sync", func(t *testing.T) {
err := reporter.Report(context.Background(), chunk, storage.ChunkCouldNotSync)
err := upload.Report(context.Background(), ts, chunk, storage.ChunkCouldNotSync)
if err != nil {
t.Fatalf("Report(...): unexpected error: %v", err)
}
Expand Down
33 changes: 0 additions & 33 deletions pkg/storer/reporter.go

This file was deleted.

Loading

0 comments on commit b0836ff

Please sign in to comment.