Skip to content

Commit

Permalink
feat: pinned reference integrity check API (#4573)
Browse files Browse the repository at this point in the history
  • Loading branch information
notanatol authored Feb 27, 2024
1 parent 5de732c commit b905de1
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 55 deletions.
11 changes: 10 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ type Storer interface {
storer.Debugger
}

type PinIntegrity interface {
Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat)
}

type Service struct {
auth auth.Authenticator
storer Storer
Expand Down Expand Up @@ -174,7 +178,9 @@ type Service struct {

batchStore postage.Storer
stamperStore storage.Store
syncStatus func() (bool, error)
pinIntegrity PinIntegrity

syncStatus func() (bool, error)

swap swap.Interface
transaction transaction.Service
Expand Down Expand Up @@ -242,6 +248,7 @@ type ExtraOptions struct {
Steward steward.Interface
SyncStatus func() (bool, error)
NodeStatus *status.Service
PinIntegrity PinIntegrity
}

func New(
Expand Down Expand Up @@ -348,6 +355,8 @@ func (s *Service) Configure(signer crypto.Signer, auth auth.Authenticator, trace
return "", err
}
}

s.pinIntegrity = e.PinIntegrity
}

func (s *Service) SetProbe(probe *Probe) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type testServerOptions struct {
BeeMode api.BeeNodeMode
RedistributionAgent *storageincentives.Agent
NodeStatus *status.Service
PinIntegrity api.PinIntegrity
}

func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string, *chanStorer) {
Expand Down Expand Up @@ -201,6 +202,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
SyncStatus: o.SyncStatus,
Staking: o.StakingContract,
NodeStatus: o.NodeStatus,
PinIntegrity: o.PinIntegrity,
}

// By default bee mode is set to full mode.
Expand Down
62 changes: 62 additions & 0 deletions pkg/api/integritycheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api

import (
"encoding/json"
"net/http"

storer "github.com/ethersphere/bee/pkg/storer"
"github.com/ethersphere/bee/pkg/swarm"
)

type PinIntegrityResponse struct {
Ref swarm.Address `json:"ref"`
Total int `json:"total"`
Missing int `json:"missing"`
Invalid int `json:"invalid"`
}

func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("get_pin_integrity").Build()

querie := struct {
Ref swarm.Address `map:"ref"`
}{}

if response := s.mapStructure(r.URL.Query(), &querie); response != nil {
response("invalid query params", logger, w)
return
}

out := make(chan storer.PinStat)

go s.pinIntegrity.Check(r.Context(), logger, querie.Ref.String(), out)

flusher, ok := w.(http.Flusher)
if !ok {
http.NotFound(w, r)
return
}

w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
flusher.Flush()

enc := json.NewEncoder(w)

for v := range out {
resp := PinIntegrityResponse{
Ref: v.Ref,
Total: v.Total,
Missing: v.Missing,
Invalid: v.Invalid,
}
if err := enc.Encode(resp); err != nil {
break
}
flusher.Flush()
}
}
67 changes: 67 additions & 0 deletions pkg/api/integritycheck_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2022 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api_test

import (
"context"
"net/http"
"testing"

"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/log"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/inmemstore"
storer "github.com/ethersphere/bee/pkg/storer"
)

const pinRef = "620fcd78c7ce54da2d1b7cc2274a02e190cbe8fecbc3bd244690ab6517ce8f39"

func TestIntegrityHandler(t *testing.T) {
t.Parallel()

t.Run("ok", func(t *testing.T) {
t.Parallel()
testServer, _, _, _ := newTestServer(t, testServerOptions{
DebugAPI: true,
PinIntegrity: &mockPinIntegrity{
Store: inmemstore.New(),
tester: t,
},
})

endp := "/check/pin?ref=" + pinRef

// When probe is not set health endpoint should indicate that node is not healthy
jsonhttptest.Request(t, testServer, http.MethodGet, endp, http.StatusOK, jsonhttptest.WithExpectedResponse(nil))
})

t.Run("wrong hash format", func(t *testing.T) {
t.Parallel()
testServer, _, _, _ := newTestServer(t, testServerOptions{
DebugAPI: true,
PinIntegrity: &mockPinIntegrity{
Store: inmemstore.New(),
tester: t,
},
})

endp := "/check/pin?ref=0xbadhash"

// When probe is not set health endpoint should indicate that node is not healthy
jsonhttptest.Request(t, testServer, http.MethodGet, endp, http.StatusBadRequest, jsonhttptest.WithExpectedResponse(nil))
})
}

type mockPinIntegrity struct {
tester *testing.T
Store storage.Store
}

func (p *mockPinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat) {
if pin != pinRef {
p.tester.Fatal("bad pin", pin)
}
close(out)
}
6 changes: 6 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,4 +599,10 @@ func (s *Service) mountBusinessDebug(restricted bool) {
"GET": http.HandlerFunc(s.rchash),
}),
))

handle("/check/pin", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.pinIntegrityHandler),
}),
))
}
1 change: 1 addition & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,7 @@ func NewBee(
Steward: steward,
SyncStatus: syncStatusFn,
NodeStatus: nodeStatus,
PinIntegrity: localStore.PinIntegrity(),
}

if o.APIAddr != "" {
Expand Down
39 changes: 26 additions & 13 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,15 @@ func initDiskRepository(
basePath string,
locker storage.ChunkLocker,
opts *Options,
) (storage.Repository, io.Closer, error) {
) (storage.Repository, *PinIntegrity, io.Closer, error) {
store, err := initStore(basePath, opts)
if err != nil {
return nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
return nil, nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
}

err = migration.Migrate(store, "core-migration", localmigration.BeforeIinitSteps())
if err != nil {
return nil, nil, fmt.Errorf("failed core migration: %w", err)
return nil, nil, nil, fmt.Errorf("failed core migration: %w", err)
}

if opts.LdbStats.Load() != nil {
Expand Down Expand Up @@ -338,13 +338,13 @@ func initDiskRepository(
if _, err := os.Stat(sharkyBasePath); os.IsNotExist(err) {
err := os.Mkdir(sharkyBasePath, 0777)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}

recoveryCloser, err := sharkyRecovery(ctx, sharkyBasePath, store, opts)
if err != nil {
return nil, nil, fmt.Errorf("failed to recover sharky: %w", err)
return nil, nil, nil, fmt.Errorf("failed to recover sharky: %w", err)
}

sharky, err := sharky.New(
Expand All @@ -353,20 +353,25 @@ func initDiskRepository(
swarm.SocMaxChunkSize,
)
if err != nil {
return nil, nil, fmt.Errorf("failed creating sharky instance: %w", err)
return nil, nil, nil, fmt.Errorf("failed creating sharky instance: %w", err)
}

txStore := leveldbstore.NewTxStore(store)
if err := txStore.Recover(); err != nil {
return nil, nil, fmt.Errorf("failed to recover index store: %w", err)
return nil, nil, nil, fmt.Errorf("failed to recover index store: %w", err)
}

txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky)
if err := txChunkStore.Recover(); err != nil {
return nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
return nil, nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
}

return storage.NewRepository(txStore, txChunkStore, locker), closer(store, sharky, recoveryCloser), nil
pinIntegrity := &PinIntegrity{
Store: store,
Sharky: sharky,
}

return storage.NewRepository(txStore, txChunkStore, locker), pinIntegrity, closer(store, sharky, recoveryCloser), nil
}

func initCache(ctx context.Context, capacity uint64, repo storage.Repository) (*cache.Cache, error) {
Expand Down Expand Up @@ -457,6 +462,8 @@ type DB struct {
setSyncerOnce sync.Once
syncer Syncer
opts workerOpts

pinIntegrity *PinIntegrity
}

type workerOpts struct {
Expand All @@ -468,9 +475,10 @@ type workerOpts struct {
// component stores.
func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
var (
repo storage.Repository
err error
dbCloser io.Closer
repo storage.Repository
err error
dbCloser io.Closer
pinIntegrity *PinIntegrity
)
if opts == nil {
opts = defaultOptions()
Expand All @@ -497,7 +505,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
return nil, err
}
} else {
repo, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
repo, pinIntegrity, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -550,6 +558,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
wakeupDuration: opts.ReserveWakeUpDuration,
},
directUploadLimiter: make(chan struct{}, pusher.ConcurrentPushes),
pinIntegrity: pinIntegrity,
}

if db.validStamp == nil {
Expand Down Expand Up @@ -665,6 +674,10 @@ func (db *DB) ChunkStore() storage.ReadOnlyChunkStore {
return db.repo.ChunkStore()
}

func (db *DB) PinIntegrity() *PinIntegrity {
return db.pinIntegrity
}

// Execute implements the internal.TxExecutor interface.
func (db *DB) Execute(ctx context.Context, do func(internal.Storage) error) error {
tx, commit, rollback := db.repo.NewTx(ctx)
Expand Down
Loading

0 comments on commit b905de1

Please sign in to comment.