Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: sampler optimization #4882

Merged
merged 5 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/soc/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,22 @@ func (s *SOC) Sign(signer crypto.Signer) (swarm.Chunk, error) {
return s.Chunk()
}

// UnwrapCAC extracts the CAC inside the SOC.
func UnwrapCAC(sch swarm.Chunk) (swarm.Chunk, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sch = soc.FromChunk and sch.WrappedChunk() is not that performant?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if so, I think later the chunk types could be refactored to serialize/deserialize chunk data only once trough one parent aggregation connection or interface.

chunkData := sch.Data()
if len(chunkData) < swarm.SocMinChunkSize {
return nil, errWrongChunkSize
}

cursor := swarm.HashSize + swarm.SocSignatureSize
ch, err := cac.NewWithDataSpan(chunkData[cursor:])
if err != nil {
return nil, err
}

return ch, nil
}

// FromChunk recreates a SOC representation from swarm.Chunk data.
func FromChunk(sch swarm.Chunk) (*SOC, error) {
chunkData := sch.Data()
Expand Down
9 changes: 9 additions & 0 deletions pkg/soc/soc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,15 @@ func TestFromChunk(t *testing.T) {
if !ch.Equal(recoveredSOC.WrappedChunk()) {
t.Fatalf("wrapped chunk mismatch. got %s want %s", recoveredSOC.WrappedChunk().Address(), ch.Address())
}

unwrapped, err := soc.UnwrapCAC(sch)
if err != nil {
t.Fatal(err)
}

if !ch.Equal(unwrapped) {
t.Fatalf("wrapped chunk mismatch. got %s want %s", recoveredSOC.WrappedChunk().Address(), ch.Address())
}
}

func TestCreateAddress(t *testing.T) {
Expand Down
142 changes: 74 additions & 68 deletions pkg/storer/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"hash"
"math/big"
"runtime"
"sort"
"sync"
"testing"
Expand Down Expand Up @@ -41,67 +42,6 @@ type Sample struct {
Items []SampleItem
}

// RandSample returns Sample with random values.
func RandSample(t *testing.T, anchor []byte) Sample {
t.Helper()

chunks := make([]swarm.Chunk, SampleSize)
for i := 0; i < SampleSize; i++ {
ch := chunk.GenerateTestRandomChunk()
if i%3 == 0 {
ch = chunk.GenerateTestRandomSoChunk(t, ch)
}
chunks[i] = ch
}

sample, err := MakeSampleUsingChunks(chunks, anchor)
if err != nil {
t.Fatal(err)
}

return sample
}

// MakeSampleUsingChunks returns Sample constructed using supplied chunks.
func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (Sample, error) {
prefixHasherFactory := func() hash.Hash {
return swarm.NewPrefixHasher(anchor)
}
items := make([]SampleItem, len(chunks))
for i, ch := range chunks {
tr, err := transformedAddress(bmt.NewHasher(prefixHasherFactory), ch, getChunkType(ch))
if err != nil {
return Sample{}, err
}

items[i] = SampleItem{
TransformedAddress: tr,
ChunkAddress: ch.Address(),
ChunkData: ch.Data(),
Stamp: newStamp(ch.Stamp()),
}
}

sort.Slice(items, func(i, j int) bool {
return items[i].TransformedAddress.Compare(items[j].TransformedAddress) == -1
})

return Sample{Items: items}, nil
}

func newStamp(s swarm.Stamp) *postage.Stamp {
return postage.NewStamp(s.BatchID(), s.Index(), s.Timestamp(), s.Sig())
}

func getChunkType(chunk swarm.Chunk) swarm.ChunkType {
if cac.Valid(chunk) {
return swarm.ChunkTypeContentAddressed
} else if soc.Valid(chunk) {
return swarm.ChunkTypeSingleOwner
}
return swarm.ChunkTypeUnspecified
}

// ReserveSample generates the sample of reserve storage of a node required for the
// storage incentives agent to participate in the lottery round. In order to generate
// this sample we need to iterate through all the chunks in the node's reserve and
Expand All @@ -125,8 +65,9 @@ func (db *DB) ReserveSample(
consensusTime uint64,
minBatchBalance *big.Int,
) (Sample, error) {

g, ctx := errgroup.WithContext(ctx)
chunkC := make(chan *reserve.ChunkBinItem, 64)

allStats := &SampleStats{}
statsLock := sync.Mutex{}
addStats := func(stats SampleStats) {
Expand All @@ -144,6 +85,8 @@ func (db *DB) ReserveSample(

allStats.BatchesBelowValueDuration = time.Since(t)

chunkC := make(chan *reserve.ChunkBinItem)

// Phase 1: Iterate chunk addresses
g.Go(func() error {
start := time.Now()
Expand All @@ -170,13 +113,14 @@ func (db *DB) ReserveSample(
})

// Phase 2: Get the chunk data and calculate transformed hash
sampleItemChan := make(chan SampleItem, 64)
sampleItemChan := make(chan SampleItem)

prefixHasherFactory := func() hash.Hash {
return swarm.NewPrefixHasher(anchor)
}

const workers = 6
workers := max(4, runtime.NumCPU())
db.logger.Debug("reserve sampler workers", "count", workers)

for i := 0; i < workers; i++ {
g.Go(func() error {
Expand Down Expand Up @@ -241,6 +185,7 @@ func (db *DB) ReserveSample(
}()

sampleItems := make([]SampleItem, 0, SampleSize)

// insert function will insert the new item in its correct place. If the sample
// size goes beyond what we need we omit the last item.
insert := func(item SampleItem) {
Expand Down Expand Up @@ -376,20 +321,20 @@ func transformedAddressCAC(hasher *bmt.Hasher, chunk swarm.Chunk) (swarm.Address
return swarm.NewAddress(taddr), nil
}

func transformedAddressSOC(hasher *bmt.Hasher, chunk swarm.Chunk) (swarm.Address, error) {
func transformedAddressSOC(hasher *bmt.Hasher, socChunk swarm.Chunk) (swarm.Address, error) {
// Calculate transformed address from wrapped chunk
sChunk, err := soc.FromChunk(chunk)
cacChunk, err := soc.UnwrapCAC(socChunk)
if err != nil {
return swarm.ZeroAddress, err
}
taddrCac, err := transformedAddressCAC(hasher, sChunk.WrappedChunk())
taddrCac, err := transformedAddressCAC(hasher, cacChunk)
if err != nil {
return swarm.ZeroAddress, err
}

// Hash address and transformed address to make transformed address for this SOC
sHasher := swarm.NewHasher()
if _, err := sHasher.Write(chunk.Address().Bytes()); err != nil {
if _, err := sHasher.Write(socChunk.Address().Bytes()); err != nil {
return swarm.ZeroAddress, err
}
if _, err := sHasher.Write(taddrCac.Bytes()); err != nil {
Expand Down Expand Up @@ -432,3 +377,64 @@ func (s *SampleStats) add(other SampleStats) {
s.ChunkLoadFailed += other.ChunkLoadFailed
s.StampLoadFailed += other.StampLoadFailed
}

// RandSample returns Sample with random values.
func RandSample(t *testing.T, anchor []byte) Sample {
t.Helper()

chunks := make([]swarm.Chunk, SampleSize)
for i := 0; i < SampleSize; i++ {
ch := chunk.GenerateTestRandomChunk()
if i%3 == 0 {
ch = chunk.GenerateTestRandomSoChunk(t, ch)
}
chunks[i] = ch
}

sample, err := MakeSampleUsingChunks(chunks, anchor)
if err != nil {
t.Fatal(err)
}

return sample
}

// MakeSampleUsingChunks returns Sample constructed using supplied chunks.
func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (Sample, error) {
prefixHasherFactory := func() hash.Hash {
return swarm.NewPrefixHasher(anchor)
}
items := make([]SampleItem, len(chunks))
for i, ch := range chunks {
tr, err := transformedAddress(bmt.NewHasher(prefixHasherFactory), ch, getChunkType(ch))
if err != nil {
return Sample{}, err
}

items[i] = SampleItem{
TransformedAddress: tr,
ChunkAddress: ch.Address(),
ChunkData: ch.Data(),
Stamp: newStamp(ch.Stamp()),
}
}

sort.Slice(items, func(i, j int) bool {
return items[i].TransformedAddress.Compare(items[j].TransformedAddress) == -1
})

return Sample{Items: items}, nil
}

func newStamp(s swarm.Stamp) *postage.Stamp {
return postage.NewStamp(s.BatchID(), s.Index(), s.Timestamp(), s.Sig())
}

func getChunkType(chunk swarm.Chunk) swarm.ChunkType {
if cac.Valid(chunk) {
return swarm.ChunkTypeContentAddressed
} else if soc.Valid(chunk) {
return swarm.ChunkTypeSingleOwner
}
return swarm.ChunkTypeUnspecified
}
Loading