diff --git a/internal/agent/agent_shard_send.go b/internal/agent/agent_shard_send.go index 83d7dd826..885959f5e 100644 --- a/internal/agent/agent_shard_send.go +++ b/internal/agent/agent_shard_send.go @@ -258,8 +258,8 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, rnd *rand.Rand) [ SampleKeys: config.SampleKeys, Meta: s.agent.metricStorage, Rand: rnd, - DiscardF: func(key data_model.Key, item *data_model.MultiItem, _ uint32) { - bucket.DeleteMultiItem(&key) + DiscardF: func(item *data_model.MultiItem, _ uint32) { + bucket.DeleteMultiItem(&item.Key) }, // remove from map }) for _, item := range bucket.MultiItems { @@ -279,7 +279,6 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, rnd *rand.Rand) [ } } sampler.Add(data_model.SamplingMultiItemPair{ - Key: item.Key, Item: item, WhaleWeight: whaleWeight, Size: sz, diff --git a/internal/aggregator/aggregator_insert.go b/internal/aggregator/aggregator_insert.go index c1cc8626f..e34fadfed 100644 --- a/internal/aggregator/aggregator_insert.go +++ b/internal/aggregator/aggregator_insert.go @@ -430,16 +430,16 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, metricCache := makeMetricCache(a.metricStorage) usedTimestamps := map[uint32]struct{}{} - insertItem := func(k data_model.Key, item *data_model.MultiItem, sf float64, bucketTs uint32) { // lambda is convenient here + insertItem := func(item *data_model.MultiItem, sf float64, bucketTs uint32) { // lambda is convenient here is := insertSize{} resPos := len(res) if !item.Tail.Empty() { // only tail - res = appendKeys(res, k, metricCache, usedTimestamps, newFormat, rnd, "") + res = appendKeys(res, item.Key, metricCache, usedTimestamps, newFormat, rnd, "") - res = multiValueMarshal(rnd, k.Metric, metricCache, res, &item.Tail, "", sf, newFormat) + res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, &item.Tail, "", sf, newFormat) - if k.Metric < 0 { + if item.Key.Metric < 0 { is.builtin += len(res) - resPos } else { switch { @@ -460,10 +460,10 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, continue } // We have no badges for string tops - res = appendKeys(res, k, metricCache, usedTimestamps, newFormat, rnd, skey) - res = multiValueMarshal(rnd, k.Metric, metricCache, res, value, skey, sf, newFormat) + res = appendKeys(res, item.Key, metricCache, usedTimestamps, newFormat, rnd, skey) + res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, skey, sf, newFormat) } - if k.Metric < 0 { + if item.Key.Metric < 0 { is.builtin += len(res) - resPos } else { // TODO - separate into 3 keys - is_string_top/is_builtin and hll/percentile/value/counter @@ -489,7 +489,7 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, res = appendBadge(rnd, res, key, data_model.SimpleItemValue(sf, 1, a.aggregatorHost), metricCache, usedTimestamps, newFormat) res = appendSimpleValueStat(rnd, res, key, sf, 1, a.aggregatorHost, metricCache, usedTimestamps, newFormat) }, - KeepF: func(k data_model.Key, item *data_model.MultiItem, bt uint32) { insertItem(k, item, item.SF, bt) }, + KeepF: func(item *data_model.MultiItem, bt uint32) { insertItem(item, item.SF, bt) }, }) // First, sample with global sampling factors, depending on cardinality. Collect relative sizes for 2nd stage sampling below. // TODO - actual sampleFactors are empty due to code commented out in estimator.go @@ -509,9 +509,8 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, hardwareMetric := format.HardwareMetric(item.Key.Metric) if !ingestionStatus && !hardwareMetric { // For now sample only ingestion statuses and hardware metrics on aggregator. Might be bad idea. TODO - check. - insertItem(item.Key, item, 1, b.time) + insertItem(item, 1, b.time) sampler.KeepBuiltin(data_model.SamplingMultiItemPair{ - Key: item.Key, Item: item, WhaleWeight: whaleWeight, Size: item.RowBinarySizeEstimate(), @@ -528,7 +527,6 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, } sz := item.RowBinarySizeEstimate() sampler.Add(data_model.SamplingMultiItemPair{ - Key: item.Key, Item: item, WhaleWeight: whaleWeight, Size: sz, @@ -560,17 +558,17 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, for _, v := range sampler.MetricGroups { // keep bytes key := a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingSizeBytes, [16]int32{0, historicTag, format.TagValueIDSamplingDecisionKeep, v.NamespaceID, v.GroupID, v.MetricID}) - mi := data_model.MultiItem{Tail: data_model.MultiValue{Value: v.SumSizeKeep}} - insertItem(key, &mi, 1, buckets[0].time) + item := data_model.MultiItem{Key: key, Tail: data_model.MultiValue{Value: v.SumSizeKeep}} + insertItem(&item, 1, buckets[0].time) // discard bytes key = a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingSizeBytes, [16]int32{0, historicTag, format.TagValueIDSamplingDecisionDiscard, v.NamespaceID, v.GroupID, v.MetricID}) - mi = data_model.MultiItem{Tail: data_model.MultiValue{Value: v.SumSizeDiscard}} - insertItem(key, &mi, 1, buckets[0].time) + item = data_model.MultiItem{Key: key, Tail: data_model.MultiValue{Value: v.SumSizeDiscard}} + insertItem(&item, 1, buckets[0].time) // budget key = a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingGroupBudget, [16]int32{0, historicTag, v.NamespaceID, v.GroupID}) - item := data_model.MultiItem{} + item = data_model.MultiItem{Key: key} item.Tail.Value.AddValue(v.Budget()) - insertItem(key, &item, 1, buckets[0].time) + insertItem(&item, 1, buckets[0].time) } // report sampling engine time res = appendSimpleValueStat(rnd, res, a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingEngineTime, [16]int32{0, 1, 0, 0, historicTag}), @@ -588,9 +586,9 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, // report budget used budgetKey := a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingBudget, [16]int32{0, historicTag}) - budgetItem := data_model.MultiItem{} + budgetItem := data_model.MultiItem{Key: budgetKey} budgetItem.Tail.Value.AddValue(float64(remainingBudget)) - insertItem(budgetKey, &budgetItem, 1, buckets[0].time) + insertItem(&budgetItem, 1, buckets[0].time) res = appendSimpleValueStat(rnd, res, a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingMetricCount, [16]int32{0, historicTag}), float64(sampler.MetricCount), 1, a.aggregatorHost, metricCache, usedTimestamps, newFormat) diff --git a/internal/data_model/sampling.go b/internal/data_model/sampling.go index 4ebd3a257..0ed024c8c 100644 --- a/internal/data_model/sampling.go +++ b/internal/data_model/sampling.go @@ -20,7 +20,6 @@ const maxFairKeyLen = 3 type ( SamplingMultiItemPair struct { - Key Key // TODO: remove Key from here in favor of MultiItem Item *MultiItem WhaleWeight float64 // whale selection criteria, for now sum Counters Size int @@ -62,8 +61,8 @@ type ( SampleFactorF func(int32, float64) // Called when sampling algorithm decides to either keep or discard the item - KeepF func(Key, *MultiItem, uint32) - DiscardF func(Key, *MultiItem, uint32) + KeepF func(*MultiItem, uint32) + DiscardF func(*MultiItem, uint32) // Unit tests support RoundF func(float64, *rand.Rand) float64 // rounds sample factor to an integer @@ -128,7 +127,7 @@ func (h *sampler) Add(p SamplingMultiItemPair) { if p.Size < 1 { p.Item.SF = math.MaxFloat32 if h.DiscardF != nil { - h.DiscardF(p.Key, p.Item, p.BucketTs) + h.DiscardF(p.Item, p.BucketTs) } h.sumSizeDiscard.AddValue(0) return @@ -161,8 +160,8 @@ func (h *sampler) Run(budget int64) { n = maxFairKeyLen } for j := 0; j < n; j++ { - if x := h.items[i].metric.FairKey[j]; 0 <= x && x < len(h.items[i].Key.Tags) { - h.items[i].fairKey[j] = h.items[i].Key.Tags[x] + if x := h.items[i].metric.FairKey[j]; 0 <= x && x < len(h.items[i].Item.Key.Tags) { + h.items[i].fairKey[j] = h.items[i].Item.Key.Tags[x] } } h.items[i].fairKeyLen = n @@ -318,7 +317,7 @@ func (g samplerGroup) keep(h *sampler) { func (p *SamplingMultiItemPair) keep(sf float64, h *sampler) { p.Item.SF = sf // communicate selected factor to next step of processing if h.KeepF != nil { - h.KeepF(p.Key, p.Item, p.BucketTs) + h.KeepF(p.Item, p.BucketTs) } h.currentGroup.SumSizeKeep.AddValue(float64(p.Size)) } @@ -326,7 +325,7 @@ func (p *SamplingMultiItemPair) keep(sf float64, h *sampler) { func (p *SamplingMultiItemPair) discard(sf float64, h *sampler) { p.Item.SF = sf // communicate selected factor to next step of processing if h.DiscardF != nil { - h.DiscardF(p.Key, p.Item, p.BucketTs) + h.DiscardF(p.Item, p.BucketTs) } h.currentGroup.SumSizeDiscard.AddValue(float64(p.Size)) } diff --git a/internal/data_model/sampling_test.go b/internal/data_model/sampling_test.go index f820d720e..fa81f552b 100644 --- a/internal/data_model/sampling_test.go +++ b/internal/data_model/sampling_test.go @@ -33,10 +33,10 @@ func TestSampling(t *testing.T) { var keepSumSize int64 m := make(map[int32]*metricInfo) s := NewSampler(len(b.MultiItems), SamplerConfig{ - KeepF: func(k Key, item *MultiItem, _ uint32) { + KeepF: func(item *MultiItem, _ uint32) { keepN++ - keepSumSize += int64(samplingTestSizeOf(k, item)) - stat := m[k.Metric] + keepSumSize += int64(samplingTestSizeOf(item)) + stat := m[item.Key.Metric] require.LessOrEqual(t, 1., item.SF) require.LessOrEqual(t, stat.maxSF, float32(item.SF)) if item.SF > 1 { @@ -48,10 +48,10 @@ func TestSampling(t *testing.T) { } } }, - DiscardF: func(k Key, item *MultiItem, _ uint32) { + DiscardF: func(item *MultiItem, _ uint32) { discardN++ - b.DeleteMultiItem(&k) - stat := m[k.Metric] + b.DeleteMultiItem(&item.Key) + stat := m[item.Key.Metric] require.LessOrEqual(t, 1., item.SF) require.LessOrEqual(t, stat.maxSF, float32(item.SF)) if item.SF > 1 { @@ -74,7 +74,7 @@ func TestSampling(t *testing.T) { v = &metricInfo{id: metric} m[metric] = v } - v.size += int64(samplingTestSizeOf(item.Key, item)) + v.size += int64(samplingTestSizeOf(item)) } budget := rapid.Int64Range(20, 20+b.sumSize*2).Draw(t, "budget") metricCount := len(b.MultiItems) @@ -122,8 +122,8 @@ func TestSamplingWithNilKeepF(t *testing.T) { }) s := NewSampler(len(b.MultiItems), SamplerConfig{ KeepF: nil, // agent doesn't set it - DiscardF: func(k Key, item *MultiItem, _ uint32) { - b.DeleteMultiItem(&k) + DiscardF: func(item *MultiItem, _ uint32) { + b.DeleteMultiItem(&item.Key) }, SelectF: func(s []SamplingMultiItemPair, sf float64, _ *rand.Rand) int { return int(float64(len(s)) / sf) @@ -165,10 +165,10 @@ func TestNoSamplingWhenFitBudget(t *testing.T) { b.generateSeriesCount(t, samplingTestSpec{maxSeriesCount: 256, maxMetricCount: 256}) var ( s = NewSampler(len(b.MultiItems), SamplerConfig{ - KeepF: func(k Key, v *MultiItem, _ uint32) { - b.DeleteMultiItem(&k) + KeepF: func(item *MultiItem, _ uint32) { + b.DeleteMultiItem(&item.Key) }, - DiscardF: func(k Key, _ *MultiItem, _ uint32) { + DiscardF: func(mi *MultiItem, _ uint32) { t.Fatal("budget is enough but series were discarded") }, }) @@ -185,11 +185,11 @@ func TestNormalDistributionPreserved(t *testing.T) { b = newSamplingTestBucket() r = rand.New() statM = make(map[Key]*samplingTestStat, len(b.MultiItems)) - keepF = func(k Key, item *MultiItem, _ uint32) { + keepF = func(item *MultiItem, _ uint32) { var s *samplingTestStat - if s = statM[k]; s == nil { + if s = statM[item.Key]; s == nil { s = &samplingTestStat{} - statM[k] = s + statM[item.Key] = s } s.aggregate(item) } @@ -244,8 +244,8 @@ func TestFairKeySampling(t *testing.T) { require.Equal(t, sf, math.Floor(sf)) return int(float64(len(s)) / sf) }, - KeepF: func(k Key, mi *MultiItem, u uint32) { - keepCount[k]++ + KeepF: func(item *MultiItem, u uint32) { + keepCount[item.Key]++ }, }) budget := b.sumSize * int64(n) / int64(len(b.MultiItems)) @@ -315,7 +315,7 @@ func TestCompareSampleFactors(t *testing.T) { }) var sumSize int for _, item := range b.MultiItems { - sumSize += samplingTestSizeOf(item.Key, item) + sumSize += samplingTestSizeOf(item) } bucket := MetricsBucket{MultiItemMap: b.MultiItemMap} config := samplerConfigEx{ @@ -329,13 +329,13 @@ func TestCompareSampleFactors(t *testing.T) { sampleBudget: rapid.IntRange(20, 20+sumSize*2).Draw(t, "max metric count"), } sizeSum := map[int]int{} - config.KeepF = func(k Key, v *MultiItem, _ uint32) { - sizeSum[int(k.Metric)] += samplingTestSizeOf(k, v) + config.KeepF = func(mi *MultiItem, _ uint32) { + sizeSum[int(mi.Key.Metric)] += samplingTestSizeOf(mi) } sf := sampleBucket(&bucket, config) sizeSumLegacy := map[int]int{} - config.KeepF = func(k Key, v *MultiItem, _ uint32) { - sizeSumLegacy[int(k.Metric)] += samplingTestSizeOf(k, v) + config.KeepF = func(mi *MultiItem, _ uint32) { + sizeSumLegacy[int(mi.Key.Metric)] += samplingTestSizeOf(mi) } sfLegacy := sampleBucketLegacy(&bucket, config) normSF := func(s []tlstatshouse.SampleFactor) []tlstatshouse.SampleFactor { @@ -442,9 +442,9 @@ func (b *samplingTestBucket) generateSeriesSize(t *rapid.T, s samplingTestSpec) ) for i := int32(1); size < sizeT; i++ { var k = Key{Metric: metricID, Tags: [format.MaxTags]int32{i}} - mi, _ := miMap.GetOrCreateMultiItem(k, 0, nil) - mi.Tail.Value.AddValueCounter(0, 1) - size += samplingTestSizeOf(k, mi) + item, _ := miMap.GetOrCreateMultiItem(k, 0, nil) + item.Tail.Value.AddValueCounter(0, 1) + size += samplingTestSizeOf(item) } sumSize += int64(size) } @@ -461,18 +461,17 @@ func (b *samplingTestBucket) generateNormValues(r *rand.Rand) { func (b *samplingTestBucket) run(s *sampler, budget int64) { for _, item := range b.MultiItems { s.Add(SamplingMultiItemPair{ - Key: item.Key, Item: item, WhaleWeight: item.FinishStringTop(rand.New(), 20), - Size: samplingTestSizeOf(item.Key, item), + Size: samplingTestSizeOf(item), MetricID: item.Key.Metric, }) } s.Run(budget) } -func samplingTestSizeOf(k Key, item *MultiItem) int { - return k.TLSizeEstimate(k.Timestamp) + item.TLSizeEstimate() +func samplingTestSizeOf(mi *MultiItem) int { + return mi.Key.TLSizeEstimate(mi.Key.Timestamp) + mi.TLSizeEstimate() } type samplingTestStat struct { @@ -566,7 +565,6 @@ func sampleBucket(bucket *MetricsBucket, config samplerConfigEx) []tlstatshouse. } } sampler.Add(SamplingMultiItemPair{ - Key: item.Key, Item: item, WhaleWeight: whaleWeight, Size: sz, @@ -624,7 +622,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats remainingWeight += metric.metricWeight } metric.sumSize += int64(sz) - metric.items = append(metric.items, SamplingMultiItemPair{Key: item.Key, Item: item, WhaleWeight: whaleWeight}) + metric.items = append(metric.items, SamplingMultiItemPair{Item: item, WhaleWeight: whaleWeight}) totalItemsSize += sz } @@ -650,7 +648,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats // Keep all elements in bucket if config.KeepF != nil { for _, v := range samplingMetric.items { - config.KeepF(v.Key, v.Item, bucket.Time) + config.KeepF(v.Item, bucket.Time) } } } @@ -659,7 +657,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats if samplingMetric.noSampleAgent { if config.KeepF != nil { for _, v := range samplingMetric.items { - config.KeepF(v.Key, v.Item, bucket.Time) + config.KeepF(v.Item, bucket.Time) } } continue @@ -670,7 +668,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats if sf <= 1 { // Many sample factors are between 1 and 2, so this is worthy optimization if config.KeepF != nil { for _, v := range samplingMetric.items { - config.KeepF(v.Key, v.Item, bucket.Time) + config.KeepF(v.Item, bucket.Time) } } continue @@ -698,7 +696,7 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats // Keep all whale elements in bucket if config.KeepF != nil { for _, v := range samplingMetric.items[:whalesAllowed] { - config.KeepF(v.Key, v.Item, bucket.Time) + config.KeepF(v.Item, bucket.Time) } } samplingMetric.items = samplingMetric.items[whalesAllowed:] @@ -708,13 +706,13 @@ func sampleBucketLegacy(bucket *MetricsBucket, config samplerConfigEx) []tlstats for _, v := range samplingMetric.items[:pos] { v.Item.SF = sf // communicate selected factor to next step of processing if config.KeepF != nil { - config.KeepF(v.Key, v.Item, bucket.Time) + config.KeepF(v.Item, bucket.Time) } } for _, v := range samplingMetric.items[pos:] { v.Item.SF = sf // communicate selected factor to next step of processing if config.DiscardF != nil { - config.DiscardF(v.Key, v.Item, bucket.Time) + config.DiscardF(v.Item, bucket.Time) } } }