diff --git a/pkg/api/api.go b/pkg/api/api.go index 49331689a0f..d4ea6baf63a 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -51,6 +51,7 @@ import ( "github.com/ethersphere/bee/pkg/steward" storage "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storageincentives" + "github.com/ethersphere/bee/pkg/storageincentives/sampler" "github.com/ethersphere/bee/pkg/storageincentives/staking" storer "github.com/ethersphere/bee/pkg/storer" "github.com/ethersphere/bee/pkg/swarm" @@ -153,6 +154,7 @@ type Service struct { probe *Probe metricsRegistry *prometheus.Registry stakingContract staking.Contract + sampler *sampler.Sampler Options http.Handler @@ -181,6 +183,7 @@ type Service struct { batchStore postage.Storer stamperStore storage.Store syncStatus func() (bool, error) + sample *sampler.Sampler swap swap.Interface transaction transaction.Service @@ -245,6 +248,7 @@ type ExtraOptions struct { Post postage.Service PostageContract postagecontract.Interface Staking staking.Contract + Sampler *sampler.Sampler Steward steward.Interface SyncStatus func() (bool, error) NodeStatus *status.Service @@ -323,6 +327,7 @@ func (s *Service) Configure(signer crypto.Signer, auth auth.Authenticator, trace s.postageContract = e.PostageContract s.steward = e.Steward s.stakingContract = e.Staking + s.sampler = e.Sampler s.pingpong = e.Pingpong s.topologyDriver = e.TopologyDriver diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 21e535f10ac..53309f1d63e 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -58,8 +58,10 @@ import ( testingc "github.com/ethersphere/bee/pkg/storage/testing" "github.com/ethersphere/bee/pkg/storageincentives" "github.com/ethersphere/bee/pkg/storageincentives/redistribution" + "github.com/ethersphere/bee/pkg/storageincentives/sampler" "github.com/ethersphere/bee/pkg/storageincentives/staking" mock2 "github.com/ethersphere/bee/pkg/storageincentives/staking/mock" + storer "github.com/ethersphere/bee/pkg/storer" mockstorer 
"github.com/ethersphere/bee/pkg/storer/mock" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/topology/lightnode" @@ -213,7 +215,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. s.SetP2P(o.P2P) if o.RedistributionAgent == nil { - o.RedistributionAgent, _ = createRedistributionAgentService(t, o.Overlay, o.StateStorer, erc20, transaction, backend, o.BatchStore) + o.RedistributionAgent, _ = createRedistributionAgentService(t, o.Overlay, backend, o.BatchStore, o.StateStorer, erc20, transaction) s.SetRedistributionAgent(o.RedistributionAgent) } testutil.CleanupCloser(t, o.RedistributionAgent) @@ -689,11 +691,11 @@ func (c *chanStorer) Has(addr swarm.Address) bool { func createRedistributionAgentService( t *testing.T, addr swarm.Address, - storer storage.StateStorer, + backend postage.ChainBackend, + batchstore sampler.Batchstore, + stateStore storage.StateStorer, erc20Service erc20.Service, tranService transaction.Service, - backend storageincentives.ChainBackend, - chainStateGetter postage.ChainStateGetter, ) (*storageincentives.Agent, error) { t.Helper() @@ -707,28 +709,34 @@ func createRedistributionAgentService( return true, nil })) contract := &mockContract{} - + fullySyncedFunc := func() bool { return true } + healthyFunc := func() bool { return true } return storageincentives.New( + log.Noop, addr, common.Address{}, + time.Millisecond*10, backend, contract, postageContract, stakingContract, - mockstorer.NewReserve(), - func() bool { return true }, - time.Millisecond*10, - blocksPerRound, - blocksPerPhase, - storer, - chainStateGetter, + sampler.New(backend, batchstore, &mockSampler{}, fullySyncedFunc, healthyFunc), + stateStore, erc20Service, tranService, - &mockHealth{}, - log.Noop, ) } +type mockSampler struct{} + +func (*mockSampler) StorageRadius() uint8 { + return 0 +} + +func (*mockSampler) Iterate(uint8, func(storer.Chunk) (bool, error)) error { + return nil +} + type contractCall int func (c 
contractCall) String() string { @@ -776,7 +784,7 @@ func (m *mockContract) IsWinner(context.Context) (bool, error) { return false, nil } -func (m *mockContract) Claim(context.Context, redistribution.ChunkInclusionProofs) (common.Hash, error) { +func (m *mockContract) Claim(context.Context, []redistribution.Proof) (common.Hash, error) { m.mtx.Lock() defer m.mtx.Unlock() m.callsList = append(m.callsList, claimCall) diff --git a/pkg/api/rchash.go b/pkg/api/rchash.go index ad7a25f9992..736293d61e1 100644 --- a/pkg/api/rchash.go +++ b/pkg/api/rchash.go @@ -16,38 +16,32 @@ import ( "github.com/gorilla/mux" ) -type RCHashResponse struct { - Hash swarm.Address `json:"hash"` - Proofs ChunkInclusionProofs `json:"proofs"` - Duration time.Duration `json:"duration"` +type SampleWithProofs struct { + Hash swarm.Address `json:"hash"` + Proofs []Proof `json:"proofs"` + Duration time.Duration `json:"duration"` } -type ChunkInclusionProofs struct { - A ChunkInclusionProof `json:"proof1"` - B ChunkInclusionProof `json:"proof2"` - C ChunkInclusionProof `json:"proofLast"` -} - -// ChunkInclusionProof structure must exactly match +// Proof structure must exactly match // corresponding structure (of the same name) in Redistribution.sol smart contract. 
// github.com/ethersphere/storage-incentives/blob/ph_f2/src/Redistribution.sol // github.com/ethersphere/storage-incentives/blob/master/src/Redistribution.sol (when merged to master) -type ChunkInclusionProof struct { - ProofSegments []string `json:"proofSegments"` - ProveSegment string `json:"proveSegment"` - ProofSegments2 []string `json:"proofSegments2"` - ProveSegment2 string `json:"proveSegment2"` - ChunkSpan uint64 `json:"chunkSpan"` - ProofSegments3 []string `json:"proofSegments3"` - PostageProof PostageProof `json:"postageProof"` - SocProof []SOCProof `json:"socProof"` +type Proof struct { + Sisters []string `json:"sisters"` + Data string `json:"data"` + Sisters2 []string `json:"sisters2"` + Data2 string `json:"data2"` + Sisters3 []string `json:"sisters3"` + ChunkSpan uint64 `json:"chunkSpan"` + PostageProof PostageProof `json:"postageProof"` + SocProof []SOCProof `json:"socProof"` } // SOCProof structure must exactly match // corresponding structure (of the same name) in Redistribution.sol smart contract. 
type PostageProof struct { Signature string `json:"signature"` - PostageId string `json:"postageId"` + BatchID string `json:"BatchID"` Index string `json:"index"` TimeStamp string `json:"timeStamp"` } @@ -61,35 +55,37 @@ type SOCProof struct { ChunkAddr string `json:"chunkAddr"` } -func renderChunkInclusionProofs(proofs redistribution.ChunkInclusionProofs) ChunkInclusionProofs { - return ChunkInclusionProofs{ - A: renderChunkInclusionProof(proofs.A), - B: renderChunkInclusionProof(proofs.B), - C: renderChunkInclusionProof(proofs.C), +func renderProofs(proofs []redistribution.Proof) []Proof { + out := make([]Proof, 3) + for i, p := range proofs { + out[i] = renderProof(p) } + return out } -func renderChunkInclusionProof(proof redistribution.ChunkInclusionProof) ChunkInclusionProof { +func renderProof(proof redistribution.Proof) Proof { var socProof []SOCProof if len(proof.SocProof) == 1 { - socProof = []SOCProof{{ - Signer: hex.EncodeToString(proof.SocProof[0].Signer.Bytes()), - Signature: hex.EncodeToString(proof.SocProof[0].Signature[:]), - Identifier: hex.EncodeToString(proof.SocProof[0].Identifier.Bytes()), - ChunkAddr: hex.EncodeToString(proof.SocProof[0].ChunkAddr.Bytes()), - }} + socProof = []SOCProof{ + { + Signer: toHex(proof.SocProof[0].Signer), + Signature: toHex(proof.SocProof[0].Signature), + Identifier: toHex(proof.SocProof[0].Identifier[:]), + ChunkAddr: toHex(proof.SocProof[0].ChunkAddr[:]), + }, + } } - return ChunkInclusionProof{ - ProveSegment: hex.EncodeToString(proof.ProveSegment.Bytes()), - ProofSegments: renderCommonHash(proof.ProofSegments), - ProveSegment2: hex.EncodeToString(proof.ProveSegment2.Bytes()), - ProofSegments2: renderCommonHash(proof.ProofSegments2), - ProofSegments3: renderCommonHash(proof.ProofSegments3), - ChunkSpan: proof.ChunkSpan, + return Proof{ + Data: toHex(proof.Data[:]), + Sisters: renderHash(proof.Sisters...), + Data2: toHex(proof.Data2[:]), + Sisters2: renderHash(proof.Sisters2...), + Sisters3: 
renderHash(proof.Sisters3...), + ChunkSpan: proof.ChunkSpan, PostageProof: PostageProof{ - Signature: hex.EncodeToString(proof.PostageProof.Signature[:]), - PostageId: hex.EncodeToString(proof.PostageProof.PostageId[:]), + Signature: toHex(proof.PostageProof.Signature), + BatchID: toHex(proof.PostageProof.BatchId[:]), Index: strconv.FormatUint(proof.PostageProof.Index, 16), TimeStamp: strconv.FormatUint(proof.PostageProof.TimeStamp, 16), }, @@ -97,14 +93,16 @@ func renderChunkInclusionProof(proof redistribution.ChunkInclusionProof) ChunkIn } } -func renderCommonHash(proofSegments []common.Hash) []string { - output := make([]string, len(proofSegments)) - for i, s := range proofSegments { - output[i] = hex.EncodeToString(s.Bytes()) +func renderHash(hs ...common.Hash) []string { + output := make([]string, len(hs)) + for i, h := range hs { + output[i] = hex.EncodeToString(h.Bytes()) } return output } +var toHex func([]byte) string = hex.EncodeToString + // This API is kept for testing the sampler. As a result, no documentation or tests are added here. 
func (s *Service) rchash(w http.ResponseWriter, r *http.Request) { logger := s.logger.WithName("get_rchash").Build() @@ -123,17 +121,18 @@ func (s *Service) rchash(w http.ResponseWriter, r *http.Request) { anchor2 := []byte(paths.Anchor2) - swp, err := s.redistributionAgent.SampleWithProofs(r.Context(), anchor1, anchor2, paths.Depth) + var round uint64 + swp, err := s.sampler.ReserveSampleWithProofs(r.Context(), anchor1, anchor2, paths.Depth, round) if err != nil { logger.Error(err, "failed making sample with proofs") jsonhttp.InternalServerError(w, "failed making sample with proofs") return } - resp := RCHashResponse{ + resp := SampleWithProofs{ Hash: swp.Hash, Duration: swp.Duration, - Proofs: renderChunkInclusionProofs(swp.Proofs), + Proofs: renderProofs(swp.Proofs), } jsonhttp.OK(w, resp) diff --git a/pkg/bmt/proof.go b/pkg/bmt/proof.go index cc59fd33766..d753b6556d4 100644 --- a/pkg/bmt/proof.go +++ b/pkg/bmt/proof.go @@ -11,10 +11,10 @@ type Prover struct { // Proof represents a Merkle proof of segment type Proof struct { - ProveSegment []byte - ProofSegments [][]byte - Span []byte - Index int + Data []byte + Sisters [][]byte + Span []byte + Index int } // Hash overrides base hash function of Hasher to fill buffer with zeros until chunk length @@ -28,6 +28,7 @@ func (p Prover) Hash(b []byte) ([]byte, error) { return p.Hasher.Hash(b) } + // Proof returns the inclusion proof of the i-th data segment func (p Prover) Proof(i int) Proof { index := i @@ -60,9 +61,9 @@ func (p Prover) Proof(i int) Proof { func (p Prover) Verify(i int, proof Proof) (root []byte, err error) { var section []byte if i%2 == 0 { - section = append(append(section, proof.ProveSegment...), proof.ProofSegments[0]...) + section = append(append(section, proof.Data...), proof.Sisters[0]...) } else { - section = append(append(section, proof.ProofSegments[0]...), proof.ProveSegment...) + section = append(append(section, proof.Sisters[0]...), proof.Data...) 
} i = i / 2 n := p.bmt.leaves[i] @@ -74,7 +75,7 @@ func (p Prover) Verify(i int, proof Proof) (root []byte, err error) { } n = n.parent - for _, sister := range proof.ProofSegments[1:] { + for _, sister := range proof.Sisters[1:] { if isLeft { root, err = doHash(hasher, root, sister) } else { diff --git a/pkg/bmt/proof_test.go b/pkg/bmt/proof_test.go index 1b7f6d3b3dd..8609f7f3746 100644 --- a/pkg/bmt/proof_test.go +++ b/pkg/bmt/proof_test.go @@ -79,9 +79,9 @@ func TestProofCorrectness(t *testing.T) { "887c22bd8750d34016ac3c66b5ff102dacdd73f6b014e710b51e8022af9a1968", } - verifySegments(t, expSegmentStrings, proof.ProofSegments) + verifySegments(t, expSegmentStrings, proof.Sisters) - if !bytes.Equal(proof.ProveSegment, testDataPadded[:hh.Size()]) { + if !bytes.Equal(proof.Data, testDataPadded[:hh.Size()]) { t.Fatal("section incorrect") } @@ -105,9 +105,9 @@ func TestProofCorrectness(t *testing.T) { "745bae095b6ff5416b4a351a167f731db6d6f5924f30cd88d48e74261795d27b", } - verifySegments(t, expSegmentStrings, proof.ProofSegments) + verifySegments(t, expSegmentStrings, proof.Sisters) - if !bytes.Equal(proof.ProveSegment, testDataPadded[127*hh.Size():]) { + if !bytes.Equal(proof.Data, testDataPadded[127*hh.Size():]) { t.Fatal("section incorrect") } @@ -131,9 +131,9 @@ func TestProofCorrectness(t *testing.T) { "745bae095b6ff5416b4a351a167f731db6d6f5924f30cd88d48e74261795d27b", } - verifySegments(t, expSegmentStrings, proof.ProofSegments) + verifySegments(t, expSegmentStrings, proof.Sisters) - if !bytes.Equal(proof.ProveSegment, testDataPadded[64*hh.Size():65*hh.Size()]) { + if !bytes.Equal(proof.Data, testDataPadded[64*hh.Size():65*hh.Size()]) { t.Fatal("section incorrect") } @@ -167,9 +167,9 @@ func TestProofCorrectness(t *testing.T) { segment := testDataPadded[64*hh.Size() : 65*hh.Size()] rootHash, err := pr.Verify(64, bmt.Proof{ - ProveSegment: segment, - ProofSegments: segments, - Span: bmt.LengthToSpan(4096), + Data: segment, + Sisters: segments, + Span: 
bmt.LengthToSpan(4096), }) if err != nil { t.Fatal(err) diff --git a/pkg/node/node.go b/pkg/node/node.go index ddcdad37771..5288bce7d86 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -61,6 +61,7 @@ import ( "github.com/ethersphere/bee/pkg/steward" "github.com/ethersphere/bee/pkg/storageincentives" "github.com/ethersphere/bee/pkg/storageincentives/redistribution" + "github.com/ethersphere/bee/pkg/storageincentives/sampler" "github.com/ethersphere/bee/pkg/storageincentives/staking" storer "github.com/ethersphere/bee/pkg/storer" "github.com/ethersphere/bee/pkg/swarm" @@ -681,7 +682,6 @@ func NewBee( } postageStampContractABI := abiutil.MustParseABI(chainCfg.PostageStampABI) - bzzTokenAddress, err := postagecontract.LookupERC20Address(ctx, transactionService, postageStampContractAddress, postageStampContractABI, chainEnabled) if err != nil { return nil, err @@ -1051,30 +1051,31 @@ func NewBee( } redistributionContractAddress = common.HexToAddress(o.RedistributionContractAddress) } - isFullySynced := func() bool { return localStore.ReserveSize() >= reserveTreshold && pullerService.SyncRate() == 0 } redistributionContract := redistribution.New(swarmAddress, logger, transactionService, redistributionContractAddress, abiutil.MustParseABI(chainCfg.RedistributionABI)) + s := sampler.New( + chainBackend, + batchStore, + localStore, + isFullySynced, + saludService.IsHealthy, + ) agent, err = storageincentives.New( + logger, swarmAddress, overlayEthAddress, + o.BlockTime, chainBackend, redistributionContract, postageStampContractService, stakingContract, - localStore, - isFullySynced, - o.BlockTime, - storageincentives.DefaultBlocksPerRound, - storageincentives.DefaultBlocksPerPhase, + s, stateStore, - batchStore, erc20Service, transactionService, - saludService, - logger, ) if err != nil { return nil, fmt.Errorf("storage incentives agent: %w", err) diff --git a/pkg/postage/batchstore/mock/store.go b/pkg/postage/batchstore/mock/store.go index f3c10bf236d..9e06f7d8e7a 
100644 --- a/pkg/postage/batchstore/mock/store.go +++ b/pkg/postage/batchstore/mock/store.go @@ -144,6 +144,13 @@ func (bs *BatchStore) Iterate(f func(*postage.Batch) (bool, error)) error { return err } +// IterateByValue mocks the IterateByValue method from the BatchStore +func (bs *BatchStore) IterateByValue(f func([]byte, *big.Int) (bool, error)) error { + return bs.Iterate(func(b *postage.Batch) (bool, error) { + return f(b.ID, b.Value) + }) +} + // Save mocks the Save method from the BatchStore. func (bs *BatchStore) Save(batch *postage.Batch) error { bs.mtx.Lock() diff --git a/pkg/postage/batchstore/store.go b/pkg/postage/batchstore/store.go index 99069292ea3..466f3ca1d3e 100644 --- a/pkg/postage/batchstore/store.go +++ b/pkg/postage/batchstore/store.go @@ -126,6 +126,13 @@ func (s *store) Iterate(cb func(*postage.Batch) (bool, error)) error { }) } +// IterateByValue iterates on batches by value in ascending order +func (s *store) IterateByValue(cb func(id []byte, val *big.Int) (bool, error)) error { + return s.store.Iterate(valueKeyPrefix, func(key, _ []byte) (bool, error) { + return cb(valueKeyToID(key), valueKeyToValue(key)) + }) +} + // Save is implementation of postage.Storer interface Save method. // This method has side effects; it also updates the radius of the node if successful. func (s *store) Save(batch *postage.Batch) error { @@ -269,44 +276,38 @@ func (s *store) saveBatch(b *postage.Batch) error { // cleanup evicts and removes expired batch. // Must be called under lock.
func (s *store) cleanup() error { + var ids [][]byte + var keys []string + totalAmount := s.cs.Load().TotalAmount - var evictions []*postage.Batch - - err := s.store.Iterate(valueKeyPrefix, func(key, value []byte) (stop bool, err error) { - - b, err := s.Get(valueKeyToID(key)) - if err != nil { - return false, err - } - + err := s.IterateByValue(func(id []byte, val *big.Int) (bool, error) { // batches whose balance is below the total cumulative payout - if b.Value.Cmp(s.cs.Load().TotalAmount) <= 0 { - evictions = append(evictions, b) - } else { - return true, nil // stop early as an optimization at first value above the total cumulative payout + if val.Cmp(totalAmount) <= 0 { + ids = append(ids, id) + keys = append(keys, valueKey(val, id)) + return false, nil } - - return false, nil + return true, nil // stop early as an optimization at first value above the total cumulative payout }) if err != nil { return err } - for _, b := range evictions { - err := s.store.Delete(valueKey(b.Value, b.ID)) + for i, id := range ids { + err := s.store.Delete(keys[i]) if err != nil { - return fmt.Errorf("delete value key for batch %x: %w", b.ID, err) + return fmt.Errorf("delete value key for batch %x: %w", id, err) } - err = s.store.Delete(batchKey(b.ID)) + err = s.store.Delete(batchKey(id)) if err != nil { - return fmt.Errorf("delete batch %x: %w", b.ID, err) + return fmt.Errorf("delete batch %x: %w", id, err) } - err = s.evictFn(b.ID) + err = s.evictFn(id) if err != nil { - return fmt.Errorf("evict batch %x: %w", b.ID, err) + return fmt.Errorf("evict batch %x: %w", id, err) } if s.batchExpiry != nil { - s.batchExpiry.HandleStampExpiry(b.ID) + s.batchExpiry.HandleStampExpiry(id) } } @@ -373,6 +374,11 @@ func valueKeyToID(key []byte) []byte { return key[l-32 : l] } +// valueKeyToValue extracts the value from a value key - used in value-based iteration. 
+func valueKeyToValue(key []byte) *big.Int { + return new(big.Int).SetBytes(key[:32]) +} + func (s *store) SetBatchExpiryHandler(be postage.BatchExpiryHandler) { s.batchExpiry = be } diff --git a/pkg/postage/interface.go b/pkg/postage/interface.go index 38660c13b48..9612a5f1086 100644 --- a/pkg/postage/interface.go +++ b/pkg/postage/interface.go @@ -37,12 +37,19 @@ type ChainSnapshot struct { FirstBlockNumber uint64 `json:"firstBlockNumber"` Timestamp int64 `json:"timestamp"` } +type ChainBackend interface { + BlockNumber(context.Context) (uint64, error) + HeaderByNumber(context.Context, *big.Int) (*types.Header, error) + BalanceAt(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error) + SuggestGasPrice(ctx context.Context) (*big.Int, error) +} // Storer represents the persistence layer for batches // on the current (highest available) block. type Storer interface { ChainStateGetter CommitmentGetter + ValueIterator Radius() uint8 @@ -84,6 +91,11 @@ type ChainStateGetter interface { GetChainState() *ChainState } +type ValueIterator interface { + // IterateByValue iterates on batches by value in ascending order + IterateByValue(cb func(id []byte, val *big.Int) (bool, error)) error +} + // Listener provides a blockchain event iterator.
type Listener interface { io.Closer diff --git a/pkg/postage/noop.go b/pkg/postage/noop.go index ea8387deee8..1b3c331f8ba 100644 --- a/pkg/postage/noop.go +++ b/pkg/postage/noop.go @@ -26,6 +26,8 @@ func (b *NoOpBatchStore) Exists([]byte) (bool, error) { return false, nil } func (b *NoOpBatchStore) Iterate(func(*Batch) (bool, error)) error { return nil } +func (b *NoOpBatchStore) IterateByValue(func([]byte, *big.Int) (bool, error)) error { return nil } + func (b *NoOpBatchStore) Save(*Batch) error { return nil } func (b *NoOpBatchStore) Update(*Batch, *big.Int, uint8) error { return nil } diff --git a/pkg/storageincentives/agent.go b/pkg/storageincentives/agent.go index 51a62393a8c..a0669e97875 100644 --- a/pkg/storageincentives/agent.go +++ b/pkg/storageincentives/agent.go @@ -11,11 +11,9 @@ import ( "fmt" "io" "math/big" - "sync" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/postage" @@ -23,8 +21,8 @@ import ( "github.com/ethersphere/bee/pkg/settlement/swap/erc20" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storageincentives/redistribution" + "github.com/ethersphere/bee/pkg/storageincentives/sampler" "github.com/ethersphere/bee/pkg/storageincentives/staking" - "github.com/ethersphere/bee/pkg/storer" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/transaction" ) @@ -32,9 +30,8 @@ import ( const loggerName = "storageincentives" const ( - DefaultBlocksPerRound = 152 - DefaultBlocksPerPhase = DefaultBlocksPerRound / 4 - + blocksPerRound = 152 + blocksPerPhase = blocksPerRound / 4 // min # of transactions our wallet should be able to cover minTxCountToCover = 15 @@ -42,70 +39,62 @@ const ( avgTxGas = 250_000 ) -type ChainBackend interface { - BlockNumber(context.Context) (uint64, error) - HeaderByNumber(context.Context, *big.Int) (*types.Header, error) - 
BalanceAt(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error) - SuggestGasPrice(ctx context.Context) (*big.Int, error) -} - -type Health interface { - IsHealthy() bool -} - type Agent struct { logger log.Logger metrics metrics - backend ChainBackend - blocksPerRound uint64 + overlay swarm.Address + handlers []func(context.Context, uint64) error + backend postage.ChainBackend contract redistribution.Contract batchExpirer postagecontract.PostageBatchExpirer redistributionStatuser staking.RedistributionStatuser - store storer.Reserve - fullSyncedFunc func() bool - overlay swarm.Address + sampler *sampler.Sampler quit chan struct{} - wg sync.WaitGroup + stopped chan struct{} state *RedistributionState - chainStateGetter postage.ChainStateGetter - commitLock sync.Mutex - health Health } -func New(overlay swarm.Address, +func handlers(a *Agent) []func(context.Context, uint64) error { + return []func(context.Context, uint64) error{ + a.handleCommit, + a.handleReveal, + a.handleClaim, + a.handleSample, + } +} + +// handlersFunc is made available in tests +var handlersFunc func(a *Agent) []func(context.Context, uint64) error = handlers + +func New( + logger log.Logger, + overlay swarm.Address, ethAddress common.Address, - backend ChainBackend, + blockTime time.Duration, + blockbackend postage.ChainBackend, contract redistribution.Contract, batchExpirer postagecontract.PostageBatchExpirer, redistributionStatuser staking.RedistributionStatuser, - store storer.Reserve, - fullSyncedFunc func() bool, - blockTime time.Duration, - blocksPerRound, - blocksPerPhase uint64, + s *sampler.Sampler, stateStore storage.StateStorer, - chainStateGetter postage.ChainStateGetter, erc20Service erc20.Service, tranService transaction.Service, - health Health, - logger log.Logger, ) (*Agent, error) { a := &Agent{ - overlay: overlay, - metrics: newMetrics(), - backend: backend, logger: logger.WithName(loggerName).Register(), + metrics: newMetrics(), + overlay: overlay, + 
backend: blockbackend, + sampler: s, contract: contract, batchExpirer: batchExpirer, - store: store, - fullSyncedFunc: fullSyncedFunc, - blocksPerRound: blocksPerRound, - quit: make(chan struct{}), redistributionStatuser: redistributionStatuser, - health: health, - chainStateGetter: chainStateGetter, + quit: make(chan struct{}), + stopped: make(chan struct{}), } + a.handlers = handlersFunc(a) + state, err := NewRedistributionState(logger, ethAddress, stateStore, erc20Service, tranService) if err != nil { return nil, err @@ -113,155 +102,142 @@ func New(overlay swarm.Address, a.state = state - a.wg.Add(1) - go a.start(blockTime, a.blocksPerRound, blocksPerPhase) + go a.start(blockTime) return a, nil } +type phaseAndBlock struct { + phase PhaseType + block uint64 +} + // start polls the current block number, calculates, and publishes only once the current phase. // Each round is blocksPerRound long and is divided into three blocksPerPhase long phases: commit, reveal, claim. // The sample phase is triggered upon entering the claim phase and may run until the end of the commit phase. // If our neighborhood is selected to participate, a sample is created during the sample phase. In the commit phase, // the sample is submitted, and in the reveal phase, the obfuscation key from the commit phase is submitted. // Next, in the claim phase, we check if we've won, and the cycle repeats. The cycle must occur in the length of one round. 
-func (a *Agent) start(blockTime time.Duration, blocksPerRound, blocksPerPhase uint64) { - defer a.wg.Done() - - phaseEvents := newEvents() +func (a *Agent) start(blockTime time.Duration) { + defer close(a.stopped) + phaseEvents := NewEvents() defer phaseEvents.Close() - logErr := func(phase PhaseType, round uint64, err error) { - if err != nil { - a.logger.Error(err, "phase failed", "phase", phase, "round", round) - } - } - - phaseEvents.On(commit, func(ctx context.Context) { + phaseEvents.On(commit, func(quit chan struct{}, blockNumber uint64) { phaseEvents.Cancel(claim) - - round, _ := a.state.currentRoundAndPhase() - err := a.handleCommit(ctx, round) - logErr(commit, round, err) + a.handlePhase(commit, quit, blockNumber) }) - phaseEvents.On(reveal, func(ctx context.Context) { + phaseEvents.On(reveal, func(quit chan struct{}, blockNumber uint64) { phaseEvents.Cancel(commit, sample) - round, _ := a.state.currentRoundAndPhase() - logErr(reveal, round, a.handleReveal(ctx, round)) + a.handlePhase(reveal, quit, blockNumber) }) - phaseEvents.On(claim, func(ctx context.Context) { + phaseEvents.On(claim, func(quit chan struct{}, blockNumber uint64) { phaseEvents.Cancel(reveal) - phaseEvents.Publish(sample) - - round, _ := a.state.currentRoundAndPhase() - logErr(claim, round, a.handleClaim(ctx, round)) + phaseEvents.Publish(sample, blockNumber) + a.handlePhase(claim, quit, blockNumber) }) - phaseEvents.On(sample, func(ctx context.Context) { - round, _ := a.state.currentRoundAndPhase() - isPhasePlayed, err := a.handleSample(ctx, round) - logErr(sample, round, err) - - // Sample handled could potentially take long time, therefore it could overlap with commit - // phase of next round. When that case happens commit event needs to be triggered once more - // in order to handle commit phase with delay. 
- currentRound, currentPhase := a.state.currentRoundAndPhase() - if isPhasePlayed && - currentPhase == commit && - currentRound-1 == round { - phaseEvents.Publish(commit) - } + phaseEvents.On(sample, func(quit chan struct{}, blockNumber uint64) { + a.handlePhase(sample, quit, blockNumber) }) - - var ( - prevPhase PhaseType = -1 - currentPhase PhaseType - ) - - phaseCheck := func(ctx context.Context) { - ctx, cancel := context.WithTimeout(ctx, blockTime*time.Duration(blocksPerRound)) - defer cancel() - - a.metrics.BackendCalls.Inc() - block, err := a.backend.BlockNumber(ctx) - if err != nil { - a.metrics.BackendErrors.Inc() - a.logger.Error(err, "getting block number") - return - } - - a.state.SetCurrentBlock(block) - - round := block / blocksPerRound - - a.metrics.Round.Set(float64(round)) - - p := block % blocksPerRound - if p < blocksPerPhase { - currentPhase = commit // [0, 37] - } else if p >= blocksPerPhase && p < 2*blocksPerPhase { // [38, 75] - currentPhase = reveal - } else if p >= 2*blocksPerPhase { - currentPhase = claim // [76, 151] - } - - // write the current phase only once - if currentPhase == prevPhase { + phaseC := make(chan phaseAndBlock) + phaseCheckInterval := blockTime + // optimization, we do not need to check the phase change at every new block + if blocksPerPhase > 10 { + phaseCheckInterval = blockTime * 5 + } + ticker := time.NewTicker(phaseCheckInterval) + defer ticker.Stop() + var phase PhaseType + var block uint64 + ctx, cancel := context.WithCancel(context.Background()) + for { + select { + case pr := <-phaseC: + phase, block = pr.phase, pr.block + phaseEvents.Publish(phase, block) + round := block / blocksPerRound + + go a.metrics.CurrentPhase.Set(float64(phase)) + a.metrics.Round.Set(float64(round)) + + a.logger.Info("entered new phase", "phase", phase.String(), "round", round, "block", block) + a.state.SetCurrentBlock(block) + a.state.SetCurrentEvent(phase, round) + // a.state.SetFullySynced(a.fullSyncedFunc()) + // 
a.state.SetHealthy(a.healthyFunc()) + go a.state.purgeStaleRoundData() + + case <-ticker.C: + cancel() + ctx, cancel = context.WithCancel(context.Background()) + go func() { + defer cancel() + a.getPhase(ctx, phase, phaseC) + }() + + case <-a.quit: return } + } +} - prevPhase = currentPhase - a.metrics.CurrentPhase.Set(float64(currentPhase)) - - a.logger.Info("entered new phase", "phase", currentPhase.String(), "round", round, "block", block) - - a.state.SetCurrentEvent(currentPhase, round) - a.state.SetFullySynced(a.fullSyncedFunc()) - a.state.SetHealthy(a.health.IsHealthy()) - go a.state.purgeStaleRoundData() +func currentPhase(ctx context.Context, phase PhaseType, block uint64) (changed bool, newphase PhaseType) { + rem := block % blocksPerRound + newphase = phase + switch { + case rem < blocksPerPhase: // [0, 37] + newphase = commit + case rem >= blocksPerPhase && rem < 2*blocksPerPhase: // [38, 75] + newphase = reveal + default: + newphase = claim // [76, 151] + } + return newphase == phase, newphase +} - isFrozen, err := a.redistributionStatuser.IsOverlayFrozen(ctx, block) - if err != nil { - a.logger.Error(err, "error checking if stake is frozen") - } else { - a.state.SetFrozen(isFrozen, round) - } +// getPhase feeds the phase to the channel if changed +func (a *Agent) getPhase(ctx context.Context, phase PhaseType, phaseC chan phaseAndBlock) { + a.metrics.BackendCalls.Inc() + block, err := a.backend.BlockNumber(ctx) + if err != nil { + a.metrics.BackendErrors.Inc() + a.logger.Error(err, "getting block number from chain backend") + return + } - phaseEvents.Publish(currentPhase) + changed, phase := currentPhase(ctx, phase, block) + if !changed { + return + } + select { + case phaseC <- phaseAndBlock{phase, block}: + case <-ctx.Done(): } +} + +func (a *Agent) handlePhase(phase PhaseType, quit chan struct{}, blockNumber uint64) { + round := blockNumber / blocksPerRound + handler := a.handlers[phase] ctx, cancel := context.WithCancel(context.Background()) + defer 
cancel() go func() { - <-a.quit - cancel() - }() - - // manually invoke phaseCheck initially in order to set initial data asap - phaseCheck(ctx) - - phaseCheckInterval := blockTime - // optimization, we do not need to check the phase change at every new block - if blocksPerPhase > 10 { - phaseCheckInterval = blockTime * 5 - } - - for { select { + case <-quit: + cancel() case <-ctx.Done(): - return - case <-time.After(phaseCheckInterval): - phaseCheck(ctx) } + }() + err := handler(ctx, round) + if err != nil { + a.logger.Error(err, "phase failed", "phase", phase, "round", round) } } func (a *Agent) handleCommit(ctx context.Context, round uint64) error { - // commit event handler has to be guarded with lock to avoid - // race conditions when handler is triggered again from sample phase - a.commitLock.Lock() - defer a.commitLock.Unlock() if _, exists := a.state.CommitKey(round); exists { // already committed on this round, phase is skipped @@ -269,7 +245,7 @@ func (a *Agent) handleCommit(ctx context.Context, round uint64) error { } // the sample has to come from previous round to be able to commit it - sample, exists := a.state.SampleData(round - 1) + sample, exists := a.state.Data(round - 1) if !exists { // In absence of sample, phase is skipped return nil @@ -294,7 +270,7 @@ func (a *Agent) handleReveal(ctx context.Context, round uint64) error { } // reveal requires sample from previous round - sample, exists := a.state.SampleData(round - 1) + sample, exists := a.state.Data(round - 1) if !exists { // Sample must have been saved so far return fmt.Errorf("sample not found in reveal phase") @@ -302,8 +278,8 @@ func (a *Agent) handleReveal(ctx context.Context, round uint64) error { a.metrics.RevealPhase.Inc() - rsh := sample.ReserveSampleHash.Bytes() - txHash, err := a.contract.Reveal(ctx, sample.StorageRadius, rsh, commitKey) + rsh := sample.Hash.Bytes() + txHash, err := a.contract.Reveal(ctx, sample.Depth, rsh, commitKey) if err != nil { a.metrics.ErrReveal.Inc() 
return err @@ -353,17 +329,18 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { a.logger.Info("could not set balance", "err", err) } - sampleData, exists := a.state.SampleData(round - 1) + sampleData, exists := a.state.Data(round - 1) if !exists { return fmt.Errorf("sample not found") } - anchor2, err := a.contract.ReserveSalt(ctx) + anchor, err := a.contract.ReserveSalt(ctx) if err != nil { a.logger.Info("failed getting anchor after second reveal", "err", err) } - proofs, err := makeInclusionProofs(sampleData.ReserveSampleItems, sampleData.Anchor1, anchor2) + proofs, err := sampler.MakeProofs(sampleData, anchor) + if err != nil { return fmt.Errorf("making inclusion proofs: %w", err) } @@ -388,22 +365,28 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { return nil } -func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) { - storageRadius := a.store.StorageRadius() +func (a *Agent) isPlaying(ctx context.Context, round uint64) (bool, uint8, error) { + isFrozen, err := a.redistributionStatuser.IsOverlayFrozen(ctx, round*blocksPerRound) + if err != nil { + a.logger.Error(err, "error checking if stake is frozen") + } else { + a.state.SetFrozen(isFrozen, round) + } if a.state.IsFrozen() { a.logger.Info("skipping round because node is frozen") - return false, nil + return false, 0, nil } - isPlaying, err := a.contract.IsPlaying(ctx, storageRadius) + depth := a.sampler.StorageRadius() + isPlaying, err := a.contract.IsPlaying(ctx, depth) if err != nil { a.metrics.ErrCheckIsPlaying.Inc() - return false, err + return false, 0, err } if !isPlaying { a.logger.Info("not playing in this round") - return false, nil + return false, 0, nil } a.state.SetLastSelectedRound(round + 1) a.metrics.NeighborhoodSelected.Inc() @@ -411,92 +394,50 @@ func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) { if !a.state.IsFullySynced() { a.logger.Info("skipping round because node is not fully synced") - 
return false, nil + return false, 0, nil } if !a.state.IsHealthy() { a.logger.Info("skipping round because node is unhealhy", "round", round) - return false, nil + return false, 0, nil } _, hasFunds, err := a.HasEnoughFundsToPlay(ctx) if err != nil { - return false, fmt.Errorf("has enough funds to play: %w", err) - } else if !hasFunds { + return false, 0, fmt.Errorf("has enough funds to play: %w", err) + } + if !hasFunds { a.logger.Info("insufficient funds to play in next round", "round", round) a.metrics.InsufficientFundsToPlay.Inc() - return false, nil + return false, 0, nil } - - now := time.Now() - sample, err := a.makeSample(ctx, storageRadius) - if err != nil { - return false, err - } - dur := time.Since(now) - a.metrics.SampleDuration.Set(dur.Seconds()) - - a.logger.Info("produced sample", "hash", sample.ReserveSampleHash, "radius", sample.StorageRadius, "round", round) - - a.state.SetSampleData(round, sample, dur) - - return true, nil + return true, depth, nil } -func (a *Agent) makeSample(ctx context.Context, storageRadius uint8) (SampleData, error) { - salt, err := a.contract.ReserveSalt(ctx) - if err != nil { - return SampleData{}, err - } - - timeLimiter, err := a.getPreviousRoundTime(ctx) - if err != nil { - return SampleData{}, err +func (a *Agent) handleSample(ctx context.Context, round uint64) error { + isPlaying, depth, err := a.isPlaying(ctx, round) + if !isPlaying { + return nil } - rSample, err := a.store.ReserveSample(ctx, salt, storageRadius, uint64(timeLimiter), a.minBatchBalance()) + salt, err := a.contract.ReserveSalt(ctx) if err != nil { - return SampleData{}, err + return err } - sampleHash, err := sampleHash(rSample.Items) + sample, err := a.sampler.MakeSample(ctx, salt, depth, round) if err != nil { - return SampleData{}, err + return err } - sample := SampleData{ - Anchor1: salt, - ReserveSampleItems: rSample.Items, - ReserveSampleHash: sampleHash, - StorageRadius: storageRadius, - } + a.logger.Info("produced sample", "hash", 
sample.Hash, "depth", sample.Depth, "round", round) - return sample, nil -} + a.state.SetData(round, sample) -func (a *Agent) minBatchBalance() *big.Int { - cs := a.chainStateGetter.GetChainState() - nextRoundBlockNumber := ((a.state.currentBlock() / a.blocksPerRound) + 2) * a.blocksPerRound - difference := nextRoundBlockNumber - cs.Block - minBalance := new(big.Int).Add(cs.TotalAmount, new(big.Int).Mul(cs.CurrentPrice, big.NewInt(int64(difference)))) - - return minBalance -} - -func (a *Agent) getPreviousRoundTime(ctx context.Context) (time.Duration, error) { - previousRoundBlockNumber := ((a.state.currentBlock() / a.blocksPerRound) - 1) * a.blocksPerRound - - a.metrics.BackendCalls.Inc() - timeLimiterBlock, err := a.backend.HeaderByNumber(ctx, new(big.Int).SetUint64(previousRoundBlockNumber)) - if err != nil { - a.metrics.BackendErrors.Inc() - return 0, err - } - - return time.Duration(timeLimiterBlock.Time) * time.Second / time.Nanosecond, nil + return nil } -func (a *Agent) commit(ctx context.Context, sample SampleData, round uint64) error { +func (a *Agent) commit(ctx context.Context, sample sampler.Data, round uint64) error { a.metrics.CommitPhase.Inc() key := make([]byte, swarm.HashSize) @@ -504,8 +445,7 @@ func (a *Agent) commit(ctx context.Context, sample SampleData, round uint64) err return err } - rsh := sample.ReserveSampleHash.Bytes() - obfuscatedHash, err := a.wrapCommit(sample.StorageRadius, rsh, key) + obfuscatedHash, err := a.wrapCommit(sample.Depth, sample.Hash.Bytes(), key) if err != nil { return err } @@ -525,24 +465,17 @@ func (a *Agent) commit(ctx context.Context, sample SampleData, round uint64) err func (a *Agent) Close() error { close(a.quit) - stopped := make(chan struct{}) - go func() { - a.wg.Wait() - close(stopped) - }() - select { - case <-stopped: + case <-a.stopped: return nil case <-time.After(5 * time.Second): return errors.New("stopping incentives with ongoing worker goroutine") } } -func (a *Agent) wrapCommit(storageRadius uint8, 
sample []byte, key []byte) ([]byte, error) { - storageRadiusByte := []byte{storageRadius} - - data := append(a.overlay.Bytes(), storageRadiusByte...) +func (a *Agent) wrapCommit(depth uint8, sample []byte, key []byte) ([]byte, error) { + data := append([]byte{}, a.overlay.Bytes()...) + data = append(data, depth) data = append(data, sample...) data = append(data, key...) @@ -554,48 +487,6 @@ func (a *Agent) Status() (*Status, error) { return a.state.Status() } -type SampleWithProofs struct { - Hash swarm.Address `json:"hash"` - Proofs redistribution.ChunkInclusionProofs `json:"proofs"` - Duration time.Duration `json:"duration"` -} - -// SampleWithProofs is called only by rchash API -func (a *Agent) SampleWithProofs( - ctx context.Context, - anchor1 []byte, - anchor2 []byte, - storageRadius uint8, -) (SampleWithProofs, error) { - sampleStartTime := time.Now() - - timeLimiter, err := a.getPreviousRoundTime(ctx) - if err != nil { - return SampleWithProofs{}, err - } - - rSample, err := a.store.ReserveSample(ctx, anchor1, storageRadius, uint64(timeLimiter), a.minBatchBalance()) - if err != nil { - return SampleWithProofs{}, err - } - - hash, err := sampleHash(rSample.Items) - if err != nil { - return SampleWithProofs{}, fmt.Errorf("sample hash: %w", err) - } - - proofs, err := makeInclusionProofs(rSample.Items, anchor1, anchor2) - if err != nil { - return SampleWithProofs{}, fmt.Errorf("make proofs: %w", err) - } - - return SampleWithProofs{ - Hash: hash, - Proofs: proofs, - Duration: time.Since(sampleStartTime), - }, nil -} - func (a *Agent) HasEnoughFundsToPlay(ctx context.Context) (*big.Int, bool, error) { balance, err := a.backend.BalanceAt(ctx, a.state.ethAddress, nil) if err != nil { @@ -609,6 +500,5 @@ func (a *Agent) HasEnoughFundsToPlay(ctx context.Context) (*big.Int, bool, error avgTxFee := new(big.Int).Mul(big.NewInt(avgTxGas), price) minBalance := new(big.Int).Mul(avgTxFee, big.NewInt(minTxCountToCover)) - return minBalance, balance.Cmp(minBalance) >= 1, nil 
} diff --git a/pkg/storageincentives/agent_test.go b/pkg/storageincentives/agent_test.go index 70ec3193975..d0951479789 100644 --- a/pkg/storageincentives/agent_test.go +++ b/pkg/storageincentives/agent_test.go @@ -21,9 +21,8 @@ import ( statestore "github.com/ethersphere/bee/pkg/statestore/mock" "github.com/ethersphere/bee/pkg/storageincentives" "github.com/ethersphere/bee/pkg/storageincentives/redistribution" + "github.com/ethersphere/bee/pkg/storageincentives/sampler" "github.com/ethersphere/bee/pkg/storageincentives/staking/mock" - "github.com/ethersphere/bee/pkg/storer" - resMock "github.com/ethersphere/bee/pkg/storer/mock" "github.com/ethersphere/bee/pkg/swarm" transactionmock "github.com/ethersphere/bee/pkg/transaction/mock" "github.com/ethersphere/bee/pkg/util/testutil" @@ -34,55 +33,43 @@ func TestAgent(t *testing.T) { bigBalance := big.NewInt(4_000_000_000) tests := []struct { - name string - blocksPerRound uint64 - blocksPerPhase uint64 - incrementBy uint64 - limit uint64 - expectedCalls bool - balance *big.Int + name string + incrementBy uint64 + limit uint64 + expectedCalls bool + balance *big.Int }{{ - name: "3 blocks per phase, same block number returns twice", - blocksPerRound: 9, - blocksPerPhase: 3, - incrementBy: 1, - expectedCalls: true, - limit: 108, // computed with blocksPerRound * (exptectedCalls + 2) - balance: bigBalance, + name: "3 blocks per phase, same block number returns twice", + incrementBy: 1, + expectedCalls: true, + limit: 108, // computed with blocksPerRound * (exptectedCalls + 2) + balance: bigBalance, }, { - name: "3 blocks per phase, block number returns every block", - blocksPerRound: 9, - blocksPerPhase: 3, - incrementBy: 1, - expectedCalls: true, - limit: 108, - balance: bigBalance, + name: "3 blocks per phase, block number returns every block", + incrementBy: 1, + expectedCalls: true, + limit: 108, + balance: bigBalance, }, { - name: "no expected calls - block number returns late after each phase", - blocksPerRound: 9, - 
blocksPerPhase: 3, - incrementBy: 6, - expectedCalls: false, - limit: 108, - balance: bigBalance, + name: "no expected calls - block number returns late after each phase", + incrementBy: 6, + expectedCalls: false, + limit: 108, + balance: bigBalance, }, { - name: "4 blocks per phase, block number returns every other block", - blocksPerRound: 12, - blocksPerPhase: 4, - incrementBy: 2, - expectedCalls: true, - limit: 144, - balance: bigBalance, + name: "4 blocks per phase, block number returns every other block", + incrementBy: 2, + expectedCalls: true, + limit: 144, + balance: bigBalance, }, { // This test case is based on previous, but this time agent will not have enough // balance to participate in the game so no calls are going to be made. - name: "no expected calls - insufficient balance", - blocksPerRound: 12, - blocksPerPhase: 4, - incrementBy: 2, - expectedCalls: false, - limit: 144, - balance: big.NewInt(0), + name: "no expected calls - insufficient balance", + incrementBy: 2, + expectedCalls: false, + limit: 144, + balance: big.NewInt(0), }, } @@ -103,12 +90,11 @@ func TestAgent(t *testing.T) { } }, incrementBy: tc.incrementBy, - block: tc.blocksPerRound, balance: tc.balance, } contract := &mockContract{} - service, _ := createService(t, addr, backend, contract, tc.blocksPerRound, tc.blocksPerPhase) + service, _ := createService(t, addr, backend, contract) testutil.CleanupCloser(t, service) <-wait @@ -153,10 +139,9 @@ func TestAgent(t *testing.T) { func createService( t *testing.T, addr swarm.Address, - backend storageincentives.ChainBackend, + backend postage.ChainBackend, contract redistribution.Contract, - blocksPerRound uint64, - blocksPerPhase uint64) (*storageincentives.Agent, error) { +) (*storageincentives.Agent, error) { t.Helper() postageContract := contractMock.New(contractMock.WithExpiresBatchesFunc(func(context.Context) error { @@ -167,29 +152,22 @@ func createService( return false, nil })) - reserve := resMock.NewReserve( - 
resMock.WithRadius(0), - resMock.WithSample(storer.RandSample(t, nil)), - ) - + var sampler *sampler.Sampler return storageincentives.New( - addr, common.Address{}, + log.Noop, + addr, + common.Address{}, + time.Millisecond*100, backend, contract, postageContract, stakingContract, - reserve, - func() bool { return true }, - time.Millisecond*100, - blocksPerRound, - blocksPerPhase, + sampler, statestore.NewStateStore(), - &postage.NoOpBatchStore{}, erc20mock.New(), transactionmock.New(), - &mockHealth{}, - log.Noop, ) + } type mockchainBackend struct { @@ -276,7 +254,7 @@ func (m *mockContract) IsWinner(context.Context) (bool, error) { return false, nil } -func (m *mockContract) Claim(context.Context, redistribution.ChunkInclusionProofs) (common.Hash, error) { +func (m *mockContract) Claim(context.Context, []redistribution.Proof) (common.Hash, error) { m.mtx.Lock() defer m.mtx.Unlock() m.callsList = append(m.callsList, claimCall) @@ -296,7 +274,3 @@ func (m *mockContract) Reveal(context.Context, uint8, []byte, []byte) (common.Ha m.callsList = append(m.callsList, revealCall) return common.Hash{}, nil } - -type mockHealth struct{} - -func (m *mockHealth) IsHealthy() bool { return true } diff --git a/pkg/storageincentives/events.go b/pkg/storageincentives/events.go index f8640405abc..bc5ede4676d 100644 --- a/pkg/storageincentives/events.go +++ b/pkg/storageincentives/events.go @@ -5,7 +5,6 @@ package storageincentives import ( - "context" "sync" ) @@ -16,6 +15,7 @@ const ( reveal claim sample + phaseCount ) func (p PhaseType) String() string { @@ -34,42 +34,34 @@ func (p PhaseType) String() string { } type events struct { - mtx sync.Mutex - ev map[PhaseType]*event + mtx sync.Mutex + events [phaseCount]event + quit [phaseCount]chan struct{} } -type event struct { - funcs []func(context.Context) - ctx context.Context - cancel context.CancelFunc -} +type event []func(quit chan struct{}, blockNumber uint64) -func newEvents() *events { - return &events{ - ev: 
make(map[PhaseType]*event), - } +func NewEvents() *events { + return &events{} } -func (e *events) On(phase PhaseType, f func(context.Context)) { +func (e *events) On(phase PhaseType, f func(quit chan struct{}, blockNumber uint64)) { e.mtx.Lock() defer e.mtx.Unlock() - if _, ok := e.ev[phase]; !ok { - ctx, cancel := context.WithCancel(context.Background()) - e.ev[phase] = &event{ctx: ctx, cancel: cancel} + fs := e.events[phase] + if len(fs) == 0 { + e.quit[phase] = make(chan struct{}) } - - e.ev[phase].funcs = append(e.ev[phase].funcs, f) + e.events[phase] = append(fs, f) } -func (e *events) Publish(phase PhaseType) { +func (e *events) Publish(phase PhaseType, blockNumber uint64) { e.mtx.Lock() defer e.mtx.Unlock() - if ev, ok := e.ev[phase]; ok { - for _, v := range ev.funcs { - go v(ev.ctx) - } + for _, v := range e.events[phase] { + go v(e.quit[phase], blockNumber) } } @@ -78,12 +70,8 @@ func (e *events) Cancel(phases ...PhaseType) { defer e.mtx.Unlock() for _, phase := range phases { - if ev, ok := e.ev[phase]; ok { - ev.cancel() - ctx, cancel := context.WithCancel(context.Background()) - ev.ctx = ctx - ev.cancel = cancel - } + close(e.quit[phase]) + e.quit[phase] = make(chan struct{}) } } @@ -91,8 +79,7 @@ func (e *events) Close() { e.mtx.Lock() defer e.mtx.Unlock() - for k, ev := range e.ev { - ev.cancel() - delete(e.ev, k) + for _, ch := range e.quit { + close(ch) } } diff --git a/pkg/storageincentives/events_test.go b/pkg/storageincentives/events_test.go index 6c40fe8fc2e..3c0ec8f3f11 100644 --- a/pkg/storageincentives/events_test.go +++ b/pkg/storageincentives/events_test.go @@ -5,7 +5,6 @@ package storageincentives_test import ( - "context" "testing" "time" @@ -21,33 +20,31 @@ func TestClose(t *testing.T) { done2 := make(chan struct{}) done3 := make(chan struct{}) - ev.On(1, func(ctx context.Context) { - <-ctx.Done() + ev.On(1, func(quit chan struct{}, blockNumber uint64) { + <-quit close(done1) }) - ev.On(1, func(ctx context.Context) { - <-ctx.Done() + 
ev.On(1, func(quit chan struct{}, blockNumber uint64) { + <-quit close(done2) }) - ev.On(2, func(ctx context.Context) { - <-ctx.Done() + ev.On(2, func(quit chan struct{}, blockNumber uint64) { + <-quit close(done3) }) - ev.Publish(1) - ev.Publish(2) + ev.Publish(1, 0) + ev.Publish(2, 0) ev.Close() - for i := 0; i < 3; i++ { + for i, c := range []chan struct{}{done1, done2, done3} { select { - case <-done1: - case <-done2: - case <-done3: + case <-c: case <-time.After(time.Second): - t.Fatal("timeout") + t.Fatalf("timeout waiting for process %d to quit", i) } } } @@ -59,35 +56,41 @@ func TestPhaseCancel(t *testing.T) { done1 := make(chan struct{}) done2 := make(chan struct{}) + done3 := make(chan struct{}) defer ev.Close() // ensure no panics occur on an empty publish - ev.Publish(0) + ev.Publish(0, 0) - ev.On(1, func(ctx context.Context) { - <-ctx.Done() + ev.On(1, func(quit chan struct{}, blockNumber uint64) { + <-quit close(done1) }) - ev.On(2, func(ctx context.Context) { - <-ctx.Done() + ev.On(2, func(quit chan struct{}, blockNumber uint64) { + <-quit close(done2) }) - ev.On(3, func(ctx context.Context) { - ev.Cancel(1, 2) + ev.On(3, func(quit chan struct{}, blockNumber uint64) { + }) - ev.Publish(1) - ev.Publish(2) - ev.Publish(3) + ev.Publish(1, 0) + ev.Publish(2, 0) + ev.Publish(3, 0) + ev.Cancel(1, 2) - for i := 0; i < 2; i++ { + for i, c := range []chan struct{}{done1, done2} { select { - case <-done1: - case <-done2: + case <-c: case <-time.After(time.Second): - t.Fatal("timeout") + t.Fatalf("timeout waiting for process %d to quit", i) } } + select { + case <-done3: + t.Fatalf("process 3 quit unexpectedly") + case <-time.After(50 * time.Millisecond): + } } diff --git a/pkg/storageincentives/export_test.go b/pkg/storageincentives/export_test.go deleted file mode 100644 index f617501ba29..00000000000 --- a/pkg/storageincentives/export_test.go +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright 2022 The Swarm Authors. All rights reserved. 
-// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package storageincentives - -var ( - NewEvents = newEvents - SampleChunk = sampleChunk - MakeInclusionProofs = makeInclusionProofs -) diff --git a/pkg/storageincentives/proof.go b/pkg/storageincentives/proof.go deleted file mode 100644 index ccc04cd935b..00000000000 --- a/pkg/storageincentives/proof.go +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package storageincentives - -import ( - "errors" - "fmt" - "hash" - "math/big" - - "github.com/ethersphere/bee/pkg/bmt" - "github.com/ethersphere/bee/pkg/bmtpool" - "github.com/ethersphere/bee/pkg/cac" - "github.com/ethersphere/bee/pkg/soc" - "github.com/ethersphere/bee/pkg/storageincentives/redistribution" - storer "github.com/ethersphere/bee/pkg/storer" - "github.com/ethersphere/bee/pkg/swarm" -) - -var errProofCreation = errors.New("reserve commitment hasher: failure in proof creation") - -// spanOffset returns the byte index of chunkdata where the spansize starts -func spanOffset(sampleItem storer.SampleItem) uint8 { - ch := swarm.NewChunk(sampleItem.ChunkAddress, sampleItem.ChunkData) - if soc.Valid(ch) { - return swarm.HashSize + swarm.SocSignatureSize - } - - return 0 -} - -// makeInclusionProofs creates transaction data for claim method. -// In the document this logic, result data, is also called Proof of entitlement (POE). 
-func makeInclusionProofs( - reserveSampleItems []storer.SampleItem, - anchor1 []byte, - anchor2 []byte, -) (redistribution.ChunkInclusionProofs, error) { - if len(reserveSampleItems) != storer.SampleSize { - return redistribution.ChunkInclusionProofs{}, fmt.Errorf("reserve sample items should have %d elements", storer.SampleSize) - } - if len(anchor1) == 0 { - return redistribution.ChunkInclusionProofs{}, errors.New("anchor1 is not set") - } - if len(anchor2) == 0 { - return redistribution.ChunkInclusionProofs{}, errors.New("anchor2 is not set") - } - - require3 := storer.SampleSize - 1 - require1 := new(big.Int).Mod(new(big.Int).SetBytes(anchor2), big.NewInt(int64(require3))).Uint64() - require2 := new(big.Int).Mod(new(big.Int).SetBytes(anchor2), big.NewInt(int64(require3-1))).Uint64() - if require2 >= require1 { - require2++ - } - - prefixHasherFactory := func() hash.Hash { - return swarm.NewPrefixHasher(anchor1) - } - prefixHasherPool := bmt.NewPool(bmt.NewConf(prefixHasherFactory, swarm.BmtBranches, 8)) - - // Sample chunk proofs - rccontent := bmt.Prover{Hasher: bmtpool.Get()} - rccontent.SetHeaderInt64(swarm.HashSize * storer.SampleSize * 2) - rsc, err := sampleChunk(reserveSampleItems) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - rscData := rsc.Data() - _, err = rccontent.Write(rscData[swarm.SpanSize:]) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - _, err = rccontent.Hash(nil) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - proof1p1 := rccontent.Proof(int(require1) * 2) - proof2p1 := rccontent.Proof(int(require2) * 2) - proofLastp1 := rccontent.Proof(require3 * 2) - bmtpool.Put(rccontent.Hasher) - - // Witness1 proofs - segmentIndex := int(new(big.Int).Mod(new(big.Int).SetBytes(anchor2), big.NewInt(int64(128))).Uint64()) - // OG chunk proof - chunk1Content := bmt.Prover{Hasher: bmtpool.Get()} - chunk1Offset := 
spanOffset(reserveSampleItems[require1]) - chunk1Content.SetHeader(reserveSampleItems[require1].ChunkData[chunk1Offset : chunk1Offset+swarm.SpanSize]) - chunk1ContentPayload := reserveSampleItems[require1].ChunkData[chunk1Offset+swarm.SpanSize:] - _, err = chunk1Content.Write(chunk1ContentPayload) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - _, err = chunk1Content.Hash(nil) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - proof1p2 := chunk1Content.Proof(segmentIndex) - // TR chunk proof - chunk1TrContent := bmt.Prover{Hasher: prefixHasherPool.Get()} - chunk1TrContent.SetHeader(reserveSampleItems[require1].ChunkData[chunk1Offset : chunk1Offset+swarm.SpanSize]) - _, err = chunk1TrContent.Write(chunk1ContentPayload) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - _, err = chunk1TrContent.Hash(nil) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - proof1p3 := chunk1TrContent.Proof(segmentIndex) - // cleanup - bmtpool.Put(chunk1Content.Hasher) - prefixHasherPool.Put(chunk1TrContent.Hasher) - - // Witness2 proofs - // OG Chunk proof - chunk2Offset := spanOffset(reserveSampleItems[require2]) - chunk2Content := bmt.Prover{Hasher: bmtpool.Get()} - chunk2ContentPayload := reserveSampleItems[require2].ChunkData[chunk2Offset+swarm.SpanSize:] - chunk2Content.SetHeader(reserveSampleItems[require2].ChunkData[chunk2Offset : chunk2Offset+swarm.SpanSize]) - _, err = chunk2Content.Write(chunk2ContentPayload) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - _, err = chunk2Content.Hash(nil) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - proof2p2 := chunk2Content.Proof(segmentIndex) - // TR Chunk proof - chunk2TrContent := bmt.Prover{Hasher: prefixHasherPool.Get()} - chunk2TrContent.SetHeader(reserveSampleItems[require2].ChunkData[chunk2Offset : 
chunk2Offset+swarm.SpanSize]) - _, err = chunk2TrContent.Write(chunk2ContentPayload) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - _, err = chunk2TrContent.Hash(nil) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - proof2p3 := chunk2TrContent.Proof(segmentIndex) - // cleanup - bmtpool.Put(chunk2Content.Hasher) - prefixHasherPool.Put(chunk2TrContent.Hasher) - - // Witness3 proofs - // OG Chunk proof - chunkLastOffset := spanOffset(reserveSampleItems[require3]) - chunkLastContent := bmt.Prover{Hasher: bmtpool.Get()} - chunkLastContent.SetHeader(reserveSampleItems[require3].ChunkData[chunkLastOffset : chunkLastOffset+swarm.SpanSize]) - chunkLastContentPayload := reserveSampleItems[require3].ChunkData[chunkLastOffset+swarm.SpanSize:] - _, err = chunkLastContent.Write(chunkLastContentPayload) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - _, err = chunkLastContent.Hash(nil) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - proofLastp2 := chunkLastContent.Proof(segmentIndex) - // TR Chunk Proof - chunkLastTrContent := bmt.Prover{Hasher: prefixHasherPool.Get()} - chunkLastTrContent.SetHeader(reserveSampleItems[require3].ChunkData[chunkLastOffset : chunkLastOffset+swarm.SpanSize]) - _, err = chunkLastTrContent.Write(chunkLastContentPayload) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - _, err = chunkLastTrContent.Hash(nil) - if err != nil { - return redistribution.ChunkInclusionProofs{}, errProofCreation - } - proofLastp3 := chunkLastTrContent.Proof(segmentIndex) - // cleanup - bmtpool.Put(chunkLastContent.Hasher) - prefixHasherPool.Put(chunkLastTrContent.Hasher) - - // map to output and add SOC related data if it is necessary - A, err := redistribution.NewChunkInclusionProof(proof1p1, proof1p2, proof1p3, reserveSampleItems[require1]) - if err != nil { - return 
redistribution.ChunkInclusionProofs{}, err - } - B, err := redistribution.NewChunkInclusionProof(proof2p1, proof2p2, proof2p3, reserveSampleItems[require2]) - if err != nil { - return redistribution.ChunkInclusionProofs{}, err - } - C, err := redistribution.NewChunkInclusionProof(proofLastp1, proofLastp2, proofLastp3, reserveSampleItems[require3]) - if err != nil { - return redistribution.ChunkInclusionProofs{}, err - } - return redistribution.ChunkInclusionProofs{ - A: A, - B: B, - C: C, - }, nil -} - -func sampleChunk(items []storer.SampleItem) (swarm.Chunk, error) { - contentSize := len(items) * 2 * swarm.HashSize - - pos := 0 - content := make([]byte, contentSize) - for _, s := range items { - copy(content[pos:], s.ChunkAddress.Bytes()) - pos += swarm.HashSize - copy(content[pos:], s.TransformedAddress.Bytes()) - pos += swarm.HashSize - } - - return cac.New(content) -} - -func sampleHash(items []storer.SampleItem) (swarm.Address, error) { - ch, err := sampleChunk(items) - if err != nil { - return swarm.ZeroAddress, err - } - return ch.Address(), nil -} diff --git a/pkg/storageincentives/proof_test.go b/pkg/storageincentives/proof_test.go deleted file mode 100644 index 9ee3745e1af..00000000000 --- a/pkg/storageincentives/proof_test.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. 
- -package storageincentives_test - -import ( - "bytes" - _ "embed" - "encoding/json" - "fmt" - "math/big" - "testing" - - "github.com/ethersphere/bee/pkg/cac" - "github.com/ethersphere/bee/pkg/crypto" - "github.com/ethersphere/bee/pkg/postage" - postagetesting "github.com/ethersphere/bee/pkg/postage/testing" - "github.com/ethersphere/bee/pkg/soc" - "github.com/ethersphere/bee/pkg/storageincentives" - "github.com/ethersphere/bee/pkg/storageincentives/redistribution" - storer "github.com/ethersphere/bee/pkg/storer" - "github.com/ethersphere/bee/pkg/swarm" - "github.com/ethersphere/bee/pkg/util/testutil" - "github.com/google/go-cmp/cmp" -) - -// Test asserts valid case for MakeInclusionProofs. -func TestMakeInclusionProofs(t *testing.T) { - t.Parallel() - - anchor := testutil.RandBytes(t, 1) - sample := storer.RandSample(t, anchor) - - _, err := storageincentives.MakeInclusionProofs(sample.Items, anchor, anchor) - if err != nil { - t.Fatal(err) - } -} - -//go:embed testdata/inclusion-proofs.json -var testData []byte - -// Test asserts that MakeInclusionProofs will generate the same -// output for given sample. 
-func TestMakeInclusionProofsRegression(t *testing.T) { - t.Parallel() - - const sampleSize = 16 - - keyRaw := `00000000000000000000000000000000` - privKey, err := crypto.DecodeSecp256k1PrivateKey([]byte(keyRaw)) - if err != nil { - t.Fatal(err) - } - signer := crypto.NewDefaultSigner(privKey) - - stampID, _ := crypto.LegacyKeccak256([]byte("The Inverted Jenny")) - index := []byte{0, 0, 0, 0, 0, 8, 3, 3} - timestamp := []byte{0, 0, 0, 0, 0, 3, 3, 8} - stamper := func(addr swarm.Address) *postage.Stamp { - sig := postagetesting.MustNewValidSignature(signer, addr, stampID, index, timestamp) - return postage.NewStamp(stampID, index, timestamp, sig) - } - - anchor1 := big.NewInt(100).Bytes() - anchor2 := big.NewInt(30).Bytes() // this anchor will pick chunks 3, 6, 15 - - // generate chunks that will be used as sample - sampleChunks := make([]swarm.Chunk, 0, sampleSize) - for i := 0; i < sampleSize; i++ { - ch, err := cac.New([]byte(fmt.Sprintf("Unstoppable data! Chunk #%d", i+1))) - if err != nil { - t.Fatal(err) - } - - if i%2 == 0 { - id, err := crypto.LegacyKeccak256([]byte(fmt.Sprintf("ID #%d", i+1))) - if err != nil { - t.Fatal(err) - } - - socCh, err := soc.New(id, ch).Sign(signer) - if err != nil { - t.Fatal(err) - } - - ch = socCh - } - - ch = ch.WithStamp(stamper(ch.Address())) - - sampleChunks = append(sampleChunks, ch) - } - - // make sample from chunks - sample, err := storer.MakeSampleUsingChunks(sampleChunks, anchor1) - if err != nil { - t.Fatal(err) - } - - // assert that sample chunk hash/address does not change - sch, err := storageincentives.SampleChunk(sample.Items) - if err != nil { - t.Fatal(err) - } - if want := swarm.MustParseHexAddress("193bbea3dd0656d813c2c1e27b821f141286bbe6ab0dbf8e26fc7dd491e8f921"); !sch.Address().Equal(want) { - t.Fatalf("expecting sample chunk address %v, got %v", want, sch.Address()) - } - - // assert that inclusion proofs values does not change - proofs, err := storageincentives.MakeInclusionProofs(sample.Items, anchor1, 
anchor2) - if err != nil { - t.Fatal(err) - } - - var expectedProofs redistribution.ChunkInclusionProofs - - err = json.Unmarshal(testData, &expectedProofs) - if err != nil { - t.Fatal(err) - } - - if diff := cmp.Diff(proofs, expectedProofs); diff != "" { - t.Fatalf("unexpected inclusion proofs (-want +have):\n%s", diff) - } -} - -// Test asserts cases when MakeInclusionProofs should return error. -func TestMakeInclusionProofsExpectedError(t *testing.T) { - t.Parallel() - - t.Run("invalid sample length", func(t *testing.T) { - anchor := testutil.RandBytes(t, 8) - sample := storer.RandSample(t, anchor) - - _, err := storageincentives.MakeInclusionProofs(sample.Items[:1], anchor, anchor) - if err == nil { - t.Fatal("expecting error") - } - }) - - t.Run("empty anchor", func(t *testing.T) { - sample := storer.RandSample(t, []byte{}) - - _, err := storageincentives.MakeInclusionProofs(sample.Items[:1], []byte{}, []byte{}) - if err == nil { - t.Fatal("expecting error") - } - }) -} - -// Tests asserts that creating sample chunk is valid for all lengths [1-MaxSampleSize] -func TestSampleChunk(t *testing.T) { - t.Parallel() - - sample := storer.RandSample(t, nil) - - for i := 0; i < len(sample.Items); i++ { - items := sample.Items[:i] - - chunk, err := storageincentives.SampleChunk(items) - if err != nil { - t.Fatal(err) - } - - data := chunk.Data()[swarm.SpanSize:] - pos := 0 - for _, item := range items { - if !bytes.Equal(data[pos:pos+swarm.HashSize], item.ChunkAddress.Bytes()) { - t.Error("expected chunk address") - } - pos += swarm.HashSize - - if !bytes.Equal(data[pos:pos+swarm.HashSize], item.TransformedAddress.Bytes()) { - t.Error("expected transformed address") - } - pos += swarm.HashSize - } - - if !chunk.Address().IsValidNonEmpty() { - t.Error("address shouldn't be empty") - } - } -} - -// Tests asserts that creating sample chunk should fail because it will exceed -// capacity of chunk data. 
-func TestSampleChunkExpectedError(t *testing.T) {
-	t.Parallel()
-
-	sampleItem := storer.RandSample(t, nil).Items[0]
-
-	items := make([]storer.SampleItem, 65)
-	for i := range items {
-		items[i] = sampleItem
-	}
-
-	_, err := storageincentives.SampleChunk(items)
-	if err == nil {
-		t.Fatal("expecting error")
-	}
-}
diff --git a/pkg/storageincentives/redistribution/abi_types.go b/pkg/storageincentives/redistribution/abi_types.go
new file mode 100644
index 00000000000..1b3cfb2d8b9
--- /dev/null
+++ b/pkg/storageincentives/redistribution/abi_types.go
@@ -0,0 +1,87 @@
+// Copyright 2023 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Used for inclusion proof utilities
+
+package redistribution
+
+import (
+	"encoding/binary"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethersphere/bee/pkg/bmt"
+	"github.com/ethersphere/bee/pkg/soc"
+	"github.com/ethersphere/bee/pkg/swarm"
+)
+
+// Proof structure must exactly match
+// corresponding structure (of the same name) in Redistribution.sol smart contract.
+// github.com/ethersphere/storage-incentives/blob/master/src/Redistribution.sol
+type Proof struct {
+	Sisters      []common.Hash
+	Data         common.Hash
+	Sisters2     []common.Hash
+	Data2        common.Hash
+	Sisters3     []common.Hash
+	ChunkSpan    uint64
+	PostageProof PostageProof
+	SocProof     []SOCProof
+}
+
+// PostageProof structure must exactly match
+// corresponding structure (of the same name) in Redistribution.sol smart contract.
+type PostageProof struct {
+	Signature []byte
+	BatchId   common.Hash
+	Index     uint64
+	TimeStamp uint64
+}
+
+// SOCProof structure must exactly match
+// corresponding structure (of the same name) in Redistribution.sol smart contract.
+type SOCProof struct { + Signer []byte + Signature []byte + Identifier common.Hash + ChunkAddr common.Hash +} + +func bytes32(bs ...[]byte) []common.Hash { + bbs := make([]common.Hash, len(bs)) + for i, b := range bs { + var bb [32]byte + copy(bb[:], b) + bbs[i] = common.Hash(bb) + } + return bbs +} + +// NewProof transforms arguments to abi-compatible Proof object +func NewProof(wp1, wp2, wp3 bmt.Proof, stamp swarm.Stamp, sch *soc.SOC) Proof { + var socProof []SOCProof + if sch == nil { + socProof = []SOCProof{{ + Signer: sch.OwnerAddress(), + Signature: sch.Signature(), + Identifier: bytes32(sch.ID())[0], + ChunkAddr: bytes32(sch.WrappedChunk().Address().Bytes())[0], + }} + } + + return Proof{ + Sisters: bytes32(wp1.Sisters...), + Data: bytes32(wp1.Data)[0], + Sisters2: bytes32(wp2.Sisters...), + Data2: bytes32(wp2.Data)[0], + Sisters3: bytes32(wp3.Sisters...), + ChunkSpan: binary.LittleEndian.Uint64(wp2.Span[:swarm.SpanSize]), // should be uint64 on the other size; copied from pkg/api/bytes.go + PostageProof: PostageProof{ + Signature: stamp.Sig(), + BatchId: bytes32(stamp.BatchID())[0], + Index: binary.BigEndian.Uint64(stamp.Index()), + TimeStamp: binary.BigEndian.Uint64(stamp.Timestamp()), + }, + SocProof: socProof, + } +} diff --git a/pkg/storageincentives/redistribution/abi_types_test.go b/pkg/storageincentives/redistribution/abi_types_test.go new file mode 100644 index 00000000000..0ad3e6146c5 --- /dev/null +++ b/pkg/storageincentives/redistribution/abi_types_test.go @@ -0,0 +1,3 @@ +package redistribution_test + +// must include tests for just the contract calldata abi packaging \ No newline at end of file diff --git a/pkg/storageincentives/redistribution/inclusionproof.go b/pkg/storageincentives/redistribution/inclusionproof.go deleted file mode 100644 index b786d1d3001..00000000000 --- a/pkg/storageincentives/redistribution/inclusionproof.go +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. 
-// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package redistribution - -import ( - "encoding/binary" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethersphere/bee/pkg/bmt" - "github.com/ethersphere/bee/pkg/soc" - "github.com/ethersphere/bee/pkg/storer" - "github.com/ethersphere/bee/pkg/swarm" -) - -type ChunkInclusionProofs struct { - A ChunkInclusionProof `json:"proof1"` - B ChunkInclusionProof `json:"proof2"` - C ChunkInclusionProof `json:"proofLast"` -} - -// ChunkInclusionProof structure must exactly match -// corresponding structure (of the same name) in Redistribution.sol smart contract. -// github.com/ethersphere/storage-incentives/blob/ph_f2/src/Redistribution.sol -// github.com/ethersphere/storage-incentives/blob/master/src/Redistribution.sol (when merged to master) -type ChunkInclusionProof struct { - ProofSegments []common.Hash `json:"proofSegments"` - ProveSegment common.Hash `json:"proveSegment"` - ProofSegments2 []common.Hash `json:"proofSegments2"` - ProveSegment2 common.Hash `json:"proveSegment2"` - ChunkSpan uint64 `json:"chunkSpan"` - ProofSegments3 []common.Hash `json:"proofSegments3"` - PostageProof PostageProof `json:"postageProof"` - SocProof []SOCProof `json:"socProof"` -} - -// SOCProof structure must exactly match -// corresponding structure (of the same name) in Redistribution.sol smart contract. -type PostageProof struct { - Signature []byte `json:"signature"` - PostageId common.Hash `json:"postageId"` - Index uint64 `json:"index"` - TimeStamp uint64 `json:"timeStamp"` -} - -// SOCProof structure must exactly match -// corresponding structure (of the same name) in Redistribution.sol smart contract. 
-type SOCProof struct { - Signer common.Address `json:"signer"` - Signature []byte `json:"signature"` - Identifier common.Hash `json:"identifier"` - ChunkAddr common.Hash `json:"chunkAddr"` -} - -// NewChunkInclusionProof transforms arguments to ChunkInclusionProof object -func NewChunkInclusionProof(proofp1, proofp2, proofp3 bmt.Proof, sampleItem storer.SampleItem) (ChunkInclusionProof, error) { - socProof, err := makeSOCProof(sampleItem) - if err != nil { - return ChunkInclusionProof{}, err - } - - return ChunkInclusionProof{ - ProofSegments: toCommonHash(proofp1.ProofSegments), - ProveSegment: common.BytesToHash(proofp1.ProveSegment), - ProofSegments2: toCommonHash(proofp2.ProofSegments), - ProveSegment2: common.BytesToHash(proofp2.ProveSegment), - ChunkSpan: binary.LittleEndian.Uint64(proofp2.Span[:swarm.SpanSize]), // should be uint64 on the other size; copied from pkg/api/bytes.go - ProofSegments3: toCommonHash(proofp3.ProofSegments), - PostageProof: PostageProof{ - Signature: sampleItem.Stamp.Sig(), - PostageId: common.BytesToHash(sampleItem.Stamp.BatchID()), - Index: binary.BigEndian.Uint64(sampleItem.Stamp.Index()), - TimeStamp: binary.BigEndian.Uint64(sampleItem.Stamp.Timestamp()), - }, - SocProof: socProof, - }, nil -} - -func toCommonHash(hashes [][]byte) []common.Hash { - output := make([]common.Hash, len(hashes)) - for i, s := range hashes { - output[i] = common.BytesToHash(s) - } - return output -} - -func makeSOCProof(sampleItem storer.SampleItem) ([]SOCProof, error) { - ch := swarm.NewChunk(sampleItem.ChunkAddress, sampleItem.ChunkData) - if !soc.Valid(ch) { - return []SOCProof{}, nil - } - - socCh, err := soc.FromChunk(ch) - if err != nil { - return []SOCProof{}, err - } - - return []SOCProof{{ - Signer: common.BytesToAddress(socCh.OwnerAddress()), - Signature: socCh.Signature(), - Identifier: common.BytesToHash(socCh.ID()), - ChunkAddr: common.BytesToHash(socCh.WrappedChunk().Address().Bytes()), - }}, nil -} diff --git 
a/pkg/storageincentives/redistribution/redistribution.go b/pkg/storageincentives/redistribution/redistribution.go index 8b83537cf90..d63273434d4 100644 --- a/pkg/storageincentives/redistribution/redistribution.go +++ b/pkg/storageincentives/redistribution/redistribution.go @@ -23,7 +23,7 @@ type Contract interface { ReserveSalt(context.Context) ([]byte, error) IsPlaying(context.Context, uint8) (bool, error) IsWinner(context.Context) (bool, error) - Claim(context.Context, ChunkInclusionProofs) (common.Hash, error) + Claim(context.Context, []Proof) (common.Hash, error) Commit(context.Context, []byte, uint32) (common.Hash, error) Reveal(context.Context, uint8, []byte, []byte) (common.Hash, error) } @@ -92,8 +92,8 @@ func (c *contract) IsWinner(ctx context.Context) (isWinner bool, err error) { } // Claim sends a transaction to blockchain if a win is claimed. -func (c *contract) Claim(ctx context.Context, proofs ChunkInclusionProofs) (common.Hash, error) { - callData, err := c.incentivesContractABI.Pack("claim", proofs.A, proofs.B, proofs.C) +func (c *contract) Claim(ctx context.Context, proofs []Proof) (common.Hash, error) { + callData, err := c.incentivesContractABI.Pack("claim", proofs) if err != nil { return common.Hash{}, err } diff --git a/pkg/storageincentives/redistribution/redistribution_test.go b/pkg/storageincentives/redistribution/redistribution_test.go index 943d3013ba4..688c52c0689 100644 --- a/pkg/storageincentives/redistribution/redistribution_test.go +++ b/pkg/storageincentives/redistribution/redistribution_test.go @@ -7,15 +7,14 @@ package redistribution_test import ( "bytes" "context" - "encoding/binary" "errors" "fmt" "math/big" + "math/rand" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - chaincfg "github.com/ethersphere/bee/pkg/config" "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/sctx" "github.com/ethersphere/bee/pkg/storageincentives/redistribution" @@ -26,35 +25,45 @@ 
import ( "github.com/ethersphere/bee/pkg/util/testutil" ) -var redistributionContractABI = abiutil.MustParseABI(chaincfg.Testnet.RedistributionABI) +// var redistributionContractABI = abiutil.MustParseABI(chaincfg.Testnet.RedistributionABI) +var redistributionContractABI = abiutil.MustParseABI(redistribution.TestnetRedistributionABI) -func randChunkInclusionProof(t *testing.T) redistribution.ChunkInclusionProof { +func randomHashes(t testing.TB, n int) []common.Hash { + rhs := make([]common.Hash, n) + for _, rh := range rhs { + copy(rh[:], testutil.RandBytes(t, 32)) + } + return rhs +} + +// TODO uncomment when ABI is updated +func randProof(t *testing.T) redistribution.Proof { t.Helper() - return redistribution.ChunkInclusionProof{ - ProofSegments: []common.Hash{common.BytesToHash(testutil.RandBytes(t, 32))}, - ProveSegment: common.BytesToHash(testutil.RandBytes(t, 32)), - ProofSegments2: []common.Hash{common.BytesToHash(testutil.RandBytes(t, 32))}, - ProveSegment2: common.BytesToHash(testutil.RandBytes(t, 32)), - ProofSegments3: []common.Hash{common.BytesToHash(testutil.RandBytes(t, 32))}, + return redistribution.Proof{ + Sisters: randomHashes(t, 7), + Data: randomHashes(t, 1)[0], + Sisters2: randomHashes(t, 7), + Data2: randomHashes(t, 1)[0], + Sisters3: randomHashes(t, 7), PostageProof: redistribution.PostageProof{ Signature: testutil.RandBytes(t, 65), - PostageId: common.BytesToHash(testutil.RandBytes(t, 32)), - Index: binary.BigEndian.Uint64(testutil.RandBytes(t, 8)), - TimeStamp: binary.BigEndian.Uint64(testutil.RandBytes(t, 8)), + BatchId: randomHashes(t, 1)[0], + Index: uint64(rand.Int63()), + TimeStamp: uint64(rand.Int63()), }, ChunkSpan: 1, SocProof: []redistribution.SOCProof{}, } } -func randChunkInclusionProofs(t *testing.T) redistribution.ChunkInclusionProofs { +func randProofs(t *testing.T) []redistribution.Proof { t.Helper() - return redistribution.ChunkInclusionProofs{ - A: randChunkInclusionProof(t), - B: randChunkInclusionProof(t), - C: 
randChunkInclusionProof(t), + return []redistribution.Proof{ + randProof(t), + randProof(t), + randProof(t), } } @@ -184,9 +193,9 @@ func TestRedistribution(t *testing.T) { t.Run("Claim", func(t *testing.T) { t.Parallel() - proofs := randChunkInclusionProofs(t) + proofs := randProofs(t) - expectedCallData, err := redistributionContractABI.Pack("claim", proofs.A, proofs.B, proofs.C) + expectedCallData, err := redistributionContractABI.Pack("claim", proofs) if err != nil { t.Fatal(err) } @@ -225,8 +234,8 @@ func TestRedistribution(t *testing.T) { t.Run("Claim with tx reverted", func(t *testing.T) { t.Parallel() - proofs := randChunkInclusionProofs(t) - expectedCallData, err := redistributionContractABI.Pack("claim", proofs.A, proofs.B, proofs.C) + proofs := randProofs(t) + expectedCallData, err := redistributionContractABI.Pack("claim", proofs) if err != nil { t.Fatal(err) } diff --git a/pkg/storageincentives/redistributionstate.go b/pkg/storageincentives/redistributionstate.go index b1f4b60dd03..6cc201c7db1 100644 --- a/pkg/storageincentives/redistributionstate.go +++ b/pkg/storageincentives/redistributionstate.go @@ -15,8 +15,7 @@ import ( "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/settlement/swap/erc20" "github.com/ethersphere/bee/pkg/storage" - storer "github.com/ethersphere/bee/pkg/storer" - "github.com/ethersphere/bee/pkg/swarm" + "github.com/ethersphere/bee/pkg/storageincentives/sampler" "github.com/ethersphere/bee/pkg/transaction" ) @@ -59,17 +58,10 @@ type Status struct { type RoundData struct { CommitKey []byte - SampleData *SampleData + Data *sampler.Data HasRevealed bool } -type SampleData struct { - Anchor1 []byte - ReserveSampleItems []storer.SampleItem - ReserveSampleHash swarm.Address - StorageRadius uint8 -} - func NewStatus() *Status { return &Status{ Reward: big.NewInt(0), @@ -232,26 +224,25 @@ func (r *RedistributionState) SetBalance(ctx context.Context) error { return nil } -func (r *RedistributionState) SampleData(round 
uint64) (SampleData, bool) { +func (r *RedistributionState) Data(round uint64) (sampler.Data, bool) { r.mtx.Lock() defer r.mtx.Unlock() rd, ok := r.status.RoundData[round] - if !ok || rd.SampleData == nil { - return SampleData{}, false + if !ok || rd.Data == nil { + return sampler.Data{}, false } - return *rd.SampleData, true + return *rd.Data, true } -func (r *RedistributionState) SetSampleData(round uint64, sd SampleData, dur time.Duration) { +func (r *RedistributionState) SetData(round uint64, sd sampler.Data) { r.mtx.Lock() defer r.mtx.Unlock() rd := r.status.RoundData[round] - rd.SampleData = &sd + rd.Data = &sd r.status.RoundData[round] = rd - r.status.SampleDuration = dur r.save() } diff --git a/pkg/storageincentives/redistributionstate_test.go b/pkg/storageincentives/redistributionstate_test.go index 9c7f750d64c..b71910d9338 100644 --- a/pkg/storageincentives/redistributionstate_test.go +++ b/pkg/storageincentives/redistributionstate_test.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/common" erc20mock "github.com/ethersphere/bee/pkg/settlement/swap/erc20/mock" "github.com/ethersphere/bee/pkg/statestore/mock" + "github.com/ethersphere/bee/pkg/storageincentives/sampler" "github.com/ethersphere/bee/pkg/swarm" transactionmock "github.com/ethersphere/bee/pkg/transaction/mock" "github.com/ethersphere/bee/pkg/util/testutil" @@ -93,23 +94,23 @@ func TestState(t *testing.T) { func TestStateRoundData(t *testing.T) { t.Parallel() - t.Run("sample data", func(t *testing.T) { + t.Run("sample Data", func(t *testing.T) { t.Parallel() state := createRedistribution(t, nil, nil) - _, exists := state.SampleData(1) + _, exists := state.Data(1) if exists { t.Error("should not exists") } - savedSample := SampleData{ - ReserveSampleHash: swarm.RandAddress(t), - StorageRadius: 3, + savedSample := sampler.Data{ + Hash: swarm.RandAddress(t), + Depth: 3, } - state.SetSampleData(1, savedSample, 0) + state.SetData(1, savedSample) - sample, exists := state.SampleData(1) 
+ sample, exists := state.Data(1) if !exists { t.Error("should exist") } @@ -163,30 +164,30 @@ func TestPurgeRoundData(t *testing.T) { state := createRedistribution(t, nil, nil) - // helper function which populates data at specified round + // helper function which populates sampler.Data at specified round populateDataAtRound := func(round uint64) { - savedSample := SampleData{ - ReserveSampleHash: swarm.RandAddress(t), - StorageRadius: 3, + savedSample := sampler.Data{ + Hash: swarm.RandAddress(t), + Depth: 3, } commitKey := testutil.RandBytes(t, swarm.HashSize) - state.SetSampleData(round, savedSample, 0) + state.SetData(round, savedSample) state.SetCommitKey(round, commitKey) state.SetHasRevealed(round) } - // asserts if there is, or there isn't, data at specified round + // asserts if there is, or there isn't, sampler.Data at specified round assertHasDataAtRound := func(round uint64, shouldHaveData bool) { check := func(exists bool) { if shouldHaveData && !exists { - t.Error("should have data") + t.Error("should have sampler.Data") } else if !shouldHaveData && exists { - t.Error("should not have data") + t.Error("should not have sampler.Data") } } - _, exists1 := state.SampleData(round) + _, exists1 := state.Data(round) _, exists2 := state.CommitKey(round) exists3 := state.HasRevealed(round) @@ -198,7 +199,7 @@ func TestPurgeRoundData(t *testing.T) { const roundsCount = 100 hasRoundData := make([]bool, roundsCount) - // Populate data at random rounds + // Populate sampler.Data at random rounds for i := uint64(0); i < roundsCount; i++ { v := rand.Int()%2 == 0 hasRoundData[i] = v @@ -208,8 +209,8 @@ func TestPurgeRoundData(t *testing.T) { assertHasDataAtRound(i, v) } - // Run purge successively and assert that all data is purged up to - // currentRound - purgeDataOlderThenXRounds + // Run purge successively and assert that all sampler.Data is purged up to + // currentRound - purgesampler.DataOlderThenXRounds for i := uint64(0); i < roundsCount; i++ { 
state.SetCurrentEvent(0, i) state.purgeStaleRoundData() @@ -223,7 +224,7 @@ func TestPurgeRoundData(t *testing.T) { } } - // Purge remaining data in single go + // Purge remaining sampler.Data in single go round := uint64(roundsCount + purgeStaleDataThreshold) state.SetCurrentEvent(0, round) state.purgeStaleRoundData() diff --git a/pkg/storageincentives/sampler/debug.go b/pkg/storageincentives/sampler/debug.go new file mode 100644 index 00000000000..4abbfc38a68 --- /dev/null +++ b/pkg/storageincentives/sampler/debug.go @@ -0,0 +1,35 @@ +// this file together with rchash endpoint and api support should be removed + +package sampler + +import ( + "context" + "fmt" + "time" + + "github.com/ethersphere/bee/pkg/storageincentives/redistribution" + "github.com/ethersphere/bee/pkg/swarm" +) + +type SampleWithProofs struct { + Hash swarm.Address `json:"hash"` + Proofs []redistribution.Proof `json:"proofs"` + Duration time.Duration `json:"duration"` +} + +// ReserveWithProofs is only called by rchash API +func (s *Sampler) ReserveSampleWithProofs(ctx context.Context, anchor1, anchor2 []byte, depth uint8, round uint64) (swp SampleWithProofs, err error) { + t := time.Now() + + sample, err := s.MakeSample(ctx, anchor1, depth, round) + if err != nil { + return swp, fmt.Errorf("mock reserve sampling: %w", err) + } + + proofs, err := MakeProofs(sample, anchor2) + if err != nil { + return swp, fmt.Errorf("make proofs: %w", err) + } + + return SampleWithProofs{sample.Hash, proofs, time.Since(t)}, nil +} diff --git a/pkg/storageincentives/sampler/export_test.go b/pkg/storageincentives/sampler/export_test.go new file mode 100644 index 00000000000..d61b1f72ca8 --- /dev/null +++ b/pkg/storageincentives/sampler/export_test.go @@ -0,0 +1,6 @@ +package sampler + +var ( + TransformedAddressCAC = transformedAddressCAC + NewStamp = newStamp +) diff --git a/pkg/storageincentives/sampler/proof.go b/pkg/storageincentives/sampler/proof.go new file mode 100644 index 00000000000..0337e749b9a --- 
/dev/null +++ b/pkg/storageincentives/sampler/proof.go @@ -0,0 +1,53 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sampler + +import ( + "math/big" + + "github.com/ethersphere/bee/pkg/bmt" + "github.com/ethersphere/bee/pkg/storageincentives/redistribution" +) + +// MakeProofs creates transaction data for the claim method aka Proof of entitlement (POE). +func MakeProofs( + sample Data, + anchor2 []byte, +) ([]redistribution.Proof, error) { + // witness indexes selecting the index-th item of the sample + index3 := uint64(Size) - 1 + rand := new(big.Int).SetBytes(anchor2) + index1 := new(big.Int).Mod(rand, new(big.Int).SetUint64(index3)).Uint64() + index2 := new(big.Int).Mod(rand, new(big.Int).SetUint64(index3-1)).Uint64() + if index2 >= index1 { + index2++ + } + + // reserve sample inclusion proofs for witness chunks + indexes := []uint64{index1, index2, index3} + witnessProofs := make([]*bmt.Proof, len(indexes)) + for i, idx := range indexes { + proof := sample.Prover.Proof(int(idx) * 2) + witnessProofs[i] = &proof + } + + // data retention proofs for the witness chunks + segmentIndex := int(new(big.Int).Mod(new(big.Int).SetBytes(anchor2), big.NewInt(int64(128))).Uint64()) + retentionProofs := make([]*bmt.Proof, len(indexes)) + transformProofs := make([]*bmt.Proof, len(indexes)) + for i, idx := range indexes { + item := sample.Items[idx] + proof := item.Prover.Proof(segmentIndex) + retentionProofs[i] = &proof + transProof := sample.Items[idx].TransProver.Proof(segmentIndex) + transformProofs[i] = &transProof + } + + proofs := make([]redistribution.Proof, len(indexes)) + for i := range proofs { + proofs[i] = redistribution.NewProof(*witnessProofs[i], *retentionProofs[i], *transformProofs[i], sample.Items[i].Stamp, sample.Items[i].SOC) + } + return proofs, nil +} diff --git a/pkg/storageincentives/sampler/proof_test.go 
b/pkg/storageincentives/sampler/proof_test.go new file mode 100644 index 00000000000..12323aa1942 --- /dev/null +++ b/pkg/storageincentives/sampler/proof_test.go @@ -0,0 +1,7 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sampler_test + +// must create Data from fixtures and run MakeProofs on it and compare to fixtures diff --git a/pkg/storageincentives/sampler/sampler.go b/pkg/storageincentives/sampler/sampler.go new file mode 100644 index 00000000000..c75312681bd --- /dev/null +++ b/pkg/storageincentives/sampler/sampler.go @@ -0,0 +1,470 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sampler + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "hash" + "math/big" + "sync" + "time" + + "github.com/ethersphere/bee/pkg/bmt" + "github.com/ethersphere/bee/pkg/bmtpool" + "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/soc" + storer "github.com/ethersphere/bee/pkg/storer" + + "github.com/ethersphere/bee/pkg/swarm" + "golang.org/x/sync/errgroup" +) + +const Size = 16 + +type Batchstore interface { + postage.ChainStateGetter + postage.ValueIterator +} + +type Sampler struct { + backend postage.ChainBackend + batchstore Batchstore + sampler storer.Sampler + fullSyncedFunc func() bool + healthyFunc func() bool +} + +func New( + backend postage.ChainBackend, + batchstore Batchstore, + sampler storer.Sampler, + fullSyncedFunc, + healthyFunc func() bool, +) *Sampler { + return &Sampler{ + batchstore: batchstore, + sampler: sampler, + fullSyncedFunc: fullSyncedFunc, + healthyFunc: healthyFunc, + } +} + +type Sample struct { + Stats Stats + Data Data +} +type Item struct { + Address swarm.Address + TransAddress swarm.Address + Prover bmt.Prover + TransProver bmt.Prover + LoadStamp 
func() (*postage.Stamp, error) + Stamp *postage.Stamp + SOC *soc.SOC +} + +func newStamp(s swarm.Stamp) *postage.Stamp { + return postage.NewStamp(s.BatchID(), s.Index(), s.Timestamp(), s.Sig()) +} + +type Data struct { + Depth uint8 + Hash swarm.Address + Prover bmt.Prover + Items []Item +} + +func Chunk(depth uint8, items []Item) (*Data, error) { + content := make([]byte, len(items)*2*swarm.HashSize) + for i, s := range items { + copy(content[i*2*swarm.HashSize:], s.Address.Bytes()) + copy(content[(i*2+1)*swarm.HashSize:], s.TransAddress.Bytes()) + } + hasher := bmtpool.Get() + prover := bmt.Prover{Hasher: hasher} + prover.SetHeaderInt64(int64(len(content))) + _, err := prover.Write(content) + if err != nil { + return nil, err + } + hash, err := prover.Hash(nil) + if err != nil { + return nil, err + } + return &Data{ + Depth: depth, + Hash: swarm.NewAddress(hash), + Prover: prover, + Items: items, + }, nil +} + +func (s *Sampler) MakeSample(ctx context.Context, salt []byte, depth uint8, round uint64) (Data, error) { + + maxTimeStamp, err := s.maxTimeStamp(ctx, round) + if err != nil { + return Data{}, err + } + + filterFunc, err := s.getBatchFilterFunc(round) + if err != nil { + return Data{}, fmt.Errorf("get batches with balance below value: %w", err) + } + + sample, err := s.reserveSample(ctx, salt, depth, maxTimeStamp, filterFunc) + return sample.Data, err +} + +// Reserve generates the sample of reserve storage of a node required for the +// storage incentives agent to participate in the lottery round. In order to generate +// this we need to iterate through all the chunks in the node's reserve and +// calculate the transformed hashes of all the chunks using the anchor as the salt. +// In order to generate the transformed hashes, we use bmt hash with a prefixed basehash +// keccak256 with anchor as the prefix. Nodes need to calculate the +// in the most optimal way and there are time restrictions. 
The lottery round is a +// time based round, so nodes participating in the round need to perform this +// calculation within the round limits. +// In order to optimize this we use a simple pipeline pattern: +// Iterate chunk addresses -> Get the chunk data and calculate transformed hash -> Assemble the +func (s *Sampler) reserveSample( + ctx context.Context, + anchor []byte, + depth uint8, + maxTimeStamp uint64, + filterFunc func(batchID []byte) bool, +) (Sample, error) { + g, ctx := errgroup.WithContext(ctx) + chunkC := make(chan storer.Chunk, 64) + // all performance stats should be removed, this is a benchmarking exercise, any interal data is not really meaningful + // as it is presented to the user + allStats := &Stats{} + statsLock := sync.Mutex{} + addStats := func(stats Stats) { + statsLock.Lock() + allStats.add(stats) + statsLock.Unlock() + } + start := time.Now() + + // Phase 1: Iterate chunk addresses + g.Go(func() error { + start := time.Now() + stats := Stats{} + defer func() { + stats.IterationDuration = time.Since(start) + close(chunkC) + addStats(stats) + }() + + err := s.sampler.Iterate(depth, func(c storer.Chunk) (bool, error) { + select { + case chunkC <- c: + stats.TotalIterated++ + return false, nil + case <-ctx.Done(): + return false, ctx.Err() + } + }) + return err + }) + + // Phase 2: Get the chunk data and calculate transformed hash + ItemChan := make(chan Item, 64) + + prefixHasherFactory := func() hash.Hash { + return swarm.NewPrefixHasher(anchor) + } + transHasherPool := bmt.NewPool(bmt.NewConf(prefixHasherFactory, swarm.BmtBranches, bmtpool.Capacity)) + + const workers = 6 + + for i := 0; i < workers; i++ { + g.Go(func() error { + wstat := Stats{} + defer func() { + addStats(wstat) + }() + + transProver := bmt.Prover{Hasher: transHasherPool.Get()} + for c := range chunkC { + // exclude chunks whose batches balance are below minimum + if filterFunc(c.BatchID) { + wstat.BelowBalanceIgnored++ + continue + } + + chunkLoadStart := time.Now() 
+ + chunk, err := c.Chunk(ctx) + if err != nil { + wstat.ChunkLoadFailed++ + return fmt.Errorf("failed loading chunk at address=%x: %w", c.Address, err) + } + + wstat.ChunkLoadDuration += time.Since(chunkLoadStart) + + var sch *soc.SOC + if c.Type == swarm.ChunkTypeSingleOwner { + sch, err = soc.FromChunk(chunk) + if err != nil { + return err + } + } + + taddrStart := time.Now() + taddr, err := transformedAddress(transProver, chunk, c.Type) + if err != nil { + return err + } + wstat.TaddrDuration += time.Since(taddrStart) + + select { + case ItemChan <- Item{ + TransAddress: taddr, + Address: c.Address, + Stamp: postage.NewStamp(c.BatchID, nil, nil, nil), + LoadStamp: c.Stamp, + TransProver: transProver, + SOC: sch, + }: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + go func() { + _ = g.Wait() + close(ItemChan) + }() + + Items := make([]Item, 0, Size) + // insert function will insert the new item in its correct place. If the + // size goes beyond what we need we omit the last item. + insert := func(item Item) { + added := false + for i, sItem := range Items { + if le(item.TransAddress, sItem.TransAddress) { + Items = append(Items[:i+1], Items[i:]...) + Items[i] = item + added = true + break + } + } + if len(Items) > Size { + Items = Items[:Size] + } + if len(Items) < Size && !added { + Items = append(Items, item) + } + } + + // Phase 3: Assemble the . Here we need to assemble only the first Size + // no of items from the results of the 2nd phase. + // In this step stamps are loaded and validated only if chunk will be added to . 
+ stats := Stats{} + for item := range ItemChan { + currentMaxAddr := swarm.EmptyAddress + if len(Items) > 0 { + currentMaxAddr = Items[len(Items)-1].TransAddress + } + + if len(Items) >= Size && le(currentMaxAddr, item.TransAddress) { + continue + } + + stamp, err := item.LoadStamp() + if err != nil { + stats.StampLoadFailed++ + // db.logger.Debug("failed loading stamp", "chunk_address", item.ChunkAddress, "error", err) + continue + } + + // if _, err := db.validStamp(ch); err != nil { + // stats.InvalidStamp++ + // db.logger.Debug("invalid stamp for chunk", "chunk_address", ch.Address(), "error", err) + // continue + // } + + if binary.BigEndian.Uint64(stamp.Timestamp()) > maxTimeStamp { + stats.NewIgnored++ + continue + } + item.Stamp = stamp + + insert(item) + stats.Inserts++ + } + addStats(stats) + + allStats.TotalDuration = time.Since(start) + + if err := g.Wait(); err != nil { + // db.logger.Info("reserve r finished with error", "err", err, "duration", time.Since(t), "storage_radius", storageRadius, "stats", fmt.Sprintf("%+v", allStats)) + + return Sample{}, fmt.Errorf("r: failed creating : %w", err) + } + + // db.logger.Info("reserve r finished", "duration", time.Since(t), "storage_radius", storageRadius, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats)) + + data, err := s.sampleChunk(depth, Items) + if err != nil { + return Sample{}, err + } + return Sample{Stats: *allStats, Data: *data}, nil +} +func (s *Sampler) sampleChunk(depth uint8, items []Item) (*Data, error) { + size := len(items) * 2 * swarm.HashSize + content := make([]byte, size) + for i, s := range items { + copy(content[i*swarm.HashSize:], s.Address.Bytes()) + copy(content[(i+1)*2*swarm.HashSize:], s.TransAddress.Bytes()) + } + prover := bmt.Prover{Hasher: bmtpool.Get()} + prover.SetHeaderInt64(int64(size)) + _, err := prover.Write(content) + if err != nil { + return &Data{}, err + } + hash, err := prover.Hash(nil) + if err != nil { + return &Data{}, err + } + return 
&Data{depth, swarm.NewAddress(hash), prover, items}, nil +} + +// less function uses the byte compare to check for lexicographic ordering +func le(a, b swarm.Address) bool { + return bytes.Compare(a.Bytes(), b.Bytes()) == -1 +} + +func transformedAddress(hasher bmt.Prover, chunk swarm.Chunk, chType swarm.ChunkType) (swarm.Address, error) { + switch chType { + case swarm.ChunkTypeContentAddressed: + return transformedAddressCAC(hasher, chunk) + case swarm.ChunkTypeSingleOwner: + return transformedAddressSOC(hasher, chunk) + default: + return swarm.ZeroAddress, fmt.Errorf("chunk type [%v] is is not valid", chType) + } +} + +func transformedAddressCAC(hasher bmt.Prover, chunk swarm.Chunk) (swarm.Address, error) { + hasher.Reset() + hasher.SetHeader(chunk.Data()[:bmt.SpanSize]) + + _, err := hasher.Write(chunk.Data()[bmt.SpanSize:]) + if err != nil { + return swarm.ZeroAddress, err + } + + taddr, err := hasher.Hash(nil) + if err != nil { + return swarm.ZeroAddress, err + } + + return swarm.NewAddress(taddr), nil +} + +func transformedAddressSOC(hasher bmt.Prover, chunk swarm.Chunk) (swarm.Address, error) { + // Calculate transformed address from wrapped chunk + sChunk, err := soc.FromChunk(chunk) + if err != nil { + return swarm.ZeroAddress, err + } + taddrCac, err := transformedAddressCAC(hasher, sChunk.WrappedChunk()) + if err != nil { + return swarm.ZeroAddress, err + } + + // Hash address and transformed address to make transformed address for this SOC + sHasher := swarm.NewHasher() + if _, err := sHasher.Write(chunk.Address().Bytes()); err != nil { + return swarm.ZeroAddress, err + } + if _, err := sHasher.Write(taddrCac.Bytes()); err != nil { + return swarm.ZeroAddress, err + } + + return swarm.NewAddress(sHasher.Sum(nil)), nil +} + +type Stats struct { + TotalDuration time.Duration + TotalIterated int64 + IterationDuration time.Duration + Inserts int64 + NewIgnored int64 + InvalidStamp int64 + BelowBalanceIgnored int64 + TaddrDuration time.Duration + 
ValidStampDuration time.Duration + BatchesBelowValueDuration time.Duration + RogueChunk int64 + ChunkLoadDuration time.Duration + ChunkLoadFailed int64 + StampLoadFailed int64 +} + +func (s *Stats) add(other Stats) { + s.TotalDuration += other.TotalDuration + s.TotalIterated += other.TotalIterated + s.IterationDuration += other.IterationDuration + s.Inserts += other.Inserts + s.NewIgnored += other.NewIgnored + s.InvalidStamp += other.InvalidStamp + s.BelowBalanceIgnored += other.BelowBalanceIgnored + s.TaddrDuration += other.TaddrDuration + s.ValidStampDuration += other.ValidStampDuration + s.BatchesBelowValueDuration += other.BatchesBelowValueDuration + s.RogueChunk += other.RogueChunk + s.ChunkLoadDuration += other.ChunkLoadDuration + s.ChunkLoadFailed += other.ChunkLoadFailed + s.StampLoadFailed += other.StampLoadFailed +} + +func (s *Sampler) StorageRadius() uint8 { + return s.sampler.StorageRadius() +} + +func (s *Sampler) getBatchFilterFunc(round uint64) (func(batchID []byte) bool, error) { + cs := s.batchstore.GetChainState() + blocksToLive := (round+2)*152 - cs.Block + // blocksToLive := (round+2)*blocksPerRound - cs.Block + minBalance := new(big.Int).Add(cs.TotalAmount, new(big.Int).Mul(cs.CurrentPrice, big.NewInt(int64(blocksToLive)))) + + excluded := make(map[string]struct{}) + err := s.batchstore.IterateByValue(func(id []byte, val *big.Int) (bool, error) { + if val.Cmp(minBalance) > 0 { + return true, nil + } + excluded[string(id)] = struct{}{} + return false, nil + }) + return func(id []byte) bool { + _, found := excluded[string(id)] + return found + }, err +} + +func (s *Sampler) maxTimeStamp(ctx context.Context, round uint64) (uint64, error) { + previousRoundBlockNumber := new(big.Int).SetUint64((round - 1) * 152) + // previousRoundBlockNumber := new(b/ig.Int).SetUint64((round - 1) * blocksPerRound) + + // s.metrics.BackendCalls.Inc() + lastBlock, err := s.backend.HeaderByNumber(ctx, previousRoundBlockNumber) + if err != nil { + // 
s.metrics.BackendErrors.Inc() + return 0, err + } + + return lastBlock.Time, nil +} diff --git a/pkg/storageincentives/sampler/sampler_test.go b/pkg/storageincentives/sampler/sampler_test.go new file mode 100644 index 00000000000..c4d6314dc53 --- /dev/null +++ b/pkg/storageincentives/sampler/sampler_test.go @@ -0,0 +1,58 @@ +package sampler_test + +import ( + "hash" + "sort" + + "github.com/ethersphere/bee/pkg/bmt" + "github.com/ethersphere/bee/pkg/bmtpool" + chunk "github.com/ethersphere/bee/pkg/storage/testing" + "github.com/ethersphere/bee/pkg/storageincentives/sampler" + "github.com/ethersphere/bee/pkg/swarm" +) + +// RandSample returns Sample with random values. +func RandSample(anchor []byte) (sampler.Data, error) { + chunks := make([]swarm.Chunk, sampler.Size) + for i := 0; i < sampler.Size; i++ { + ch := chunk.GenerateTestRandomChunk() + // if i%3 == 0 { + // ch = chunk.GenerateTestRandomSoChunk(t, ch) + // } + chunks[i] = ch + } + + return MakeSampleUsingChunks(chunks, anchor) +} + +// MakeSampleUsingChunks returns Sample constructed using supplied chunks. 
+func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (sampler.Data, error) { + prefixHasherFactory := func() hash.Hash { + return swarm.NewPrefixHasher(anchor) + } + items := make([]sampler.Item, len(chunks)) + for i, ch := range chunks { + // these provers need to be used for proofs later and assumed are in hashed state + // also the hasher taken from bmtpool musst not be put back + prover := bmt.Prover{Hasher: bmtpool.Get()} + transProver := bmt.Prover{Hasher: bmt.NewHasher(prefixHasherFactory)} + tr, err := sampler.TransformedAddressCAC(transProver, ch) + if err != nil { + return sampler.Data{}, err + } + + items[i] = sampler.Item{ + Address: ch.Address(), + TransAddress: tr, + Prover: prover, + TransProver: transProver, + Stamp: sampler.NewStamp(ch.Stamp()), + } + } + + sort.Slice(items, func(i, j int) bool { + return items[i].TransAddress.Compare(items[j].TransAddress) == -1 + }) + + return sampler.Data{Items: items}, nil +} diff --git a/pkg/storageincentives/testdata/inclusion-proofs.json b/pkg/storageincentives/testdata/inclusion-proofs.json index fe79666096e..7688022650e 100644 --- a/pkg/storageincentives/testdata/inclusion-proofs.json +++ b/pkg/storageincentives/testdata/inclusion-proofs.json @@ -1,3 +1,131 @@ +<<<<<<< HEAD +[ + { + "ChunkSpan" : 26, + "Data" : "0x7133885ac59dca7b97773acb740e978d41a4af45bd563067c8a3d863578488f1", + "Data2" : "0x0000000000000000000000000000000000000000000000000000000000000000", + "PostageProof" : { + "BatchId" : "0x4c8efc14c8e3cee608174f995d7afe155897bf643a31226e4f1363bc97686aef", + "Index" : 525059, + "Signature" : "p8jRioJ504AxaevPTlp9vdTf/vpZHqrY0c6qY2p5Otl15/exCHvOpBdlJbAALt3grL/aINvS37vnd8qziWj9xhs=", + "TimeStamp" : 197384 + }, + "Sisters" : [ + "0x0875605dea48e812c9685ffba220a2b848bdbafdb95e02d087ba4a32925ea34f", + "0xf873df729270d5f4064286f3f018385a07cb4228734d8aca794299fee6e3e3e5", + "0x1fa8767fe303fe7487f5d58e4d72e5e170cf135f58a91b4fe19e4b19e5b67b5a", + 
"0x0f64ed713e25291e2c5a0561f584fa78c55a399e31919903d215dd622bcfd0ec", + "0x34dac0c73538614801c1ad16e272ef57f0b96a972073d15418f38daf9eb401c0", + "0x0eb01ebfc9ed27500cd4dfc979272d1f0913cc9f66540d7e8005811109e1cf2d", + "0x887c22bd8750d34016ac3c66b5ff102dacdd73f6b014e710b51e8022af9a1968" + ], + "Sisters2" : [ + "0x0000000000000000000000000000000000000000000000000000000000000000", + "0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5", + "0xb4c11951957c6f8f642c4af61cd6b24640fec6dc7fc607ee8206a99e92410d30", + "0x21ddb9a356815c3fac1026b6dec5df3124afbadb485c9ba5a3e3398a04b7ba85", + "0x2047b070a295f8d517121d9ac9b3d5f9a944bac6cfab72dd5a7c625ab4558b0a", + "0x0eb01ebfc9ed27500cd4dfc979272d1f0913cc9f66540d7e8005811109e1cf2d", + "0x887c22bd8750d34016ac3c66b5ff102dacdd73f6b014e710b51e8022af9a1968" + ], + "Sisters3" : [ + "0x0000000000000000000000000000000000000000000000000000000000000000", + "0xa7f526447b68535121d36909a7585c9610d4fe6d4115540464c70499b0d7136d", + "0x066dd7ce6f4f1c97e78ff1c271916db25cb06128c92f8c8520807a0fa2ba93ff", + "0xdf43c86b00db2156e769e8a8df1f08dc89ab5661c6fbaa9563f96fb9c051fc63", + "0x7327aecc9178bab420bb6fe482e07b65af69775b55666ec1ac8ab3da5bcec6dc", + "0xb68323ecaad1185a5e078f41c94c59d0b6dda5d57e109866e64d44acb8702846", + "0x478adfa93a7bb904d0aa86ff0d559f43aa915ee7865592e717b72a24452181cb" + ], + "SocProof" : [ + { + "ChunkAddr" : "0xf32442586d93d8c002372ed41fa2ea1f281f38311c161d535c3665de5d9bfd92", + "Identifier" : "0x6223cfdd75a40440ccd32d0b11b24f08562ec63b1ea3b8cb1a59dfc3e3c33595", + "Signature" : "TpV2lJM45MI/RwO/gTZyVquFmzKTT+9Nsu5Gp2v2vjVOlqxii4eEst4Lvq5ZdUaXgxktbRcFSF/Krdje3ebiqhs=", + "Signer" : "gntE1T3yhUBXcTslzdZT63D+NsQ=" + } + ] + }, + { + "ChunkSpan" : 26, + "Data" : "0x535e6df58a122a8f5e6c851c19b3e042f4cd1b5c5a8c499581c9f6d4e3509182", + "Data2" : "0x0000000000000000000000000000000000000000000000000000000000000000", + "PostageProof" : { + "BatchId" : "0x4c8efc14c8e3cee608174f995d7afe155897bf643a31226e4f1363bc97686aef", + 
"Index" : 525059, + "Signature" : "sCdPzaWeiq/+6AMCGXGnZKAXziwPQcjOtu796oBwVvYhqY/qtevzO7YGXknAUPQT7IhAsAj8Ik2ILOUkTOPgFxw=", + "TimeStamp" : 197384 + }, + "Sisters" : [ + "0x463aeb4ca5f000064c082e56eba387004265d2f47bf1226ef2d86cb163bcca3a", + "0x829af58b2a2f1c6c156baa196f03be4df510a96419f2dd54c456d3da30166312", + "0xdee4815ec42efa507b79cf4eb1f272e07be1b526cbd48137a287d9e5b2b2808a", + "0x0f64ed713e25291e2c5a0561f584fa78c55a399e31919903d215dd622bcfd0ec", + "0x34dac0c73538614801c1ad16e272ef57f0b96a972073d15418f38daf9eb401c0", + "0x0eb01ebfc9ed27500cd4dfc979272d1f0913cc9f66540d7e8005811109e1cf2d", + "0x887c22bd8750d34016ac3c66b5ff102dacdd73f6b014e710b51e8022af9a1968" + ], + "Sisters2" : [ + "0x0000000000000000000000000000000000000000000000000000000000000000", + "0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5", + "0xb4c11951957c6f8f642c4af61cd6b24640fec6dc7fc607ee8206a99e92410d30", + "0x21ddb9a356815c3fac1026b6dec5df3124afbadb485c9ba5a3e3398a04b7ba85", + "0x46f43b515833749217540ac60c79e0c6a54c73f3500850b5869b31d5c89d101f", + "0x0eb01ebfc9ed27500cd4dfc979272d1f0913cc9f66540d7e8005811109e1cf2d", + "0x887c22bd8750d34016ac3c66b5ff102dacdd73f6b014e710b51e8022af9a1968" + ], + "Sisters3" : [ + "0x0000000000000000000000000000000000000000000000000000000000000000", + "0xa7f526447b68535121d36909a7585c9610d4fe6d4115540464c70499b0d7136d", + "0x066dd7ce6f4f1c97e78ff1c271916db25cb06128c92f8c8520807a0fa2ba93ff", + "0xdf43c86b00db2156e769e8a8df1f08dc89ab5661c6fbaa9563f96fb9c051fc63", + "0x4284c510d7d64c9e052c73bddadb1fca522fd26caf2ebf007faad50a9a0f09fa", + "0xb68323ecaad1185a5e078f41c94c59d0b6dda5d57e109866e64d44acb8702846", + "0x478adfa93a7bb904d0aa86ff0d559f43aa915ee7865592e717b72a24452181cb" + ], + "SocProof" : [] + }, + { + "ChunkSpan" : 27, + "Data" : "0x5ba2c8b912fad4aeb4a11a960946d07b9f66bc40ac54d87224914d75f5aeea5f", + "Data2" : "0x0000000000000000000000000000000000000000000000000000000000000000", + "PostageProof" : { + "BatchId" : 
"0x4c8efc14c8e3cee608174f995d7afe155897bf643a31226e4f1363bc97686aef", + "Index" : 525059, + "Signature" : "Z0fFjOhhNIbGlvW7c5PJxZCUNxlpw6Ur+vdRksYF9K18cMbnH90yDiDQBeQulO4yECwjTrRl9PX9nbYPytA1axw=", + "TimeStamp" : 197384 + }, + "Sisters" : [ + "0xfee18543782df46a86f85456e62dc973a4c84369b6b1cd4f93e57fe247f9730e", + "0x23a0858ee2b8b4cb0ba66d3533f468d6b583a6b77df0cc78fc6df64dc735a917", + "0xb6bffa54dec44ad57349f9aef6cb65a1f8807f15447462ec519751220e5a5bc3", + "0x553aae9948fc13c33d8b353cf5694ecadc7c40c8316ce09cbd4d864dbb94f026", + "0xaf7db874a9b5addf602b3e899194480a32afec6d6cd4ec0fadf9e065db739dd5", + "0x0eb01ebfc9ed27500cd4dfc979272d1f0913cc9f66540d7e8005811109e1cf2d", + "0x887c22bd8750d34016ac3c66b5ff102dacdd73f6b014e710b51e8022af9a1968" + ], + "Sisters2" : [ + "0x0000000000000000000000000000000000000000000000000000000000000000", + "0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5", + "0xb4c11951957c6f8f642c4af61cd6b24640fec6dc7fc607ee8206a99e92410d30", + "0x21ddb9a356815c3fac1026b6dec5df3124afbadb485c9ba5a3e3398a04b7ba85", + "0x7f575db255ef42dcaeb7658df9f33fe5a1aad5d41af51a72a381acea29d98a12", + "0x0eb01ebfc9ed27500cd4dfc979272d1f0913cc9f66540d7e8005811109e1cf2d", + "0x887c22bd8750d34016ac3c66b5ff102dacdd73f6b014e710b51e8022af9a1968" + ], + "Sisters3" : [ + "0x0000000000000000000000000000000000000000000000000000000000000000", + "0xa7f526447b68535121d36909a7585c9610d4fe6d4115540464c70499b0d7136d", + "0x066dd7ce6f4f1c97e78ff1c271916db25cb06128c92f8c8520807a0fa2ba93ff", + "0xdf43c86b00db2156e769e8a8df1f08dc89ab5661c6fbaa9563f96fb9c051fc63", + "0x7683427ba0ef1fbebf97f2fc36859df88ead8123369fe38d7b767b7a7eda5294", + "0xb68323ecaad1185a5e078f41c94c59d0b6dda5d57e109866e64d44acb8702846", + "0x478adfa93a7bb904d0aa86ff0d559f43aa915ee7865592e717b72a24452181cb" + ], + "SocProof" : [] + } +] +======= { "proof1": { "proofSegments": [ @@ -123,4 +251,5 @@ }, "socProof": [] } -} \ No newline at end of file +} +>>>>>>> origin/master diff --git 
a/pkg/storer/mock/mockreserve.go b/pkg/storer/mock/mockreserve.go index d6d70390242..241652e08f5 100644 --- a/pkg/storer/mock/mockreserve.go +++ b/pkg/storer/mock/mockreserve.go @@ -6,7 +6,6 @@ package mockstorer import ( "context" - "math/big" "sync" storage "github.com/ethersphere/bee/pkg/storage" @@ -83,12 +82,6 @@ func WithPutHook(f func(swarm.Chunk) error) Option { }) } -func WithSample(s storer.Sample) Option { - return optionFunc(func(p *ReserveStore) { - p.sample = s - }) -} - var _ storer.ReserveStore = (*ReserveStore)(nil) type ReserveStore struct { @@ -110,8 +103,6 @@ type ReserveStore struct { subResponses []chunksResponse putHook func(swarm.Chunk) error - - sample storer.Sample } // NewReserve returns a new Reserve mock. @@ -238,10 +229,6 @@ func (s *ReserveStore) ReserveHas(addr swarm.Address, batchID []byte) (bool, err return true, nil } -func (s *ReserveStore) ReserveSample(context.Context, []byte, uint8, uint64, *big.Int) (storer.Sample, error) { - return s.sample, nil -} - type Option interface { apply(*ReserveStore) } diff --git a/pkg/storer/sample_test.go b/pkg/storer/sample_test.go deleted file mode 100644 index 215b190fc1a..00000000000 --- a/pkg/storer/sample_test.go +++ /dev/null @@ -1,195 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. 
- -package storer_test - -import ( - "context" - "github.com/ethersphere/bee/pkg/postage" - "math/rand" - "testing" - "time" - - postagetesting "github.com/ethersphere/bee/pkg/postage/testing" - chunk "github.com/ethersphere/bee/pkg/storage/testing" - "github.com/ethersphere/bee/pkg/storer" - "github.com/ethersphere/bee/pkg/swarm" - "github.com/google/go-cmp/cmp" -) - -func TestReserveSampler(t *testing.T) { - const chunkCountPerPO = 10 - const maxPO = 10 - - randChunks := func(baseAddr swarm.Address, timeVar uint64) []swarm.Chunk { - var chs []swarm.Chunk - for po := 0; po < maxPO; po++ { - for i := 0; i < chunkCountPerPO; i++ { - ch := chunk.GenerateValidRandomChunkAt(baseAddr, po).WithBatch(0, 3, 2, false) - if rand.Intn(2) == 0 { // 50% chance to wrap CAC into SOC - ch = chunk.GenerateTestRandomSoChunk(t, ch) - } - - // override stamp timestamp to be before the consensus timestamp - ch = ch.WithStamp(postagetesting.MustNewStampWithTimestamp(timeVar)) - chs = append(chs, ch) - } - } - return chs - } - - testF := func(t *testing.T, baseAddr swarm.Address, st *storer.DB) { - t.Helper() - - timeVar := uint64(time.Now().UnixNano()) - chs := randChunks(baseAddr, timeVar-1) - - putter := st.ReservePutter() - for _, ch := range chs { - err := putter.Put(context.Background(), ch) - if err != nil { - t.Fatal(err) - } - } - - t.Run("reserve size", reserveSizeTest(st.Reserve(), chunkCountPerPO*maxPO)) - - var sample1 storer.Sample - - t.Run("reserve sample 1", func(t *testing.T) { - sample, err := st.ReserveSample(context.TODO(), []byte("anchor"), 5, timeVar, nil) - if err != nil { - t.Fatal(err) - } - - assertValidSample(t, sample) - assertSampleNoErrors(t, sample) - - if sample.Stats.NewIgnored != 0 { - t.Fatalf("sample should not have ignored chunks") - } - - sample1 = sample - }) - - // We generate another 100 chunks. With these new chunks in the reserve, statistically - // some of them should definitely make it to the sample based on lex ordering. 
- chs = randChunks(baseAddr, timeVar+1) - putter = st.ReservePutter() - for _, ch := range chs { - err := putter.Put(context.Background(), ch) - if err != nil { - t.Fatal(err) - } - } - - time.Sleep(time.Second) - - t.Run("reserve size", reserveSizeTest(st.Reserve(), 2*chunkCountPerPO*maxPO)) - - // Now we generate another sample with the older timestamp. This should give us - // the exact same sample, ensuring that none of the later chunks were considered. - t.Run("reserve sample 2", func(t *testing.T) { - sample, err := st.ReserveSample(context.TODO(), []byte("anchor"), 5, timeVar, nil) - if err != nil { - t.Fatal(err) - } - - if diff := cmp.Diff(sample.Items, sample1.Items, cmp.AllowUnexported(postage.Stamp{})); diff != "" { - t.Fatalf("samples different (-want +have):\n%s", diff) - } - - if sample.Stats.NewIgnored == 0 { - t.Fatalf("sample should have some ignored chunks") - } - - assertSampleNoErrors(t, sample) - }) - - } - - t.Run("disk", func(t *testing.T) { - t.Parallel() - baseAddr := swarm.RandAddress(t) - opts := dbTestOps(baseAddr, 1000, nil, nil, time.Second) - opts.ValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) { return ch, nil } - - storer, err := diskStorer(t, opts)() - if err != nil { - t.Fatal(err) - } - testF(t, baseAddr, storer) - }) - t.Run("mem", func(t *testing.T) { - t.Parallel() - baseAddr := swarm.RandAddress(t) - opts := dbTestOps(baseAddr, 1000, nil, nil, time.Second) - opts.ValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) { return ch, nil } - - storer, err := memStorer(t, opts)() - if err != nil { - t.Fatal(err) - } - testF(t, baseAddr, storer) - }) -} - -func TestRandSample(t *testing.T) { - t.Parallel() - - sample := storer.RandSample(t, nil) - assertValidSample(t, sample) -} - -func assertValidSample(t *testing.T, sample storer.Sample) { - t.Helper() - - // Assert that sample size is exactly storer.SampleSize - if len(sample.Items) != storer.SampleSize { - t.Fatalf("incorrect no of sample items, exp %d found %d", 
storer.SampleSize, len(sample.Items)) - } - - // Assert that sample item has all fields set - assertSampleItem := func(item storer.SampleItem, i int) { - if !item.TransformedAddress.IsValidNonEmpty() { - t.Fatalf("sample item [%d]: transformed address should be set", i) - } - if !item.ChunkAddress.IsValidNonEmpty() { - t.Fatalf("sample item [%d]: chunk address should be set", i) - } - if item.ChunkData == nil { - t.Fatalf("sample item [%d]: chunk data should be set", i) - } - if item.Stamp == nil { - t.Fatalf("sample item [%d]: stamp should be set", i) - } - } - for i, item := range sample.Items { - assertSampleItem(item, i) - } - - // Assert that transformed addresses are in ascending order - for i := 0; i < len(sample.Items)-1; i++ { - if sample.Items[i].TransformedAddress.Compare(sample.Items[i+1].TransformedAddress) != -1 { - t.Fatalf("incorrect order of samples") - } - } -} - -func assertSampleNoErrors(t *testing.T, sample storer.Sample) { - t.Helper() - - if sample.Stats.ChunkLoadFailed != 0 { - t.Fatalf("got unexpected failed chunk loads") - } - if sample.Stats.RogueChunk != 0 { - t.Fatalf("got unexpected rouge chunks") - } - if sample.Stats.StampLoadFailed != 0 { - t.Fatalf("got unexpected failed stamp loads") - } - if sample.Stats.InvalidStamp != 0 { - t.Fatalf("got unexpected invalid stamps") - } -} diff --git a/pkg/storer/sampler.go b/pkg/storer/sampler.go new file mode 100644 index 00000000000..b9a1592ce84 --- /dev/null +++ b/pkg/storer/sampler.go @@ -0,0 +1,50 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +package storer + +import ( + "context" + + "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/storer/internal/chunkstamp" + "github.com/ethersphere/bee/pkg/storer/internal/reserve" + "github.com/ethersphere/bee/pkg/swarm" +) + +// Sampler interface providing the iterator to iterate through the reserve +type Sampler interface { + Iterate(depth uint8, f func(Chunk) (bool, error)) error + StorageRadius() uint8 +} + +// Chunk serves to reify the partial info about a chunk coming from indexstore +// the Chunk and Stamp functions allow lazy loading of chunk data and/or postage stamp +type Chunk struct { + Address swarm.Address + BatchID []byte + Type swarm.ChunkType + db *DB +} + +// Chunk returns the swarm.Chunk +func (c *Chunk) Chunk(ctx context.Context) (swarm.Chunk, error) { + return c.db.ChunkStore().Get(ctx, c.Address) +} + +// Stamp returns the postage stamp for the chunk +func (c *Chunk) Stamp() (*postage.Stamp, error) { + s, err := chunkstamp.LoadWithBatchID(c.db.repo.IndexStore(), "reserve", c.Address, c.BatchID) + if err != nil { + return nil, err + } + return postage.NewStamp(c.BatchID, s.Index(), s.Timestamp(), s.Sig()), nil +} + +// Iterate iterates through the reserve and applies f to all chunks at and above the PO depth +func (db *DB) Iterate(depth uint8, f func(Chunk) (bool, error)) error { + return db.reserve.IterateChunksItems(db.repo, depth, func(chi reserve.ChunkItem) (bool, error) { + return f(Chunk{chi.ChunkAddress, chi.BatchID, chi.Type, db}) + }) +} diff --git a/pkg/storer/sampler_test.go b/pkg/storer/sampler_test.go new file mode 100644 index 00000000000..5b6b6952970 --- /dev/null +++ b/pkg/storer/sampler_test.go @@ -0,0 +1,193 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +package storer_test + +import ( + "context" + "math/rand" + "testing" + "time" + + postagetesting "github.com/ethersphere/bee/pkg/postage/testing" + chunk "github.com/ethersphere/bee/pkg/storage/testing" + "github.com/ethersphere/bee/pkg/storer" + "github.com/ethersphere/bee/pkg/swarm" +) + +func TestReserveSampler(t *testing.T) { + const chunkCountPerPO = 10 + const maxPO = 10 + + randChunks := func(baseAddr swarm.Address, timeVar uint64) []swarm.Chunk { + var chs []swarm.Chunk + for po := 0; po < maxPO; po++ { + for i := 0; i < chunkCountPerPO; i++ { + ch := chunk.GenerateValidRandomChunkAt(baseAddr, po).WithBatch(0, 3, 2, false) + if rand.Intn(2) == 0 { // 50% chance to wrap CAC into SOC + ch = chunk.GenerateTestRandomSoChunk(t, ch) + } + + // override stamp timestamp to be before the consensus timestamp + ch = ch.WithStamp(postagetesting.MustNewStampWithTimestamp(timeVar)) + chs = append(chs, ch) + } + } + return chs + } + + testF := func(t *testing.T, baseAddr swarm.Address, st *storer.DB) { + t.Helper() + + timeVar := uint64(time.Now().UnixNano()) + chs := randChunks(baseAddr, timeVar-1) + + putter := st.ReservePutter() + for _, ch := range chs { + err := putter.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + } + + t.Run("reserve size", reserveSizeTest(st.Reserve(), chunkCountPerPO*maxPO)) + + // var sample1 storer.Sample + + // t.Run("reserve sample 1", func(t *testing.T) { + // sample, err := st.ReserveSample(context.TODO(), []byte("anchor"), 5, timeVar, nil) + // if err != nil { + // t.Fatal(err) + // } + + // assertValidSample(t, sample) + // assertSampleNoErrors(t, sample) + + // if sample.Stats.NewIgnored != 0 { + // t.Fatalf("sample should not have ignored chunks") + // } + + // sample1 = sample + // }) + + // // We generate another 100 chunks. With these new chunks in the reserve, statistically + // // some of them should definitely make it to the sample based on lex ordering. 
+ // chs = randChunks(baseAddr, timeVar+1) + // putter = st.ReservePutter() + // for _, ch := range chs { + // err := putter.Put(context.Background(), ch) + // if err != nil { + // t.Fatal(err) + // } + // } + + // time.Sleep(time.Second) + + // t.Run("reserve size", reserveSizeTest(st.Reserve(), 2*chunkCountPerPO*maxPO)) + + // // Now we generate another sample with the older timestamp. This should give us + // // the exact same sample, ensuring that none of the later chunks were considered. + // t.Run("reserve sample 2", func(t *testing.T) { + // sample, err := st.ReserveSample(context.TODO(), []byte("anchor"), 5, timeVar, nil) + // if err != nil { + // t.Fatal(err) + // } + + // if diff := cmp.Diff(sample.Items, sample1.Items, cmp.AllowUnexported(postage.Stamp{})); diff != "" { + // t.Fatalf("samples different (-want +have):\n%s", diff) + // } + + // if sample.Stats.NewIgnored == 0 { + // t.Fatalf("sample should have some ignored chunks") + // } + + // assertSampleNoErrors(t, sample) + // }) + + } + + t.Run("disk", func(t *testing.T) { + t.Parallel() + baseAddr := swarm.RandAddress(t) + opts := dbTestOps(baseAddr, 1000, nil, nil, time.Second) + opts.ValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) { return ch, nil } + + storer, err := diskStorer(t, opts)() + if err != nil { + t.Fatal(err) + } + testF(t, baseAddr, storer) + }) + t.Run("mem", func(t *testing.T) { + t.Parallel() + baseAddr := swarm.RandAddress(t) + opts := dbTestOps(baseAddr, 1000, nil, nil, time.Second) + opts.ValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) { return ch, nil } + + storer, err := memStorer(t, opts)() + if err != nil { + t.Fatal(err) + } + testF(t, baseAddr, storer) + }) +} + +// func TestRandSample(t *testing.T) { +// t.Parallel() + +// sample := storer.RandSample(t, nil) +// assertValidSample(t, sample) +// } + +// func assertValidSample(t *testing.T, sample storer.Sample) { +// t.Helper() + +// // Assert that sample size is exactly storer.SampleSize +// if 
len(sample.Items) != storer.SampleSize { +// t.Fatalf("incorrect no of sample items, exp %d found %d", storer.SampleSize, len(sample.Items)) +// } + +// // Assert that sample item has all fields set +// assertSampleItem := func(item storer.SampleItem, i int) { +// if !item.TransformedAddress.IsValidNonEmpty() { +// t.Fatalf("sample item [%d]: transformed address should be set", i) +// } +// if !item.ChunkAddress.IsValidNonEmpty() { +// t.Fatalf("sample item [%d]: chunk address should be set", i) +// } +// if item.ChunkData == nil { +// t.Fatalf("sample item [%d]: chunk data should be set", i) +// } +// if item.Stamp == nil { +// t.Fatalf("sample item [%d]: stamp should be set", i) +// } +// } +// for i, item := range sample.Items { +// assertSampleItem(item, i) +// } + +// // Assert that transformed addresses are in ascending order +// for i := 0; i < len(sample.Items)-1; i++ { +// if sample.Items[i].TransformedAddress.Compare(sample.Items[i+1].TransformedAddress) != -1 { +// t.Fatalf("incorrect order of samples") +// } +// } +// } + +// func assertSampleNoErrors(t *testing.T, sample storer.Sample) { +// t.Helper() + +// if sample.Stats.ChunkLoadFailed != 0 { +// t.Fatalf("got unexpected failed chunk loads") +// } +// if sample.Stats.RogueChunk != 0 { +// t.Fatalf("got unexpected rouge chunks") +// } +// if sample.Stats.StampLoadFailed != 0 { +// t.Fatalf("got unexpected failed stamp loads") +// } +// if sample.Stats.InvalidStamp != 0 { +// t.Fatalf("got unexpected invalid stamps") +// } +// } diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index 771889a9689..c74c4e177fa 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -10,7 +10,6 @@ import ( "fmt" "io" "io/fs" - "math/big" "os" "path" "path/filepath" @@ -139,7 +138,6 @@ var _ Reserve = (*DB)(nil) type Reserve interface { ReserveStore EvictBatch(ctx context.Context, batchID []byte) error - ReserveSample(context.Context, []byte, uint8, uint64, *big.Int) (Sample, error) ReserveSize() int }