diff --git a/.github/workflows/beekeeper.yml b/.github/workflows/beekeeper.yml index a41be1cbfdd..1bc839291cc 100644 --- a/.github/workflows/beekeeper.yml +++ b/.github/workflows/beekeeper.yml @@ -12,7 +12,7 @@ env: REPLICA: 3 RUN_TYPE: "PR RUN" SETUP_CONTRACT_IMAGE: "ethersphere/bee-localchain" - SETUP_CONTRACT_IMAGE_TAG: "0.9.2-rc1" + SETUP_CONTRACT_IMAGE_TAG: "0.9.2-rc6" BEELOCAL_BRANCH: "update-k3s-0.30.3" BEEKEEPER_BRANCH: "update-k8s-0.30.3" BEEKEEPER_METRICS_ENABLED: false diff --git a/go.mod b/go.mod index 9baf8639388..bda88034b37 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.0 github.com/ethereum/go-ethereum v1.14.3 github.com/ethersphere/go-price-oracle-abi v0.2.0 - github.com/ethersphere/go-storage-incentives-abi v0.9.2-rc1 + github.com/ethersphere/go-storage-incentives-abi v0.9.2-rc6 github.com/ethersphere/go-sw3-abi v0.6.5 github.com/ethersphere/langos v1.0.0 github.com/go-playground/validator/v10 v10.11.1 diff --git a/go.sum b/go.sum index b837278926d..b6b34b27d69 100644 --- a/go.sum +++ b/go.sum @@ -236,8 +236,8 @@ github.com/ethereum/go-ethereum v1.14.3 h1:5zvnAqLtnCZrU9uod1JCvHWJbPMURzYFHfc2e github.com/ethereum/go-ethereum v1.14.3/go.mod h1:1STrq471D0BQbCX9He0hUj4bHxX2k6mt5nOQJhDNOJ8= github.com/ethersphere/go-price-oracle-abi v0.2.0 h1:wtIcYLgNZHY4BjYwJCnu93SvJdVAZVvBaKinspyyHvQ= github.com/ethersphere/go-price-oracle-abi v0.2.0/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= -github.com/ethersphere/go-storage-incentives-abi v0.9.2-rc1 h1:Cf3LFlz87FqlTqcuN4q4Hry4iUaAbbroaFxpCgHVhtY= -github.com/ethersphere/go-storage-incentives-abi v0.9.2-rc1/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= +github.com/ethersphere/go-storage-incentives-abi v0.9.2-rc6 h1:3s6c2w9JrToXdiX9vSGxatmRUBzOe57joZk1Xf4SWHU= +github.com/ethersphere/go-storage-incentives-abi v0.9.2-rc6/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= github.com/ethersphere/go-sw3-abi v0.6.5 h1:M5dcIe1zQYvGpY2K07UNkNU9Obc4U+A1fz68Ho/Q+XE= github.com/ethersphere/go-sw3-abi v0.6.5/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU= github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc= diff --git a/pkg/api/status.go b/pkg/api/status.go index 30e09e1f166..fdd6f0cc584 100644 --- a/pkg/api/status.go +++ b/pkg/api/status.go @@ -194,7 +194,7 @@ func (s *Service) statusGetNeighborhoods(w http.ResponseWriter, r *http.Request) neighborhoods = append(neighborhoods, statusNeighborhoodResponse{ Neighborhood: n.Neighborhood.String(), ReserveSizeWithinRadius: n.ReserveSizeWithinRadius, - Proximity: swarm.Proximity(s.overlay.Bytes(), n.Neighborhood.Bytes()), + Proximity: n.Proximity, }) } diff --git a/pkg/postage/postagecontract/contract.go b/pkg/postage/postagecontract/contract.go index b454f249c11..b0e5d06908a 100644 --- a/pkg/postage/postagecontract/contract.go +++ b/pkg/postage/postagecontract/contract.go @@ -170,7 +170,7 @@ func (c *postageContract) sendApproveTransaction(ctx context.Context, amount *bi To: &c.bzzTokenAddress, Data: callData, GasPrice: sctx.GetGasPrice(ctx), - GasLimit: 65000, + GasLimit: max(sctx.GetGasLimit(ctx), c.gasLimit), Value: big.NewInt(0), Description: approveDescription, } diff --git a/pkg/soc/soc.go b/pkg/soc/soc.go index 65b44f2e66e..bf4711cee61 100644 --- a/pkg/soc/soc.go +++ b/pkg/soc/soc.go @@ -130,6 +130,22 @@ func (s *SOC) Sign(signer crypto.Signer) (swarm.Chunk, error) { return s.Chunk() } +// UnwrapCAC extracts the CAC inside the SOC. +func UnwrapCAC(sch swarm.Chunk) (swarm.Chunk, error) { + chunkData := sch.Data() + if len(chunkData) < swarm.SocMinChunkSize { + return nil, errWrongChunkSize + } + + cursor := swarm.HashSize + swarm.SocSignatureSize + ch, err := cac.NewWithDataSpan(chunkData[cursor:]) + if err != nil { + return nil, err + } + + return ch, nil +} + // FromChunk recreates a SOC representation from swarm.Chunk data. func FromChunk(sch swarm.Chunk) (*SOC, error) { chunkData := sch.Data() diff --git a/pkg/soc/soc_test.go b/pkg/soc/soc_test.go index 6346242fa1b..cbf5c341904 100644 --- a/pkg/soc/soc_test.go +++ b/pkg/soc/soc_test.go @@ -327,6 +327,15 @@ func TestFromChunk(t *testing.T) { if !ch.Equal(recoveredSOC.WrappedChunk()) { t.Fatalf("wrapped chunk mismatch. got %s want %s", recoveredSOC.WrappedChunk().Address(), ch.Address()) } + + unwrapped, err := soc.UnwrapCAC(sch) + if err != nil { + t.Fatal(err) + } + + if !ch.Equal(unwrapped) { + t.Fatalf("wrapped chunk mismatch. got %s want %s", recoveredSOC.WrappedChunk().Address(), ch.Address()) + } } func TestCreateAddress(t *testing.T) { diff --git a/pkg/storageincentives/staking/contract.go b/pkg/storageincentives/staking/contract.go index 1ff87c6e2a4..882721fd797 100644 --- a/pkg/storageincentives/staking/contract.go +++ b/pkg/storageincentives/staking/contract.go @@ -253,7 +253,7 @@ func (c *contract) sendApproveTransaction(ctx context.Context, amount *big.Int) To: &c.bzzTokenAddress, Data: callData, GasPrice: sctx.GetGasPrice(ctx), - GasLimit: 65000, + GasLimit: max(sctx.GetGasLimit(ctx), c.gasLimit), Value: big.NewInt(0), Description: approveDescription, } diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index 41d76e66d1a..ecbeb340588 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -500,23 +500,27 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan type NeighborhoodStat struct { Neighborhood swarm.Neighborhood ReserveSizeWithinRadius int + Proximity uint8 } func (db *DB) NeighborhoodsStat(ctx context.Context) ([]*NeighborhoodStat, error) { radius := db.StorageRadius() - networkRadius := radius + uint8(db.reserveOptions.capacityDoubling) + responsibilityRadius := radius + uint8(db.reserveOptions.capacityDoubling) prefixes := neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling) neighs := make([]*NeighborhoodStat, len(prefixes)) for i, n := range prefixes { - neighs[i] = &NeighborhoodStat{swarm.NewNeighborhood(n, networkRadius), 0} + neighs[i] = &NeighborhoodStat{ + Neighborhood: swarm.NewNeighborhood(n, responsibilityRadius), + ReserveSizeWithinRadius: 0, + Proximity: min(responsibilityRadius, swarm.Proximity(n.Bytes(), db.baseAddr.Bytes()))} } err := db.reserve.IterateChunksItems(0, func(ch *reserve.ChunkBinItem) (bool, error) { for _, n := range neighs { - if swarm.Proximity(ch.Address.Bytes(), n.Neighborhood.Bytes()) >= networkRadius { + if swarm.Proximity(ch.Address.Bytes(), n.Neighborhood.Bytes()) >= responsibilityRadius { n.ReserveSizeWithinRadius++ break } diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go index 564fec4ca6c..79b38ac2ecd 100644 --- a/pkg/storer/reserve_test.go +++ b/pkg/storer/reserve_test.go @@ -670,11 +670,11 @@ func TestNeighborhoodStats(t *testing.T) { t.Parallel() const ( - chunkCountPerPO = 16 - maxPO = 5 - networkRadius uint8 = 4 - doublingFactor uint8 = 2 - localRadius uint8 = networkRadius - doublingFactor + chunkCountPerPO = 16 + maxPO = 5 + committedDepth uint8 = 4 + doublingFactor uint8 = 2 + responsibiliyDepth uint8 = committedDepth - doublingFactor ) mustParse := func(s string) swarm.Address { @@ -706,10 +706,10 @@ func TestNeighborhoodStats(t *testing.T) { testF := func(t *testing.T, st *storer.DB) { t.Helper() - putChunks(baseAddr, int(networkRadius), st) - putChunks(sister1, int(networkRadius), st) - putChunks(sister2, int(networkRadius), st) - putChunks(sister3, int(networkRadius), st) + putChunks(baseAddr, int(committedDepth), st) + putChunks(sister1, int(committedDepth), st) + putChunks(sister2, int(committedDepth), st) + putChunks(sister3, int(committedDepth), st) neighs, err := st.NeighborhoodsStat(context.Background()) if err != nil { @@ -726,12 +726,19 @@ func TestNeighborhoodStats(t *testing.T) { } } - if !neighs[0].Neighborhood.Equal(swarm.NewNeighborhood(baseAddr, networkRadius)) || - !neighs[1].Neighborhood.Equal(swarm.NewNeighborhood(sister1, networkRadius)) || - !neighs[2].Neighborhood.Equal(swarm.NewNeighborhood(sister2, networkRadius)) || - !neighs[3].Neighborhood.Equal(swarm.NewNeighborhood(sister3, networkRadius)) { + if !neighs[0].Neighborhood.Equal(swarm.NewNeighborhood(baseAddr, committedDepth)) || + !neighs[1].Neighborhood.Equal(swarm.NewNeighborhood(sister1, committedDepth)) || + !neighs[2].Neighborhood.Equal(swarm.NewNeighborhood(sister2, committedDepth)) || + !neighs[3].Neighborhood.Equal(swarm.NewNeighborhood(sister3, committedDepth)) { t.Fatal("chunk addresses do not match") } + + if neighs[0].Proximity != committedDepth || + neighs[1].Proximity != 3 || + neighs[2].Proximity != 2 || + neighs[3].Proximity != 2 { + t.Fatalf("wrong proximity") + } } t.Run("disk", func(t *testing.T) { @@ -742,8 +749,8 @@ func TestNeighborhoodStats(t *testing.T) { if err != nil { t.Fatal(err) } - storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(localRadius)) - err = spinlock.Wait(time.Minute, func() bool { return storer.StorageRadius() == localRadius }) + storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(responsibiliyDepth)) + err = spinlock.Wait(time.Minute, func() bool { return storer.StorageRadius() == responsibiliyDepth }) if err != nil { t.Fatal(err) } @@ -757,8 +764,8 @@ func TestNeighborhoodStats(t *testing.T) { if err != nil { t.Fatal(err) } - storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(localRadius)) - err = spinlock.Wait(time.Minute, func() bool { return storer.StorageRadius() == localRadius }) + storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(responsibiliyDepth)) + err = spinlock.Wait(time.Minute, func() bool { return storer.StorageRadius() == responsibiliyDepth }) if err != nil { t.Fatal(err) } diff --git a/pkg/storer/sample.go b/pkg/storer/sample.go index 409d4b31094..965e8e5b8ad 100644 --- a/pkg/storer/sample.go +++ b/pkg/storer/sample.go @@ -11,6 +11,7 @@ import ( "fmt" "hash" "math/big" + "runtime" "sort" "sync" "testing" @@ -41,67 +42,6 @@ type Sample struct { Items []SampleItem } -// RandSample returns Sample with random values. -func RandSample(t *testing.T, anchor []byte) Sample { - t.Helper() - - chunks := make([]swarm.Chunk, SampleSize) - for i := 0; i < SampleSize; i++ { - ch := chunk.GenerateTestRandomChunk() - if i%3 == 0 { - ch = chunk.GenerateTestRandomSoChunk(t, ch) - } - chunks[i] = ch - } - - sample, err := MakeSampleUsingChunks(chunks, anchor) - if err != nil { - t.Fatal(err) - } - - return sample -} - -// MakeSampleUsingChunks returns Sample constructed using supplied chunks. -func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (Sample, error) { - prefixHasherFactory := func() hash.Hash { - return swarm.NewPrefixHasher(anchor) - } - items := make([]SampleItem, len(chunks)) - for i, ch := range chunks { - tr, err := transformedAddress(bmt.NewHasher(prefixHasherFactory), ch, getChunkType(ch)) - if err != nil { - return Sample{}, err - } - - items[i] = SampleItem{ - TransformedAddress: tr, - ChunkAddress: ch.Address(), - ChunkData: ch.Data(), - Stamp: newStamp(ch.Stamp()), - } - } - - sort.Slice(items, func(i, j int) bool { - return items[i].TransformedAddress.Compare(items[j].TransformedAddress) == -1 - }) - - return Sample{Items: items}, nil -} - -func newStamp(s swarm.Stamp) *postage.Stamp { - return postage.NewStamp(s.BatchID(), s.Index(), s.Timestamp(), s.Sig()) -} - -func getChunkType(chunk swarm.Chunk) swarm.ChunkType { - if cac.Valid(chunk) { - return swarm.ChunkTypeContentAddressed - } else if soc.Valid(chunk) { - return swarm.ChunkTypeSingleOwner - } - return swarm.ChunkTypeUnspecified -} - // ReserveSample generates the sample of reserve storage of a node required for the // storage incentives agent to participate in the lottery round. In order to generate // this sample we need to iterate through all the chunks in the node's reserve and @@ -125,8 +65,9 @@ func (db *DB) ReserveSample( consensusTime uint64, minBatchBalance *big.Int, ) (Sample, error) { + g, ctx := errgroup.WithContext(ctx) - chunkC := make(chan *reserve.ChunkBinItem, 64) + allStats := &SampleStats{} statsLock := sync.Mutex{} addStats := func(stats SampleStats) { @@ -144,6 +85,8 @@ func (db *DB) ReserveSample( allStats.BatchesBelowValueDuration = time.Since(t) + chunkC := make(chan *reserve.ChunkBinItem) + // Phase 1: Iterate chunk addresses g.Go(func() error { start := time.Now() @@ -170,13 +113,14 @@ func (db *DB) ReserveSample( }) // Phase 2: Get the chunk data and calculate transformed hash - sampleItemChan := make(chan SampleItem, 64) + sampleItemChan := make(chan SampleItem) prefixHasherFactory := func() hash.Hash { return swarm.NewPrefixHasher(anchor) } - const workers = 6 + workers := max(4, runtime.NumCPU()) + db.logger.Debug("reserve sampler workers", "count", workers) for i := 0; i < workers; i++ { g.Go(func() error { @@ -241,6 +185,7 @@ func (db *DB) ReserveSample( }() sampleItems := make([]SampleItem, 0, SampleSize) + // insert function will insert the new item in its correct place. If the sample // size goes beyond what we need we omit the last item. insert := func(item SampleItem) { @@ -376,20 +321,20 @@ func transformedAddressCAC(hasher *bmt.Hasher, chunk swarm.Chunk) (swarm.Address return swarm.NewAddress(taddr), nil } -func transformedAddressSOC(hasher *bmt.Hasher, chunk swarm.Chunk) (swarm.Address, error) { +func transformedAddressSOC(hasher *bmt.Hasher, socChunk swarm.Chunk) (swarm.Address, error) { // Calculate transformed address from wrapped chunk - sChunk, err := soc.FromChunk(chunk) + cacChunk, err := soc.UnwrapCAC(socChunk) if err != nil { return swarm.ZeroAddress, err } - taddrCac, err := transformedAddressCAC(hasher, sChunk.WrappedChunk()) + taddrCac, err := transformedAddressCAC(hasher, cacChunk) if err != nil { return swarm.ZeroAddress, err } // Hash address and transformed address to make transformed address for this SOC sHasher := swarm.NewHasher() - if _, err := sHasher.Write(chunk.Address().Bytes()); err != nil { + if _, err := sHasher.Write(socChunk.Address().Bytes()); err != nil { return swarm.ZeroAddress, err } if _, err := sHasher.Write(taddrCac.Bytes()); err != nil { @@ -432,3 +377,64 @@ func (s *SampleStats) add(other SampleStats) { s.ChunkLoadFailed += other.ChunkLoadFailed s.StampLoadFailed += other.StampLoadFailed } + +// RandSample returns Sample with random values. +func RandSample(t *testing.T, anchor []byte) Sample { + t.Helper() + + chunks := make([]swarm.Chunk, SampleSize) + for i := 0; i < SampleSize; i++ { + ch := chunk.GenerateTestRandomChunk() + if i%3 == 0 { + ch = chunk.GenerateTestRandomSoChunk(t, ch) + } + chunks[i] = ch + } + + sample, err := MakeSampleUsingChunks(chunks, anchor) + if err != nil { + t.Fatal(err) + } + + return sample +} + +// MakeSampleUsingChunks returns Sample constructed using supplied chunks. +func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (Sample, error) { + prefixHasherFactory := func() hash.Hash { + return swarm.NewPrefixHasher(anchor) + } + items := make([]SampleItem, len(chunks)) + for i, ch := range chunks { + tr, err := transformedAddress(bmt.NewHasher(prefixHasherFactory), ch, getChunkType(ch)) + if err != nil { + return Sample{}, err + } + + items[i] = SampleItem{ + TransformedAddress: tr, + ChunkAddress: ch.Address(), + ChunkData: ch.Data(), + Stamp: newStamp(ch.Stamp()), + } + } + + sort.Slice(items, func(i, j int) bool { + return items[i].TransformedAddress.Compare(items[j].TransformedAddress) == -1 + }) + + return Sample{Items: items}, nil +} + +func newStamp(s swarm.Stamp) *postage.Stamp { + return postage.NewStamp(s.BatchID(), s.Index(), s.Timestamp(), s.Sig()) +} + +func getChunkType(chunk swarm.Chunk) swarm.ChunkType { + if cac.Valid(chunk) { + return swarm.ChunkTypeContentAddressed + } else if soc.Valid(chunk) { + return swarm.ChunkTypeSingleOwner + } + return swarm.ChunkTypeUnspecified +}