Don't pass Key where it's passed with MultiItem
razmser committed Nov 11, 2024
1 parent ed90552 commit 631a3a8
Showing 4 changed files with 61 additions and 67 deletions.
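
In short: SamplerConfig.KeepF/DiscardF and SamplingMultiItemPair previously carried a Key alongside the *MultiItem, even though MultiItem already stores its own Key; this commit drops the redundant copy and call sites read item.Key instead. Below is a minimal, self-contained sketch of the shape of the change — the stub types are illustrative stand-ins for the real definitions in internal/data_model, which have many more fields:

package main

import "fmt"

// Stub types standing in for internal/data_model (illustrative only).
type Key struct{ Metric int32 }

type MultiItem struct {
	Key Key     // the item already knows its own key
	SF  float64 // sample factor chosen by the sampler
}

// Before this commit the callbacks also took the key explicitly:
//
//	KeepF    func(Key, *MultiItem, uint32)
//	DiscardF func(Key, *MultiItem, uint32)
//
// After the commit the item is the single source of truth:
type SamplerConfig struct {
	KeepF    func(*MultiItem, uint32)
	DiscardF func(*MultiItem, uint32)
}

func main() {
	cfg := SamplerConfig{
		KeepF: func(item *MultiItem, _ uint32) {
			fmt.Println("keep metric", item.Key.Metric) // key read from the item itself
		},
	}
	cfg.KeepF(&MultiItem{Key: Key{Metric: 42}, SF: 1}, 0)
}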
5 changes: 2 additions & 3 deletions internal/agent/agent_shard_send.go
@@ -258,8 +258,8 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, rnd *rand.Rand) [
 		SampleKeys: config.SampleKeys,
 		Meta: s.agent.metricStorage,
 		Rand: rnd,
-		DiscardF: func(key data_model.Key, item *data_model.MultiItem, _ uint32) {
-			bucket.DeleteMultiItem(&key)
+		DiscardF: func(item *data_model.MultiItem, _ uint32) {
+			bucket.DeleteMultiItem(&item.Key)
 		}, // remove from map
 	})
 	for _, item := range bucket.MultiItems {
@@ -279,7 +279,6 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, rnd *rand.Rand) [
 			}
 		}
 		sampler.Add(data_model.SamplingMultiItemPair{
-			Key: item.Key,
 			Item: item,
 			WhaleWeight: whaleWeight,
 			Size: sz,
36 changes: 17 additions & 19 deletions internal/aggregator/aggregator_insert.go
@@ -430,16 +430,16 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
 	metricCache := makeMetricCache(a.metricStorage)
 	usedTimestamps := map[uint32]struct{}{}
 
-	insertItem := func(k data_model.Key, item *data_model.MultiItem, sf float64, bucketTs uint32) { // lambda is convenient here
+	insertItem := func(item *data_model.MultiItem, sf float64, bucketTs uint32) { // lambda is convenient here
 		is := insertSize{}
 
 		resPos := len(res)
 		if !item.Tail.Empty() { // only tail
-			res = appendKeys(res, k, metricCache, usedTimestamps, newFormat, rnd, "")
+			res = appendKeys(res, item.Key, metricCache, usedTimestamps, newFormat, rnd, "")
 
-			res = multiValueMarshal(rnd, k.Metric, metricCache, res, &item.Tail, "", sf, newFormat)
+			res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, &item.Tail, "", sf, newFormat)
 
-			if k.Metric < 0 {
+			if item.Key.Metric < 0 {
 				is.builtin += len(res) - resPos
 			} else {
 				switch {
@@ -460,10 +460,10 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
 				continue
 			}
 			// We have no badges for string tops
-			res = appendKeys(res, k, metricCache, usedTimestamps, newFormat, rnd, skey)
-			res = multiValueMarshal(rnd, k.Metric, metricCache, res, value, skey, sf, newFormat)
+			res = appendKeys(res, item.Key, metricCache, usedTimestamps, newFormat, rnd, skey)
+			res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, skey, sf, newFormat)
 		}
-		if k.Metric < 0 {
+		if item.Key.Metric < 0 {
 			is.builtin += len(res) - resPos
 		} else {
 			// TODO - separate into 3 keys - is_string_top/is_builtin and hll/percentile/value/counter
@@ -489,7 +489,7 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
 			res = appendBadge(rnd, res, key, data_model.SimpleItemValue(sf, 1, a.aggregatorHost), metricCache, usedTimestamps, newFormat)
 			res = appendSimpleValueStat(rnd, res, key, sf, 1, a.aggregatorHost, metricCache, usedTimestamps, newFormat)
 		},
-		KeepF: func(k data_model.Key, item *data_model.MultiItem, bt uint32) { insertItem(k, item, item.SF, bt) },
+		KeepF: func(item *data_model.MultiItem, bt uint32) { insertItem(item, item.SF, bt) },
 	})
 	// First, sample with global sampling factors, depending on cardinality. Collect relative sizes for 2nd stage sampling below.
 	// TODO - actual sampleFactors are empty due to code commented out in estimator.go
@@ -509,9 +509,8 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
 		hardwareMetric := format.HardwareMetric(item.Key.Metric)
 		if !ingestionStatus && !hardwareMetric {
 			// For now sample only ingestion statuses and hardware metrics on aggregator. Might be bad idea. TODO - check.
-			insertItem(item.Key, item, 1, b.time)
+			insertItem(item, 1, b.time)
 			sampler.KeepBuiltin(data_model.SamplingMultiItemPair{
-				Key: item.Key,
 				Item: item,
 				WhaleWeight: whaleWeight,
 				Size: item.RowBinarySizeEstimate(),
@@ -528,7 +527,6 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
 		}
 		sz := item.RowBinarySizeEstimate()
 		sampler.Add(data_model.SamplingMultiItemPair{
-			Key: item.Key,
 			Item: item,
 			WhaleWeight: whaleWeight,
 			Size: sz,
@@ -560,17 +558,17 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
 	for _, v := range sampler.MetricGroups {
 		// keep bytes
 		key := a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingSizeBytes, [16]int32{0, historicTag, format.TagValueIDSamplingDecisionKeep, v.NamespaceID, v.GroupID, v.MetricID})
-		mi := data_model.MultiItem{Tail: data_model.MultiValue{Value: v.SumSizeKeep}}
-		insertItem(key, &mi, 1, buckets[0].time)
+		item := data_model.MultiItem{Key: key, Tail: data_model.MultiValue{Value: v.SumSizeKeep}}
+		insertItem(&item, 1, buckets[0].time)
 		// discard bytes
 		key = a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingSizeBytes, [16]int32{0, historicTag, format.TagValueIDSamplingDecisionDiscard, v.NamespaceID, v.GroupID, v.MetricID})
-		mi = data_model.MultiItem{Tail: data_model.MultiValue{Value: v.SumSizeDiscard}}
-		insertItem(key, &mi, 1, buckets[0].time)
+		item = data_model.MultiItem{Key: key, Tail: data_model.MultiValue{Value: v.SumSizeDiscard}}
+		insertItem(&item, 1, buckets[0].time)
 		// budget
 		key = a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingGroupBudget, [16]int32{0, historicTag, v.NamespaceID, v.GroupID})
-		item := data_model.MultiItem{}
+		item = data_model.MultiItem{Key: key}
 		item.Tail.Value.AddValue(v.Budget())
-		insertItem(key, &item, 1, buckets[0].time)
+		insertItem(&item, 1, buckets[0].time)
 	}
 	// report sampling engine time
 	res = appendSimpleValueStat(rnd, res, a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingEngineTime, [16]int32{0, 1, 0, 0, historicTag}),
@@ -588,9 +586,9 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
 
 	// report budget used
 	budgetKey := a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingBudget, [16]int32{0, historicTag})
-	budgetItem := data_model.MultiItem{}
+	budgetItem := data_model.MultiItem{Key: budgetKey}
 	budgetItem.Tail.Value.AddValue(float64(remainingBudget))
-	insertItem(budgetKey, &budgetItem, 1, buckets[0].time)
+	insertItem(&budgetItem, 1, buckets[0].time)
 	res = appendSimpleValueStat(rnd, res, a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingMetricCount, [16]int32{0, historicTag}),
 		float64(sampler.MetricCount), 1, a.aggregatorHost, metricCache, usedTimestamps, newFormat)
 
15 changes: 7 additions & 8 deletions internal/data_model/sampling.go
@@ -20,7 +20,6 @@ const maxFairKeyLen = 3
 
 type (
 	SamplingMultiItemPair struct {
-		Key Key // TODO: remove Key from here in favor of MultiItem
 		Item *MultiItem
 		WhaleWeight float64 // whale selection criteria, for now sum Counters
 		Size int
@@ -62,8 +61,8 @@ type (
 		SampleFactorF func(int32, float64)
 
 		// Called when sampling algorithm decides to either keep or discard the item
-		KeepF func(Key, *MultiItem, uint32)
-		DiscardF func(Key, *MultiItem, uint32)
+		KeepF func(*MultiItem, uint32)
+		DiscardF func(*MultiItem, uint32)
 
 		// Unit tests support
 		RoundF func(float64, *rand.Rand) float64 // rounds sample factor to an integer
@@ -128,7 +127,7 @@ func (h *sampler) Add(p SamplingMultiItemPair) {
 	if p.Size < 1 {
 		p.Item.SF = math.MaxFloat32
 		if h.DiscardF != nil {
-			h.DiscardF(p.Key, p.Item, p.BucketTs)
+			h.DiscardF(p.Item, p.BucketTs)
 		}
 		h.sumSizeDiscard.AddValue(0)
 		return
@@ -161,8 +160,8 @@ func (h *sampler) Run(budget int64) {
 			n = maxFairKeyLen
 		}
 		for j := 0; j < n; j++ {
-			if x := h.items[i].metric.FairKey[j]; 0 <= x && x < len(h.items[i].Key.Tags) {
-				h.items[i].fairKey[j] = h.items[i].Key.Tags[x]
+			if x := h.items[i].metric.FairKey[j]; 0 <= x && x < len(h.items[i].Item.Key.Tags) {
+				h.items[i].fairKey[j] = h.items[i].Item.Key.Tags[x]
 			}
 		}
 		h.items[i].fairKeyLen = n
@@ -318,15 +317,15 @@ func (g samplerGroup) keep(h *sampler) {
 func (p *SamplingMultiItemPair) keep(sf float64, h *sampler) {
 	p.Item.SF = sf // communicate selected factor to next step of processing
 	if h.KeepF != nil {
-		h.KeepF(p.Key, p.Item, p.BucketTs)
+		h.KeepF(p.Item, p.BucketTs)
 	}
 	h.currentGroup.SumSizeKeep.AddValue(float64(p.Size))
 }
 
 func (p *SamplingMultiItemPair) discard(sf float64, h *sampler) {
 	p.Item.SF = sf // communicate selected factor to next step of processing
 	if h.DiscardF != nil {
-		h.DiscardF(p.Key, p.Item, p.BucketTs)
+		h.DiscardF(p.Item, p.BucketTs)
 	}
 	h.currentGroup.SumSizeDiscard.AddValue(float64(p.Size))
 }
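
The only behavioral subtlety is on the discard path: callers that used the key argument to remove items from a bucket now take the address of the item's own Key field, as the agent hunk above does with bucket.DeleteMultiItem(&item.Key). A small sketch of that wiring, reusing the stub types from the sketch after the commit header (testBucket and its DeleteMultiItem are hypothetical stand-ins for the real MetricsBucket API):

// Hypothetical bucket standing in for data_model.MetricsBucket.
type testBucket struct{ items map[Key]*MultiItem }

func (b *testBucket) DeleteMultiItem(k *Key) { delete(b.items, *k) }

func discardIntoBucket(b *testBucket) SamplerConfig {
	return SamplerConfig{
		DiscardF: func(item *MultiItem, _ uint32) {
			b.DeleteMultiItem(&item.Key) // key now comes from the item, not a separate argument
		},
	}
}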
72 changes: 35 additions & 37 deletions internal/data_model/sampling_test.go
@@ -33,10 +33,10 @@ func TestSampling(t *testing.T) {
 		var keepSumSize int64
 		m := make(map[int32]*metricInfo)
 		s := NewSampler(len(b.MultiItems), SamplerConfig{
-			KeepF: func(k Key, item *MultiItem, _ uint32) {
+			KeepF: func(item *MultiItem, _ uint32) {
 				keepN++
-				keepSumSize += int64(samplingTestSizeOf(k, item))
-				stat := m[k.Metric]
+				keepSumSize += int64(samplingTestSizeOf(item))
+				stat := m[item.Key.Metric]
 				require.LessOrEqual(t, 1., item.SF)
 				require.LessOrEqual(t, stat.maxSF, float32(item.SF))
 				if item.SF > 1 {
@@ -48,10 +48,10 @@
 					}
 				}
 			},
-			DiscardF: func(k Key, item *MultiItem, _ uint32) {
+			DiscardF: func(item *MultiItem, _ uint32) {
 				discardN++
-				b.DeleteMultiItem(&k)
-				stat := m[k.Metric]
+				b.DeleteMultiItem(&item.Key)
+				stat := m[item.Key.Metric]
 				require.LessOrEqual(t, 1., item.SF)
 				require.LessOrEqual(t, stat.maxSF, float32(item.SF))
 				if item.SF > 1 {
@@ -74,7 +74,7 @@
 				v = &metricInfo{id: metric}
 				m[metric] = v
 			}
-			v.size += int64(samplingTestSizeOf(item.Key, item))
+			v.size += int64(samplingTestSizeOf(item))
 		}
 		budget := rapid.Int64Range(20, 20+b.sumSize*2).Draw(t, "budget")
 		metricCount := len(b.MultiItems)
@@ -122,8 +122,8 @@ func TestSamplingWithNilKeepF(t *testing.T) {
 	})
 	s := NewSampler(len(b.MultiItems), SamplerConfig{
 		KeepF: nil, // agent doesn't set it
-		DiscardF: func(k Key, item *MultiItem, _ uint32) {
-			b.DeleteMultiItem(&k)
+		DiscardF: func(item *MultiItem, _ uint32) {
+			b.DeleteMultiItem(&item.Key)
 		},
 		SelectF: func(s []SamplingMultiItemPair, sf float64, _ *rand.Rand) int {
 			return int(float64(len(s)) / sf)
@@ -165,10 +165,10 @@ func TestNoSamplingWhenFitBudget(t *testing.T) {
 	b.generateSeriesCount(t, samplingTestSpec{maxSeriesCount: 256, maxMetricCount: 256})
 	var (
 		s = NewSampler(len(b.MultiItems), SamplerConfig{
-			KeepF: func(k Key, v *MultiItem, _ uint32) {
-				b.DeleteMultiItem(&k)
+			KeepF: func(item *MultiItem, _ uint32) {
+				b.DeleteMultiItem(&item.Key)
 			},
-			DiscardF: func(k Key, _ *MultiItem, _ uint32) {
+			DiscardF: func(mi *MultiItem, _ uint32) {
 				t.Fatal("budget is enough but series were discarded")
 			},
 		})
@@ -185,11 +185,11 @@ func TestNormalDistributionPreserved(t *testing.T) {
 		b = newSamplingTestBucket()
 		r = rand.New()
 		statM = make(map[Key]*samplingTestStat, len(b.MultiItems))
-		keepF = func(k Key, item *MultiItem, _ uint32) {
+		keepF = func(item *MultiItem, _ uint32) {
 			var s *samplingTestStat
-			if s = statM[k]; s == nil {
+			if s = statM[item.Key]; s == nil {
 				s = &samplingTestStat{}
-				statM[k] = s
+				statM[item.Key] = s
 			}
 			s.aggregate(item)
 		}
@@ -244,8 +244,8 @@ func TestFairKeySampling(t *testing.T) {
 			require.Equal(t, sf, math.Floor(sf))
 			return int(float64(len(s)) / sf)
 		},
-		KeepF: func(k Key, mi *MultiItem, u uint32) {
-			keepCount[k]++
+		KeepF: func(item *MultiItem, u uint32) {
+			keepCount[item.Key]++
 		},
 	})
 	budget := b.sumSize * int64(n) / int64(len(b.MultiItems))
@@ -315,7 +315,7 @@ func TestCompareSampleFactors(t *testing.T) {
 	})
 	var sumSize int
 	for _, item := range b.MultiItems {
-		sumSize += samplingTestSizeOf(item.Key, item)
+		sumSize += samplingTestSizeOf(item)
 	}
 	bucket := MetricsBucket{MultiItemMap: b.MultiItemMap}
 	config := samplerConfigEx{
@@ -329,13 +329,13 @@
 		sampleBudget: rapid.IntRange(20, 20+sumSize*2).Draw(t, "max metric count"),
 	}
 	sizeSum := map[int]int{}
-	config.KeepF = func(k Key, v *MultiItem, _ uint32) {
-		sizeSum[int(k.Metric)] += samplingTestSizeOf(k, v)
+	config.KeepF = func(mi *MultiItem, _ uint32) {
+		sizeSum[int(mi.Key.Metric)] += samplingTestSizeOf(mi)
 	}
 	sf := sampleBucket(&bucket, config)
 	sizeSumLegacy := map[int]int{}
-	config.KeepF = func(k Key, v *MultiItem, _ uint32) {
-		sizeSumLegacy[int(k.Metric)] += samplingTestSizeOf(k, v)
+	config.KeepF = func(mi *MultiItem, _ uint32) {
+		sizeSumLegacy[int(mi.Key.Metric)] += samplingTestSizeOf(mi)
 	}
 	sfLegacy := sampleBucketLegacy(&bucket, config)
 	normSF := func(s []tlstatshouse.SampleFactor) []tlstatshouse.SampleFactor {
@@ -442,9 +442,9 @@ func (b *samplingTestBucket) generateSeriesSize(t *rapid.T, s samplingTestSpec)
 	)
 	for i := int32(1); size < sizeT; i++ {
 		var k = Key{Metric: metricID, Tags: [format.MaxTags]int32{i}}
-		mi, _ := miMap.GetOrCreateMultiItem(k, 0, nil)
-		mi.Tail.Value.AddValueCounter(0, 1)
-		size += samplingTestSizeOf(k, mi)
+		item, _ := miMap.GetOrCreateMultiItem(k, 0, nil)
+		item.Tail.Value.AddValueCounter(0, 1)
+		size += samplingTestSizeOf(item)
 	}
 	sumSize += int64(size)
 }
@@ -461,18 +461,17 @@
 func (b *samplingTestBucket) run(s *sampler, budget int64) {
 	for _, item := range b.MultiItems {
 		s.Add(SamplingMultiItemPair{
-			Key: item.Key,
 			Item: item,
 			WhaleWeight: item.FinishStringTop(rand.New(), 20),
-			Size: samplingTestSizeOf(item.Key, item),
+			Size: samplingTestSizeOf(item),
 			MetricID: item.Key.Metric,
 		})
 	}
 	s.Run(budget)
 }
 
-func samplingTestSizeOf(k Key, item *MultiItem) int {
-	return k.TLSizeEstimate(k.Timestamp) + item.TLSizeEstimate()
+func samplingTestSizeOf(mi *MultiItem) int {
+	return mi.Key.TLSizeEstimate(mi.Key.Timestamp) + mi.TLSizeEstimate()
 }
 
 type samplingTestStat struct {
@@ -566,7 +565,6 @@ func sampleBucket(bucket *MetricsBucket, config samplerConfigEx) []tlstatshouse.
 			}
 		}
 		sampler.Add(SamplingMultiItemPair{
-			Key: item.Key,
 			Item: item,
 			WhaleWeight: whaleWeight,
 			Size: sz,
@@ -624,7 +622,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats
 		remainingWeight += metric.metricWeight
 		}
 		metric.sumSize += int64(sz)
-		metric.items = append(metric.items, SamplingMultiItemPair{Key: item.Key, Item: item, WhaleWeight: whaleWeight})
+		metric.items = append(metric.items, SamplingMultiItemPair{Item: item, WhaleWeight: whaleWeight})
 		totalItemsSize += sz
 
 
@@ -650,7 +648,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats
 			// Keep all elements in bucket
 			if config.KeepF != nil {
 				for _, v := range samplingMetric.items {
-					config.KeepF(v.Key, v.Item, bucket.Time)
+					config.KeepF(v.Item, bucket.Time)
 				}
 			}
 		}
@@ -659,7 +657,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats
 		if samplingMetric.noSampleAgent {
 			if config.KeepF != nil {
 				for _, v := range samplingMetric.items {
-					config.KeepF(v.Key, v.Item, bucket.Time)
+					config.KeepF(v.Item, bucket.Time)
 				}
 			}
 			continue
@@ -670,7 +668,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats
 		if sf <= 1 { // Many sample factors are between 1 and 2, so this is worthy optimization
 			if config.KeepF != nil {
 				for _, v := range samplingMetric.items {
-					config.KeepF(v.Key, v.Item, bucket.Time)
+					config.KeepF(v.Item, bucket.Time)
 				}
 			}
 			continue
@@ -698,7 +696,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats
 		// Keep all whale elements in bucket
 		if config.KeepF != nil {
 			for _, v := range samplingMetric.items[:whalesAllowed] {
-				config.KeepF(v.Key, v.Item, bucket.Time)
+				config.KeepF(v.Item, bucket.Time)
 			}
 		}
 		samplingMetric.items = samplingMetric.items[whalesAllowed:]
@@ -708,13 +706,13 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats
 		for _, v := range samplingMetric.items[:pos] {
 			v.Item.SF = sf // communicate selected factor to next step of processing
 			if config.KeepF != nil {
-				config.KeepF(v.Key, v.Item, bucket.Time)
+				config.KeepF(v.Item, bucket.Time)
 			}
 		}
 		for _, v := range samplingMetric.items[pos:] {
 			v.Item.SF = sf // communicate selected factor to next step of processing
 			if config.DiscardF != nil {
-				config.DiscardF(v.Key, v.Item, bucket.Time)
+				config.DiscardF(v.Item, bucket.Time)
 			}
 		}
 	}