From 336ff7759a08cdcec796e017d86e71a911845256 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 1 Nov 2024 02:06:48 +0300 Subject: [PATCH] perf: sampler optimization (#4882) --- pkg/soc/soc.go | 16 +++++ pkg/soc/soc_test.go | 9 +++ pkg/storer/sample.go | 142 ++++++++++++++++++++++--------------------- 3 files changed, 99 insertions(+), 68 deletions(-) diff --git a/pkg/soc/soc.go b/pkg/soc/soc.go index 65b44f2e66e..bf4711cee61 100644 --- a/pkg/soc/soc.go +++ b/pkg/soc/soc.go @@ -130,6 +130,22 @@ func (s *SOC) Sign(signer crypto.Signer) (swarm.Chunk, error) { return s.Chunk() } +// UnwrapCAC extracts the CAC inside the SOC. +func UnwrapCAC(sch swarm.Chunk) (swarm.Chunk, error) { + chunkData := sch.Data() + if len(chunkData) < swarm.SocMinChunkSize { + return nil, errWrongChunkSize + } + + cursor := swarm.HashSize + swarm.SocSignatureSize + ch, err := cac.NewWithDataSpan(chunkData[cursor:]) + if err != nil { + return nil, err + } + + return ch, nil +} + // FromChunk recreates a SOC representation from swarm.Chunk data. func FromChunk(sch swarm.Chunk) (*SOC, error) { chunkData := sch.Data() diff --git a/pkg/soc/soc_test.go b/pkg/soc/soc_test.go index 6346242fa1b..cbf5c341904 100644 --- a/pkg/soc/soc_test.go +++ b/pkg/soc/soc_test.go @@ -327,6 +327,15 @@ func TestFromChunk(t *testing.T) { if !ch.Equal(recoveredSOC.WrappedChunk()) { t.Fatalf("wrapped chunk mismatch. got %s want %s", recoveredSOC.WrappedChunk().Address(), ch.Address()) } + + unwrapped, err := soc.UnwrapCAC(sch) + if err != nil { + t.Fatal(err) + } + + if !ch.Equal(unwrapped) { + t.Fatalf("wrapped chunk mismatch. got %s want %s", recoveredSOC.WrappedChunk().Address(), ch.Address()) + } } func TestCreateAddress(t *testing.T) { diff --git a/pkg/storer/sample.go b/pkg/storer/sample.go index 409d4b31094..965e8e5b8ad 100644 --- a/pkg/storer/sample.go +++ b/pkg/storer/sample.go @@ -11,6 +11,7 @@ import ( "fmt" "hash" "math/big" + "runtime" "sort" "sync" "testing" @@ -41,67 +42,6 @@ type Sample struct { Items []SampleItem } -// RandSample returns Sample with random values. -func RandSample(t *testing.T, anchor []byte) Sample { - t.Helper() - - chunks := make([]swarm.Chunk, SampleSize) - for i := 0; i < SampleSize; i++ { - ch := chunk.GenerateTestRandomChunk() - if i%3 == 0 { - ch = chunk.GenerateTestRandomSoChunk(t, ch) - } - chunks[i] = ch - } - - sample, err := MakeSampleUsingChunks(chunks, anchor) - if err != nil { - t.Fatal(err) - } - - return sample -} - -// MakeSampleUsingChunks returns Sample constructed using supplied chunks. -func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (Sample, error) { - prefixHasherFactory := func() hash.Hash { - return swarm.NewPrefixHasher(anchor) - } - items := make([]SampleItem, len(chunks)) - for i, ch := range chunks { - tr, err := transformedAddress(bmt.NewHasher(prefixHasherFactory), ch, getChunkType(ch)) - if err != nil { - return Sample{}, err - } - - items[i] = SampleItem{ - TransformedAddress: tr, - ChunkAddress: ch.Address(), - ChunkData: ch.Data(), - Stamp: newStamp(ch.Stamp()), - } - } - - sort.Slice(items, func(i, j int) bool { - return items[i].TransformedAddress.Compare(items[j].TransformedAddress) == -1 - }) - - return Sample{Items: items}, nil -} - -func newStamp(s swarm.Stamp) *postage.Stamp { - return postage.NewStamp(s.BatchID(), s.Index(), s.Timestamp(), s.Sig()) -} - -func getChunkType(chunk swarm.Chunk) swarm.ChunkType { - if cac.Valid(chunk) { - return swarm.ChunkTypeContentAddressed - } else if soc.Valid(chunk) { - return swarm.ChunkTypeSingleOwner - } - return swarm.ChunkTypeUnspecified -} - // ReserveSample generates the sample of reserve storage of a node required for the // storage incentives agent to participate in the lottery round. In order to generate // this sample we need to iterate through all the chunks in the node's reserve and @@ -125,8 +65,9 @@ func (db *DB) ReserveSample( consensusTime uint64, minBatchBalance *big.Int, ) (Sample, error) { + g, ctx := errgroup.WithContext(ctx) - chunkC := make(chan *reserve.ChunkBinItem, 64) + allStats := &SampleStats{} statsLock := sync.Mutex{} addStats := func(stats SampleStats) { @@ -144,6 +85,8 @@ func (db *DB) ReserveSample( allStats.BatchesBelowValueDuration = time.Since(t) + chunkC := make(chan *reserve.ChunkBinItem) + // Phase 1: Iterate chunk addresses g.Go(func() error { start := time.Now() @@ -170,13 +113,14 @@ func (db *DB) ReserveSample( }) // Phase 2: Get the chunk data and calculate transformed hash - sampleItemChan := make(chan SampleItem, 64) + sampleItemChan := make(chan SampleItem) prefixHasherFactory := func() hash.Hash { return swarm.NewPrefixHasher(anchor) } - const workers = 6 + workers := max(4, runtime.NumCPU()) + db.logger.Debug("reserve sampler workers", "count", workers) for i := 0; i < workers; i++ { g.Go(func() error { @@ -241,6 +185,7 @@ func (db *DB) ReserveSample( }() sampleItems := make([]SampleItem, 0, SampleSize) + // insert function will insert the new item in its correct place. If the sample // size goes beyond what we need we omit the last item. insert := func(item SampleItem) { @@ -376,20 +321,20 @@ func transformedAddressCAC(hasher *bmt.Hasher, chunk swarm.Chunk) (swarm.Address return swarm.NewAddress(taddr), nil } -func transformedAddressSOC(hasher *bmt.Hasher, chunk swarm.Chunk) (swarm.Address, error) { +func transformedAddressSOC(hasher *bmt.Hasher, socChunk swarm.Chunk) (swarm.Address, error) { // Calculate transformed address from wrapped chunk - sChunk, err := soc.FromChunk(chunk) + cacChunk, err := soc.UnwrapCAC(socChunk) if err != nil { return swarm.ZeroAddress, err } - taddrCac, err := transformedAddressCAC(hasher, sChunk.WrappedChunk()) + taddrCac, err := transformedAddressCAC(hasher, cacChunk) if err != nil { return swarm.ZeroAddress, err } // Hash address and transformed address to make transformed address for this SOC sHasher := swarm.NewHasher() - if _, err := sHasher.Write(chunk.Address().Bytes()); err != nil { + if _, err := sHasher.Write(socChunk.Address().Bytes()); err != nil { return swarm.ZeroAddress, err } if _, err := sHasher.Write(taddrCac.Bytes()); err != nil { @@ -432,3 +377,64 @@ func (s *SampleStats) add(other SampleStats) { s.ChunkLoadFailed += other.ChunkLoadFailed s.StampLoadFailed += other.StampLoadFailed } + +// RandSample returns Sample with random values. +func RandSample(t *testing.T, anchor []byte) Sample { + t.Helper() + + chunks := make([]swarm.Chunk, SampleSize) + for i := 0; i < SampleSize; i++ { + ch := chunk.GenerateTestRandomChunk() + if i%3 == 0 { + ch = chunk.GenerateTestRandomSoChunk(t, ch) + } + chunks[i] = ch + } + + sample, err := MakeSampleUsingChunks(chunks, anchor) + if err != nil { + t.Fatal(err) + } + + return sample +} + +// MakeSampleUsingChunks returns Sample constructed using supplied chunks. +func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (Sample, error) { + prefixHasherFactory := func() hash.Hash { + return swarm.NewPrefixHasher(anchor) + } + items := make([]SampleItem, len(chunks)) + for i, ch := range chunks { + tr, err := transformedAddress(bmt.NewHasher(prefixHasherFactory), ch, getChunkType(ch)) + if err != nil { + return Sample{}, err + } + + items[i] = SampleItem{ + TransformedAddress: tr, + ChunkAddress: ch.Address(), + ChunkData: ch.Data(), + Stamp: newStamp(ch.Stamp()), + } + } + + sort.Slice(items, func(i, j int) bool { + return items[i].TransformedAddress.Compare(items[j].TransformedAddress) == -1 + }) + + return Sample{Items: items}, nil +} + +func newStamp(s swarm.Stamp) *postage.Stamp { + return postage.NewStamp(s.BatchID(), s.Index(), s.Timestamp(), s.Sig()) +} + +func getChunkType(chunk swarm.Chunk) swarm.ChunkType { + if cac.Valid(chunk) { + return swarm.ChunkTypeContentAddressed + } else if soc.Valid(chunk) { + return swarm.ChunkTypeSingleOwner + } + return swarm.ChunkTypeUnspecified +}