From 9fcd900685171d880ec23c384e272c4a95f7aa82 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Wed, 31 Jan 2024 05:50:09 +0100 Subject: [PATCH] fix: upload store fixes --- pkg/storer/internal/upload/uploadstore.go | 173 ++++++++++-------- .../internal/upload/uploadstore_test.go | 40 ++++ 2 files changed, 139 insertions(+), 74 deletions(-) diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go index 1126770176a..ace90198c53 100644 --- a/pkg/storer/internal/upload/uploadstore.go +++ b/pkg/storer/internal/upload/uploadstore.go @@ -17,7 +17,6 @@ import ( "github.com/ethersphere/bee/pkg/storage/storageutil" "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/chunkstamp" - "github.com/ethersphere/bee/pkg/storer/internal/stampindex" "github.com/ethersphere/bee/pkg/swarm" ) @@ -220,7 +219,7 @@ var ( ) // uploadItemSize is the size of a marshaled uploadItem. -const uploadItemSize = 3 * 8 +const uploadItemSize = 3*8 + 2*swarm.HashSize var _ storage.Item = (*uploadItem)(nil) @@ -246,9 +245,6 @@ func (i uploadItem) Namespace() string { // Marshal implements the storage.Item interface. // If the Address is zero, an error is returned. func (i uploadItem) Marshal() ([]byte, error) { - // Address and BatchID are not part of the marshaled payload. But they are used - // in they key and hence are required. The Marshaling is done when item is to - // be stored, so we return errors for these cases. if i.Address.IsZero() { return nil, errUploadItemMarshalAddressIsZero } @@ -259,6 +255,8 @@ func (i uploadItem) Marshal() ([]byte, error) { binary.LittleEndian.PutUint64(buf, i.TagID) binary.LittleEndian.PutUint64(buf[8:], uint64(i.Uploaded)) binary.LittleEndian.PutUint64(buf[16:], uint64(i.Synced)) + copy(buf[24:], i.Address.Bytes()) + copy(buf[24+swarm.HashSize:24+2*swarm.HashSize], i.BatchID) return buf, nil } @@ -268,11 +266,12 @@ func (i *uploadItem) Unmarshal(bytes []byte) error { if len(bytes) != uploadItemSize { return errUploadItemUnmarshalInvalidSize } - // The Address and BatchID are required for the key, so it is assumed that - // they will be filled already. We reuse them during unmarshaling. + i.TagID = binary.LittleEndian.Uint64(bytes[:8]) i.Uploaded = int64(binary.LittleEndian.Uint64(bytes[8:16])) i.Synced = int64(binary.LittleEndian.Uint64(bytes[16:])) + i.Address = swarm.NewAddress(append(make([]byte, 0, swarm.HashSize), bytes[24:24+swarm.HashSize]...)) + i.BatchID = append(make([]byte, 0, swarm.HashSize), bytes[24+swarm.HashSize:24+2*swarm.HashSize]...) return nil } @@ -351,10 +350,6 @@ func (i dirtyTagItem) String() string { return storageutil.JoinFields(i.Namespace(), i.ID()) } -// stampIndexUploadNamespace represents the -// namespace name of the stamp index for upload. -const stampIndexUploadNamespace = "upload" - var ( // errPutterAlreadyClosed is returned when trying to Put a new chunk // after the putter has been closed. @@ -420,28 +415,6 @@ func (u *uploadPutter) Put(ctx context.Context, s internal.Storage, writer stora return nil } - switch item, loaded, err := stampindex.LoadOrStore( - s.IndexStore(), - writer, - stampIndexUploadNamespace, - chunk, - ); { - case err != nil: - return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) - case loaded && item.ChunkIsImmutable: - return errOverwriteOfImmutableBatch - case loaded && !item.ChunkIsImmutable: - prev := binary.BigEndian.Uint64(item.StampTimestamp) - curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) - if prev > curr { - return errOverwriteOfNewerBatch - } - err = stampindex.Store(writer, stampIndexUploadNamespace, chunk) - if err != nil { - return fmt.Errorf("failed updating stamp index: %w", err) - } - } - u.split++ if err := s.ChunkStore().Put(ctx, chunk); err != nil { @@ -499,6 +472,16 @@ func (u *uploadPutter) Close(s internal.Storage, writer storage.Writer, addr swa return fmt.Errorf("failed storing tag: %w", err) } + ctx := context.Background() + err = u.removePushItems(ctx, s) + if err != nil { + return fmt.Errorf("remove push items: %w", err) + } + err = u.removeUploadItems(ctx, s) + if err != nil { + return fmt.Errorf("remove upload items: %w", err) + } + err = writer.Delete(&dirtyTagItem{TagID: u.tagID}) if err != nil { return fmt.Errorf("failed deleting dirty tag: %w", err) @@ -509,62 +492,104 @@ func (u *uploadPutter) Close(s internal.Storage, writer storage.Writer, addr swa return nil } -func (u *uploadPutter) Cleanup(tx internal.TxExecutor) error { +func (u *uploadPutter) removeUploadItems(ctx context.Context, s internal.Storage) error { if u.closed { return nil } - itemsToDelete := make([]*pushItem, 0) - - err := tx.Execute(context.Background(), func(s internal.Storage) error { - di := &dirtyTagItem{TagID: u.tagID} - err := s.IndexStore().Get(di) - if err != nil { - return fmt.Errorf("failed reading dirty tag while cleaning up: %w", err) - } + itemsToDelete := make([]storage.Item, 0) + err := s.IndexStore().Iterate( + storage.Query{ + Factory: func() storage.Item { return &uploadItem{} }, + }, + func(res storage.Result) (bool, error) { + ui := res.Entry.(*uploadItem) + if ui.TagID == u.tagID { + itemsToDelete = append(itemsToDelete, ui) + } + return false, nil + }, + ) + if err != nil { + return fmt.Errorf("iterating upload items: %w", err) + } - return s.IndexStore().Iterate( - storage.Query{ - Factory: func() storage.Item { return &pushItem{} }, - PrefixAtStart: true, - Prefix: fmt.Sprintf("%d", di.Started), - }, - func(res storage.Result) (bool, error) { - pi := res.Entry.(*pushItem) - if pi.TagID == u.tagID { - itemsToDelete = append(itemsToDelete, pi) - } - return false, nil - }, - ) + err = u.batchApply(ctx, s, itemsToDelete, func(b storage.Batch, si storage.Item) { + _ = b.Delete(si) }) if err != nil { - return fmt.Errorf("failed iterating over push items: %w", err) + return fmt.Errorf("batch delete: %w", err) } + return nil +} - batchCnt := 1000 - for i := 0; i < len(itemsToDelete); i += batchCnt { - err = tx.Execute(context.Background(), func(st internal.Storage) error { +func (u *uploadPutter) removePushItems(ctx context.Context, s internal.Storage) error { + if u.closed { + return nil + } - b, err := st.IndexStore().Batch(context.Background()) - if err != nil { - return err - } + di := &dirtyTagItem{TagID: u.tagID} + err := s.IndexStore().Get(di) + if err != nil { + return fmt.Errorf("failed reading dirty tag while cleaning up: %w", err) + } - end := i + batchCnt - if end > len(itemsToDelete) { - end = len(itemsToDelete) - } - for _, pi := range itemsToDelete[i:end] { - _ = remove(st, b, pi.Address, pi.BatchID) - _ = b.Delete(pi) + itemsToDelete := make([]storage.Item, 0) + err = s.IndexStore().Iterate( + storage.Query{ + Factory: func() storage.Item { return &pushItem{} }, + PrefixAtStart: true, + Prefix: fmt.Sprintf("%d", di.Started), + }, + func(res storage.Result) (bool, error) { + pi := res.Entry.(*pushItem) + if pi.TagID == u.tagID { + itemsToDelete = append(itemsToDelete, pi) } - return b.Commit() - }) + return false, nil + }, + ) + if err != nil { + return fmt.Errorf("iterating push items: %w", err) + } + + return u.batchApply(ctx, s, itemsToDelete, func(b storage.Batch, si storage.Item) { + pi := si.(*pushItem) + _ = remove(s, b, pi.Address, pi.BatchID) + _ = b.Delete(si) + }) +} + +func (u *uploadPutter) batchApply(ctx context.Context, s internal.Storage, items []storage.Item, f func(b storage.Batch, si storage.Item)) error { + batchCnt := 1000 + for i := 0; i < len(items); i += batchCnt { + b, err := s.IndexStore().Batch(ctx) + if err != nil { + return fmt.Errorf("new batch: %w", err) + } + end := i + batchCnt + if end > len(items) { + end = len(items) + } + for _, ui := range items[i:end] { + f(b, ui) + } + err = b.Commit() if err != nil { - return fmt.Errorf("failed deleting push items: %w", err) + return fmt.Errorf("commit: %w", err) } } + return nil +} + +func (u *uploadPutter) Cleanup(tx internal.TxExecutor) error { + ctx := context.Background() + err := tx.Execute(ctx, func(s internal.Storage) error { + return u.removePushItems(ctx, s) + }) + if err != nil { + return err + } return tx.Execute(context.Background(), func(tx internal.Storage) error { return tx.IndexStore().Delete(&dirtyTagItem{TagID: u.tagID}) diff --git a/pkg/storer/internal/upload/uploadstore_test.go b/pkg/storer/internal/upload/uploadstore_test.go index 569b8c7999e..182e7188c1a 100644 --- a/pkg/storer/internal/upload/uploadstore_test.go +++ b/pkg/storer/internal/upload/uploadstore_test.go @@ -573,6 +573,46 @@ func TestChunkPutter(t *testing.T) { if diff := cmp.Diff(wantTI, ti); diff != "" { t.Fatalf("Get(...): unexpected TagItem (-want +have):\n%s", diff) } + + var uploadItemsCount int + err = ts.IndexStore().Iterate( + storage.Query{ + Factory: func() storage.Item { return &upload.UploadItem{} }, + }, + func(res storage.Result) (bool, error) { + pi := res.Entry.(*upload.UploadItem) + if pi.TagID == tag.TagID { + uploadItemsCount++ + } + return false, nil + }, + ) + if err != nil { + t.Fatalf("iterate upload items: %v", err) + } + if uploadItemsCount != 0 { + t.Fatalf("want %d upload items. got %d", 0, uploadItemsCount) + } + + var pushItemsCount int + err = ts.IndexStore().Iterate( + storage.Query{ + Factory: func() storage.Item { return &upload.PushItem{} }, + }, + func(res storage.Result) (bool, error) { + pi := res.Entry.(*upload.PushItem) + if pi.TagID == tag.TagID { + pushItemsCount++ + } + return false, nil + }, + ) + if err != nil { + t.Fatalf("iterate push items: %v", err) + } + if pushItemsCount != 0 { + t.Fatalf("want %d push items. got %d", 0, pushItemsCount) + } }) t.Run("error after close", func(t *testing.T) {