From c0e292223960fe638282c8d613830a2cee62ace7 Mon Sep 17 00:00:00 2001 From: Acha Bill <57879913+acha-bill@users.noreply.github.com> Date: Wed, 13 Nov 2024 03:53:39 -0500 Subject: [PATCH] feat: chunkstore replace unit test & soc pinning (#4902) --- openapi/Swarm.yaml | 2 - pkg/api/accesscontrol_test.go | 15 +++- pkg/api/api_test.go | 10 +++ pkg/api/soc.go | 39 +++------- pkg/api/soc_test.go | 12 ++- .../internal/chunkstore/chunkstore_test.go | 74 +++++++++++++++++++ pkg/storer/mock/mockstorer.go | 8 +- 7 files changed, 123 insertions(+), 37 deletions(-) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index c2a08d6796d..5aca29f9e22 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -869,8 +869,6 @@ paths: schema: $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" required: true - - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter" - required: false - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" diff --git a/pkg/api/accesscontrol_test.go b/pkg/api/accesscontrol_test.go index 509a2cea1b8..c00318c1afb 100644 --- a/pkg/api/accesscontrol_test.go +++ b/pkg/api/accesscontrol_test.go @@ -100,6 +100,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) { resp struct { Reference swarm.Address `json:"reference"` } + direct bool }{ { name: "bzz", @@ -159,6 +160,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) { data: bytes.NewReader(sch.WrappedChunk.Data()), expdata: sch.Chunk().Data(), contenttype: "binary/octet-stream", + direct: true, }, } @@ -183,13 +185,24 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) { upTestOpts = append(upTestOpts, jsonhttptest.WithRequestHeader(api.SwarmCollectionHeader, "True")) } t.Run(v.name, func(t *testing.T) { - client, _, _, _ := newTestServer(t, testServerOptions{ + client, _, _, chanStore := newTestServer(t, testServerOptions{ Storer: storerMock, Logger: logger, Post: mockpost.New(mockpost.WithAcceptAll()), PublicKey: pk.PublicKey, AccessControl: mockac.New(), + DirectUpload: v.direct, }) + + if chanStore != nil { + chanStore.Subscribe(func(chunk swarm.Chunk) { + err := storerMock.Put(context.Background(), chunk) + if err != nil { + t.Fatal(err) + } + }) + } + header := jsonhttptest.Request(t, client, http.MethodPost, v.upurl, http.StatusCreated, upTestOpts..., ) diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index bb9ec9ad621..63f366615f3 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -633,6 +633,7 @@ type chanStorer struct { lock sync.Mutex chunks map[string]struct{} quit chan struct{} + subs []func(chunk swarm.Chunk) } func newChanStore(cc <-chan *pusher.Op) *chanStorer { @@ -650,6 +651,9 @@ func (c *chanStorer) drain(cc <-chan *pusher.Op) { case op := <-cc: c.lock.Lock() c.chunks[op.Chunk.Address().ByteString()] = struct{}{} + for _, h := range c.subs { + h(op.Chunk) + } c.lock.Unlock() op.Err <- nil case <-c.quit: @@ -670,6 +674,12 @@ func (c *chanStorer) Has(addr swarm.Address) bool { return ok } +func (c *chanStorer) Subscribe(f func(chunk swarm.Chunk)) { + c.lock.Lock() + defer c.lock.Unlock() + c.subs = append(c.subs, f) +} + func createRedistributionAgentService( t *testing.T, addr swarm.Address, diff --git a/pkg/api/soc.go b/pkg/api/soc.go index d1595d5b811..a30cc1174f6 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -18,8 +18,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/soc" - "github.com/ethersphere/bee/v2/pkg/storage" - storer "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/gorilla/mux" ) @@ -51,7 +50,6 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { headers := struct { BatchID []byte `map:"Swarm-Postage-Batch-Id"` StampSig []byte `map:"Swarm-Postage-Stamp"` - Pin bool `map:"Swarm-Pin"` Act bool `map:"Swarm-Act"` HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` }{} @@ -66,30 +64,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { return } - // if pinning header is set we do a deferred upload, else we do a direct upload var ( - tag uint64 - err error + putter storer.PutterSession + err error ) - if headers.Pin { - session, err := s.storer.NewSession() - if err != nil { - logger.Debug("get or create tag failed", "error", err) - logger.Error(nil, "get or create tag failed") - switch { - case errors.Is(err, storage.ErrNotFound): - jsonhttp.NotFound(w, "tag not found") - default: - jsonhttp.InternalServerError(w, "cannot get or create tag") - } - return - } - tag = session.TagID - } - - deferred := tag != 0 - var putter storer.PutterSession if len(headers.StampSig) != 0 { stamp := postage.Stamp{} if err := stamp.UnmarshalBinary(headers.StampSig); err != nil { @@ -102,16 +81,16 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { putter, err = s.newStampedPutter(r.Context(), putterOptions{ BatchID: stamp.BatchID(), - TagID: tag, - Pin: headers.Pin, - Deferred: deferred, + TagID: 0, + Pin: false, + Deferred: false, }, &stamp) } else { putter, err = s.newStamperPutter(r.Context(), putterOptions{ BatchID: headers.BatchID, - TagID: tag, - Pin: headers.Pin, - Deferred: deferred, + TagID: 0, + Pin: false, + Deferred: false, }) } if err != nil { diff --git a/pkg/api/soc_test.go b/pkg/api/soc_test.go index 344204f06c6..6c0d6fa0449 100644 --- a/pkg/api/soc_test.go +++ b/pkg/api/soc_test.go @@ -6,6 +6,7 @@ package api_test import ( "bytes" + "context" "encoding/hex" "fmt" "io" @@ -74,13 +75,20 @@ func TestSOC(t *testing.T) { t.Run("ok", func(t *testing.T) { s := testingsoc.GenerateMockSOC(t, testData) - client, _, _, _ := newTestServer(t, testServerOptions{ + client, _, _, chanStore := newTestServer(t, testServerOptions{ Storer: mockStorer, Post: newTestPostService(), DirectUpload: true, }) + + chanStore.Subscribe(func(ch swarm.Chunk) { + err := mockStorer.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + }) + jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated, - jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "true"), jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())), jsonhttptest.WithExpectedJSONResponse(api.SocPostResponse{ diff --git a/pkg/storer/internal/chunkstore/chunkstore_test.go b/pkg/storer/internal/chunkstore/chunkstore_test.go index 9e30c1af876..6787cf826ab 100644 --- a/pkg/storer/internal/chunkstore/chunkstore_test.go +++ b/pkg/storer/internal/chunkstore/chunkstore_test.go @@ -5,6 +5,7 @@ package chunkstore_test import ( + "bytes" "context" "errors" "fmt" @@ -13,7 +14,9 @@ import ( "os" "testing" + "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/sharky" + soctesting "github.com/ethersphere/bee/v2/pkg/soc/testing" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/storage" @@ -336,6 +339,77 @@ func TestChunkStore(t *testing.T) { } }) + t.Run("replace chunk", func(t *testing.T) { + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + ctx := context.Background() + + ch1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer).Chunk() + err = st.Run(context.Background(), func(s transaction.Store) error { + return s.ChunkStore().Put(ctx, ch1) + }) + if err != nil { + t.Fatal(err) + } + + tests := []struct { + data string + emplace bool + wantRefCount uint32 + }{ + { + data: "data1", + emplace: true, + wantRefCount: 2, + }, + { + data: "data2", + emplace: false, + wantRefCount: 2, + }, + { + data: "data3", + emplace: true, + wantRefCount: 3, + }, + } + + for _, tt := range tests { + ch2 := soctesting.GenerateMockSocWithSigner(t, []byte(tt.data), signer).Chunk() + if !ch1.Address().Equal(ch2.Address()) { + t.Fatal("chunk addresses don't match") + } + + err = st.Run(ctx, func(s transaction.Store) error { + return s.ChunkStore().Replace(ctx, ch2, tt.emplace) + }) + if err != nil { + t.Fatal(err) + } + + ch, err := st.ChunkStore().Get(ctx, ch2.Address()) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(ch.Data(), ch2.Data()) { + t.Fatalf("expected data override") + } + + rIdx := &chunkstore.RetrievalIndexItem{Address: ch2.Address()} + err = st.IndexStore().Get(rIdx) + if err != nil { + t.Fatal(err) + } + + if rIdx.RefCnt != tt.wantRefCount { + t.Fatalf("expected ref count %d, got %d", tt.wantRefCount, rIdx.RefCnt) + } + } + }) + t.Run("close store", func(t *testing.T) { err := st.Close() if err != nil { diff --git a/pkg/storer/mock/mockstorer.go b/pkg/storer/mock/mockstorer.go index 6ab457ab759..d0b5b7e6ad0 100644 --- a/pkg/storer/mock/mockstorer.go +++ b/pkg/storer/mock/mockstorer.go @@ -10,9 +10,9 @@ import ( "time" "github.com/ethersphere/bee/v2/pkg/pusher" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore" - storer "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "go.uber.org/atomic" ) @@ -231,3 +231,7 @@ func (m *mockStorer) DebugInfo(_ context.Context) (storer.Info, error) { func (m *mockStorer) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) { return nil, nil } + +func (m *mockStorer) Put(ctx context.Context, ch swarm.Chunk) error { + return m.chunkStore.Put(ctx, ch) +}