Skip to content

Commit

Permalink
fix: upload store fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Jan 31, 2024
1 parent cffd1cc commit 9fcd900
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 74 deletions.
173 changes: 99 additions & 74 deletions pkg/storer/internal/upload/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/ethersphere/bee/pkg/storage/storageutil"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstamp"
"github.com/ethersphere/bee/pkg/storer/internal/stampindex"
"github.com/ethersphere/bee/pkg/swarm"
)

Expand Down Expand Up @@ -220,7 +219,7 @@ var (
)

// uploadItemSize is the size of a marshaled uploadItem.
const uploadItemSize = 3 * 8
const uploadItemSize = 3*8 + 2*swarm.HashSize

var _ storage.Item = (*uploadItem)(nil)

Expand All @@ -246,9 +245,6 @@ func (i uploadItem) Namespace() string {
// Marshal implements the storage.Item interface.
// If the Address is zero, an error is returned.
func (i uploadItem) Marshal() ([]byte, error) {
// Address and BatchID are not part of the marshaled payload. But they are used
// in they key and hence are required. The Marshaling is done when item is to
// be stored, so we return errors for these cases.
if i.Address.IsZero() {
return nil, errUploadItemMarshalAddressIsZero
}
Expand All @@ -259,6 +255,8 @@ func (i uploadItem) Marshal() ([]byte, error) {
binary.LittleEndian.PutUint64(buf, i.TagID)
binary.LittleEndian.PutUint64(buf[8:], uint64(i.Uploaded))
binary.LittleEndian.PutUint64(buf[16:], uint64(i.Synced))
copy(buf[24:], i.Address.Bytes())
copy(buf[24+swarm.HashSize:24+2*swarm.HashSize], i.BatchID)
return buf, nil
}

Expand All @@ -268,11 +266,12 @@ func (i *uploadItem) Unmarshal(bytes []byte) error {
if len(bytes) != uploadItemSize {
return errUploadItemUnmarshalInvalidSize
}
// The Address and BatchID are required for the key, so it is assumed that
// they will be filled already. We reuse them during unmarshaling.

i.TagID = binary.LittleEndian.Uint64(bytes[:8])
i.Uploaded = int64(binary.LittleEndian.Uint64(bytes[8:16]))
i.Synced = int64(binary.LittleEndian.Uint64(bytes[16:]))
i.Address = swarm.NewAddress(append(make([]byte, 0, swarm.HashSize), bytes[24:24+swarm.HashSize]...))
i.BatchID = append(make([]byte, 0, swarm.HashSize), bytes[24+swarm.HashSize:24+2*swarm.HashSize]...)
return nil
}

Expand Down Expand Up @@ -351,10 +350,6 @@ func (i dirtyTagItem) String() string {
return storageutil.JoinFields(i.Namespace(), i.ID())
}

// stampIndexUploadNamespace represents the
// namespace name of the stamp index for upload.
const stampIndexUploadNamespace = "upload"

var (
// errPutterAlreadyClosed is returned when trying to Put a new chunk
// after the putter has been closed.
Expand Down Expand Up @@ -420,28 +415,6 @@ func (u *uploadPutter) Put(ctx context.Context, s internal.Storage, writer stora
return nil
}

switch item, loaded, err := stampindex.LoadOrStore(
s.IndexStore(),
writer,
stampIndexUploadNamespace,
chunk,
); {
case err != nil:
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
case loaded && item.ChunkIsImmutable:
return errOverwriteOfImmutableBatch
case loaded && !item.ChunkIsImmutable:
prev := binary.BigEndian.Uint64(item.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev > curr {
return errOverwriteOfNewerBatch
}
err = stampindex.Store(writer, stampIndexUploadNamespace, chunk)
if err != nil {
return fmt.Errorf("failed updating stamp index: %w", err)
}
}

u.split++

if err := s.ChunkStore().Put(ctx, chunk); err != nil {
Expand Down Expand Up @@ -499,6 +472,16 @@ func (u *uploadPutter) Close(s internal.Storage, writer storage.Writer, addr swa
return fmt.Errorf("failed storing tag: %w", err)
}

ctx := context.Background()
err = u.removePushItems(ctx, s)
if err != nil {
return fmt.Errorf("remove push items: %w", err)
}
err = u.removeUploadItems(ctx, s)
if err != nil {
return fmt.Errorf("remove upload items: %w", err)
}

err = writer.Delete(&dirtyTagItem{TagID: u.tagID})
if err != nil {
return fmt.Errorf("failed deleting dirty tag: %w", err)
Expand All @@ -509,62 +492,104 @@ func (u *uploadPutter) Close(s internal.Storage, writer storage.Writer, addr swa
return nil
}

func (u *uploadPutter) Cleanup(tx internal.TxExecutor) error {
func (u *uploadPutter) removeUploadItems(ctx context.Context, s internal.Storage) error {
if u.closed {
return nil
}

itemsToDelete := make([]*pushItem, 0)

err := tx.Execute(context.Background(), func(s internal.Storage) error {
di := &dirtyTagItem{TagID: u.tagID}
err := s.IndexStore().Get(di)
if err != nil {
return fmt.Errorf("failed reading dirty tag while cleaning up: %w", err)
}
itemsToDelete := make([]storage.Item, 0)
err := s.IndexStore().Iterate(
storage.Query{
Factory: func() storage.Item { return &uploadItem{} },
},
func(res storage.Result) (bool, error) {
ui := res.Entry.(*uploadItem)
if ui.TagID == u.tagID {
itemsToDelete = append(itemsToDelete, ui)
}
return false, nil
},
)
if err != nil {
return fmt.Errorf("iterating upload items: %w", err)
}

return s.IndexStore().Iterate(
storage.Query{
Factory: func() storage.Item { return &pushItem{} },
PrefixAtStart: true,
Prefix: fmt.Sprintf("%d", di.Started),
},
func(res storage.Result) (bool, error) {
pi := res.Entry.(*pushItem)
if pi.TagID == u.tagID {
itemsToDelete = append(itemsToDelete, pi)
}
return false, nil
},
)
err = u.batchApply(ctx, s, itemsToDelete, func(b storage.Batch, si storage.Item) {
_ = b.Delete(si)
})
if err != nil {
return fmt.Errorf("failed iterating over push items: %w", err)
return fmt.Errorf("batch delete: %w", err)
}
return nil
}

batchCnt := 1000
for i := 0; i < len(itemsToDelete); i += batchCnt {
err = tx.Execute(context.Background(), func(st internal.Storage) error {
func (u *uploadPutter) removePushItems(ctx context.Context, s internal.Storage) error {
if u.closed {
return nil
}

b, err := st.IndexStore().Batch(context.Background())
if err != nil {
return err
}
di := &dirtyTagItem{TagID: u.tagID}
err := s.IndexStore().Get(di)
if err != nil {
return fmt.Errorf("failed reading dirty tag while cleaning up: %w", err)
}

end := i + batchCnt
if end > len(itemsToDelete) {
end = len(itemsToDelete)
}
for _, pi := range itemsToDelete[i:end] {
_ = remove(st, b, pi.Address, pi.BatchID)
_ = b.Delete(pi)
itemsToDelete := make([]storage.Item, 0)
err = s.IndexStore().Iterate(
storage.Query{
Factory: func() storage.Item { return &pushItem{} },
PrefixAtStart: true,
Prefix: fmt.Sprintf("%d", di.Started),
},
func(res storage.Result) (bool, error) {
pi := res.Entry.(*pushItem)
if pi.TagID == u.tagID {
itemsToDelete = append(itemsToDelete, pi)
}
return b.Commit()
})
return false, nil
},
)
if err != nil {
return fmt.Errorf("iterating push items: %w", err)
}

return u.batchApply(ctx, s, itemsToDelete, func(b storage.Batch, si storage.Item) {
pi := si.(*pushItem)
_ = remove(s, b, pi.Address, pi.BatchID)
_ = b.Delete(si)
})
}

// batchApply applies f to every item, grouping the work into index-store
// batches of at most 1000 items and committing each batch before opening the
// next. f receives the open batch and one item per call; any error from
// creating or committing a batch aborts the remaining work.
func (u *uploadPutter) batchApply(ctx context.Context, s internal.Storage, items []storage.Item, f func(b storage.Batch, si storage.Item)) error {
	const chunkSize = 1000

	for start := 0; start < len(items); start += chunkSize {
		batch, err := s.IndexStore().Batch(ctx)
		if err != nil {
			return fmt.Errorf("new batch: %w", err)
		}

		stop := start + chunkSize
		if stop > len(items) {
			stop = len(items)
		}
		for _, item := range items[start:stop] {
			f(batch, item)
		}

		if err := batch.Commit(); err != nil {
			return fmt.Errorf("commit: %w", err)
		}
	}
	return nil
}

func (u *uploadPutter) Cleanup(tx internal.TxExecutor) error {
ctx := context.Background()
err := tx.Execute(ctx, func(s internal.Storage) error {
return u.removePushItems(ctx, s)
})
if err != nil {
return err
}

return tx.Execute(context.Background(), func(tx internal.Storage) error {
return tx.IndexStore().Delete(&dirtyTagItem{TagID: u.tagID})
Expand Down
40 changes: 40 additions & 0 deletions pkg/storer/internal/upload/uploadstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,46 @@ func TestChunkPutter(t *testing.T) {
if diff := cmp.Diff(wantTI, ti); diff != "" {
t.Fatalf("Get(...): unexpected TagItem (-want +have):\n%s", diff)
}

var uploadItemsCount int
err = ts.IndexStore().Iterate(
storage.Query{
Factory: func() storage.Item { return &upload.UploadItem{} },
},
func(res storage.Result) (bool, error) {
pi := res.Entry.(*upload.UploadItem)
if pi.TagID == tag.TagID {
uploadItemsCount++
}
return false, nil
},
)
if err != nil {
t.Fatalf("iterate upload items: %v", err)
}
if uploadItemsCount != 0 {
t.Fatalf("want %d upload items. got %d", 0, uploadItemsCount)
}

var pushItemsCount int
err = ts.IndexStore().Iterate(
storage.Query{
Factory: func() storage.Item { return &upload.PushItem{} },
},
func(res storage.Result) (bool, error) {
pi := res.Entry.(*upload.PushItem)
if pi.TagID == tag.TagID {
pushItemsCount++
}
return false, nil
},
)
if err != nil {
t.Fatalf("iterate push items: %v", err)
}
if pushItemsCount != 0 {
t.Fatalf("want %d push items. got %d", 0, pushItemsCount)
}
})

t.Run("error after close", func(t *testing.T) {
Expand Down

0 comments on commit 9fcd900

Please sign in to comment.