Skip to content

Commit

Permalink
feat: chunkstore replace unit test & soc pinning (#4902)
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill authored Nov 13, 2024
1 parent 927f6f2 commit c0e2922
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 37 deletions.
2 changes: 0 additions & 2 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,6 @@ paths:
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
required: true
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress"
Expand Down
15 changes: 14 additions & 1 deletion pkg/api/accesscontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
resp struct {
Reference swarm.Address `json:"reference"`
}
direct bool
}{
{
name: "bzz",
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
data: bytes.NewReader(sch.WrappedChunk.Data()),
expdata: sch.Chunk().Data(),
contenttype: "binary/octet-stream",
direct: true,
},
}

Expand All @@ -183,13 +185,24 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
upTestOpts = append(upTestOpts, jsonhttptest.WithRequestHeader(api.SwarmCollectionHeader, "True"))
}
t.Run(v.name, func(t *testing.T) {
client, _, _, _ := newTestServer(t, testServerOptions{
client, _, _, chanStore := newTestServer(t, testServerOptions{
Storer: storerMock,
Logger: logger,
Post: mockpost.New(mockpost.WithAcceptAll()),
PublicKey: pk.PublicKey,
AccessControl: mockac.New(),
DirectUpload: v.direct,
})

if chanStore != nil {
chanStore.Subscribe(func(chunk swarm.Chunk) {
err := storerMock.Put(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}
})
}

header := jsonhttptest.Request(t, client, http.MethodPost, v.upurl, http.StatusCreated,
upTestOpts...,
)
Expand Down
10 changes: 10 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ type chanStorer struct {
lock sync.Mutex
chunks map[string]struct{}
quit chan struct{}
subs []func(chunk swarm.Chunk)
}

func newChanStore(cc <-chan *pusher.Op) *chanStorer {
Expand All @@ -650,6 +651,9 @@ func (c *chanStorer) drain(cc <-chan *pusher.Op) {
case op := <-cc:
c.lock.Lock()
c.chunks[op.Chunk.Address().ByteString()] = struct{}{}
for _, h := range c.subs {
h(op.Chunk)
}
c.lock.Unlock()
op.Err <- nil
case <-c.quit:
Expand All @@ -670,6 +674,12 @@ func (c *chanStorer) Has(addr swarm.Address) bool {
return ok
}

func (c *chanStorer) Subscribe(f func(chunk swarm.Chunk)) {
c.lock.Lock()
defer c.lock.Unlock()
c.subs = append(c.subs, f)
}

func createRedistributionAgentService(
t *testing.T,
addr swarm.Address,
Expand Down
39 changes: 9 additions & 30 deletions pkg/api/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storage"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
)
Expand Down Expand Up @@ -51,7 +50,6 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
StampSig []byte `map:"Swarm-Postage-Stamp"`
Pin bool `map:"Swarm-Pin"`
Act bool `map:"Swarm-Act"`
HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"`
}{}
Expand All @@ -66,30 +64,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}

// if pinning header is set we do a deferred upload, else we do a direct upload
var (
tag uint64
err error
putter storer.PutterSession
err error
)
if headers.Pin {
session, err := s.storer.NewSession()
if err != nil {
logger.Debug("get or create tag failed", "error", err)
logger.Error(nil, "get or create tag failed")
switch {
case errors.Is(err, storage.ErrNotFound):
jsonhttp.NotFound(w, "tag not found")
default:
jsonhttp.InternalServerError(w, "cannot get or create tag")
}
return
}
tag = session.TagID
}

deferred := tag != 0

var putter storer.PutterSession
if len(headers.StampSig) != 0 {
stamp := postage.Stamp{}
if err := stamp.UnmarshalBinary(headers.StampSig); err != nil {
Expand All @@ -102,16 +81,16 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {

putter, err = s.newStampedPutter(r.Context(), putterOptions{
BatchID: stamp.BatchID(),
TagID: tag,
Pin: headers.Pin,
Deferred: deferred,
TagID: 0,
Pin: false,
Deferred: false,
}, &stamp)
} else {
putter, err = s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Pin: headers.Pin,
Deferred: deferred,
TagID: 0,
Pin: false,
Deferred: false,
})
}
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/api/soc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api_test

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
Expand Down Expand Up @@ -74,13 +75,20 @@ func TestSOC(t *testing.T) {

t.Run("ok", func(t *testing.T) {
s := testingsoc.GenerateMockSOC(t, testData)
client, _, _, _ := newTestServer(t, testServerOptions{
client, _, _, chanStore := newTestServer(t, testServerOptions{
Storer: mockStorer,
Post: newTestPostService(),
DirectUpload: true,
})

chanStore.Subscribe(func(ch swarm.Chunk) {
err := mockStorer.Put(context.Background(), ch)
if err != nil {
t.Fatal(err)
}
})

jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "true"),
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())),
jsonhttptest.WithExpectedJSONResponse(api.SocPostResponse{
Expand Down
74 changes: 74 additions & 0 deletions pkg/storer/internal/chunkstore/chunkstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package chunkstore_test

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -13,7 +14,9 @@ import (
"os"
"testing"

"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/sharky"
soctesting "github.com/ethersphere/bee/v2/pkg/soc/testing"
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"

"github.com/ethersphere/bee/v2/pkg/storage"
Expand Down Expand Up @@ -336,6 +339,77 @@ func TestChunkStore(t *testing.T) {
}
})

t.Run("replace chunk", func(t *testing.T) {
privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
signer := crypto.NewDefaultSigner(privKey)
ctx := context.Background()

ch1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer).Chunk()
err = st.Run(context.Background(), func(s transaction.Store) error {
return s.ChunkStore().Put(ctx, ch1)
})
if err != nil {
t.Fatal(err)
}

tests := []struct {
data string
emplace bool
wantRefCount uint32
}{
{
data: "data1",
emplace: true,
wantRefCount: 2,
},
{
data: "data2",
emplace: false,
wantRefCount: 2,
},
{
data: "data3",
emplace: true,
wantRefCount: 3,
},
}

for _, tt := range tests {
ch2 := soctesting.GenerateMockSocWithSigner(t, []byte(tt.data), signer).Chunk()
if !ch1.Address().Equal(ch2.Address()) {
t.Fatal("chunk addresses don't match")
}

err = st.Run(ctx, func(s transaction.Store) error {
return s.ChunkStore().Replace(ctx, ch2, tt.emplace)
})
if err != nil {
t.Fatal(err)
}

ch, err := st.ChunkStore().Get(ctx, ch2.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(ch.Data(), ch2.Data()) {
t.Fatalf("expected data override")
}

rIdx := &chunkstore.RetrievalIndexItem{Address: ch2.Address()}
err = st.IndexStore().Get(rIdx)
if err != nil {
t.Fatal(err)
}

if rIdx.RefCnt != tt.wantRefCount {
t.Fatalf("expected ref count %d, got %d", tt.wantRefCount, rIdx.RefCnt)
}
}
})

t.Run("close store", func(t *testing.T) {
err := st.Close()
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/storer/mock/mockstorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"time"

"github.com/ethersphere/bee/v2/pkg/pusher"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -231,3 +231,7 @@ func (m *mockStorer) DebugInfo(_ context.Context) (storer.Info, error) {
func (m *mockStorer) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) {
return nil, nil
}

func (m *mockStorer) Put(ctx context.Context, ch swarm.Chunk) error {
return m.chunkStore.Put(ctx, ch)
}

0 comments on commit c0e2922

Please sign in to comment.