diff --git a/internal/aggregator/aggregator_insert.go b/internal/aggregator/aggregator_insert.go index 399b6a466..fd99b7725 100644 --- a/internal/aggregator/aggregator_insert.go +++ b/internal/aggregator/aggregator_insert.go @@ -120,9 +120,9 @@ func (p *metricIndexCache) skips(metricID int32) (skipMaxHost bool, skipMinHost return false, false, false } -func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}, v3Format bool, rnd *rand.Rand, stag string) []byte { +func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}, v3Format bool, top data_model.TagUnion) []byte { if v3Format { - return appendKeysNewFormat(res, k, metricCache, usedTimestamps, stag) + return appendKeysNewFormat(res, k, metricCache, usedTimestamps, top) } appendTag := func(res []byte, v uint32) []byte { res = binary.LittleEndian.AppendUint32(res, v) return res } @@ -152,7 +152,7 @@ func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, us return res } -func appendKeysNewFormat(res []byte, k *data_model.Key, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}, stop string) []byte { +func appendKeysNewFormat(res []byte, k *data_model.Key, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}, top data_model.TagUnion) []byte { appendTag := func(res []byte, k *data_model.Key, i int) []byte { if i >= len(k.Tags) { // temporary while we in transition between 16 and 48 tags res = binary.LittleEndian.AppendUint32(res, 0) return res } @@ -182,11 +182,12 @@ func appendKeysNewFormat(res []byte, k *data_model.Key, metricCache *metricIndex res = appendTag(res, k, ki) } // write string top - if stop != "" { - res = binary.LittleEndian.AppendUint32(res, 0) - res = rowbinary.AppendString(res, stop) + if top.I > 0 || len(top.S) == 0 { // if we have both I and S we prefer I (we keep S for v2 compat) + res = binary.LittleEndian.AppendUint32(res, uint32(top.I)) + res = 
rowbinary.AppendString(res, "") } else { - res = appendTag(res, k, format.StringTopTagIndexV3) + res = binary.LittleEndian.AppendUint32(res, 0) + res = rowbinary.AppendString(res, top.S) } return res } @@ -230,21 +231,21 @@ func appendBadge(rng *rand.Rand, res []byte, k *data_model.Key, v data_model.Ite format.TagValueIDSrcIngestionStatusWarnTimestampClampedPast, format.TagValueIDSrcIngestionStatusWarnTimestampClampedFuture, format.TagValueIDSrcIngestionStatusWarnMapInvalidRawTagValue: - return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeIngestionWarnings, k.Tags[1]}}, "", v, metricCache, usedTimestamps, v3Format) + return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeIngestionWarnings, k.Tags[1]}}, v, metricCache, usedTimestamps, v3Format) } - return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeIngestionErrors, k.Tags[1]}}, "", v, metricCache, usedTimestamps, v3Format) + return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeIngestionErrors, k.Tags[1]}}, v, metricCache, usedTimestamps, v3Format) case format.BuiltinMetricIDAgentSamplingFactor: - return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAgentSamplingFactor, k.Tags[1]}}, "", v, metricCache, usedTimestamps, v3Format) + return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAgentSamplingFactor, k.Tags[1]}}, v, metricCache, usedTimestamps, v3Format) case format.BuiltinMetricIDAggSamplingFactor: - return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: 
format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAggSamplingFactor, k.Tags[4]}}, "", v, metricCache, usedTimestamps, v3Format) + return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAggSamplingFactor, k.Tags[4]}}, v, metricCache, usedTimestamps, v3Format) case format.BuiltinMetricIDAggMappingCreated: if k.Tags[5] == format.TagValueIDAggMappingCreatedStatusOK || k.Tags[5] == format.TagValueIDAggMappingCreatedStatusCreated { return res } - return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAggMappingErrors, k.Tags[4]}}, "", v, metricCache, usedTimestamps, v3Format) + return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAggMappingErrors, k.Tags[4]}}, v, metricCache, usedTimestamps, v3Format) case format.BuiltinMetricIDAggBucketReceiveDelaySec: - return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeContributors, 0}}, "", v, metricCache, usedTimestamps, v3Format) + return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeContributors, 0}}, v, metricCache, usedTimestamps, v3Format) } return res } @@ -259,13 +260,13 @@ func appendAggregates(res []byte, c float64, mi float64, ma float64, su float64, return append(res, tmp[:]...) 
} -func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, skey string, v data_model.ItemValue, cache *metricIndexCache, usedTimestamps map[uint32]struct{}, newFormat bool) []byte { +func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v data_model.ItemValue, cache *metricIndexCache, usedTimestamps map[uint32]struct{}, newFormat bool) []byte { count := v.Count() if count <= 0 { // We have lots of built-in counters which are normally 0 return res } // for explanation of insert logic, see multiValueMarshal below - res = appendKeys(res, key, cache, usedTimestamps, newFormat, nil /* don't replace mappings with strings from builtin metrics */, skey) + res = appendKeys(res, key, cache, usedTimestamps, newFormat, data_model.TagUnion{}) skipMaxHost, skipMinHost, skipSumSquare := cache.skips(key.Metric) if v.ValueSet { res = appendAggregates(res, count, v.ValueMin, v.ValueMax, v.ValueSum, zeroIfTrue(v.ValueSumSquare, skipSumSquare)) @@ -276,7 +277,7 @@ func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, skey strin res = rowbinary.AppendEmptyCentroids(res) res = rowbinary.AppendEmptyUnique(res) if !newFormat { - res = rowbinary.AppendString(res, skey) + res = rowbinary.AppendString(res, "") } // min_host_legacy, max_host_legacy @@ -332,10 +333,10 @@ func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, skey strin } func appendSimpleValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v float64, count float64, hostTag int32, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}, newFormat bool) []byte { - return appendValueStat(rng, res, key, "", data_model.SimpleItemValue(v, count, hostTag), metricCache, usedTimestamps, newFormat) + return appendValueStat(rng, res, key, data_model.SimpleItemValue(v, count, hostTag), metricCache, usedTimestamps, newFormat) } -func multiValueMarshal(rng *rand.Rand, metricID int32, cache *metricIndexCache, res []byte, value *data_model.MultiValue, skey string, sf 
float64, v3Format bool) []byte { +func multiValueMarshal(rng *rand.Rand, metricID int32, cache *metricIndexCache, res []byte, value *data_model.MultiValue, top data_model.TagUnion, sf float64, v3Format bool) []byte { skipMaxHost, skipMinHost, skipSumSquare := cache.skips(metricID) counter := value.Value.Count() * sf if value.Value.ValueSet { @@ -349,7 +350,7 @@ func multiValueMarshal(rng *rand.Rand, metricID int32, cache *metricIndexCache, res = rowbinary.AppendCentroids(res, value.ValueTDigest, sf) res = value.HLL.MarshallAppend(res) if !v3Format { - res = rowbinary.AppendString(res, skey) + res = rowbinary.AppendString(res, top.S) } // min_host_legacy, max_host_legacy if value.Value.ValueSet { @@ -444,9 +445,9 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, resPos := len(res) if !item.Tail.Empty() { // only tail - res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, rnd, "") + res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, data_model.TagUnion{}) - res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, &item.Tail, "", sf, v3Format) + res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, &item.Tail, data_model.TagUnion{}, sf, v3Format) if item.Key.Metric < 0 { is.builtin += len(res) - resPos @@ -469,8 +470,8 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, continue } // We have no badges for string tops - res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, rnd, key.S) // TODO - insert I - res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, key.S, sf, v3Format) // TODO - insert I + res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, key) // TODO - insert I + res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, key, sf, v3Format) // TODO - insert I } if item.Key.Metric < 0 { is.builtin += len(res) - resPos @@ -591,7 +592,7 @@ func (a *Aggregator) 
RowDataMarshalAppendPositions(buckets []*aggregatorBucket, res = appendSimpleValueStat(rnd, res, a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingEngineTime, [16]int32{0, 5, 0, 0, historicTag}), float64(sampler.TimeMetricMeta()), 1, a.aggregatorHost, metricCache, usedTimestamps, v3Format) res = appendValueStat(rnd, res, a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingEngineKeys, [16]int32{0, 0, 0, 0, historicTag}), - "", data_model.SimpleItemCounter(float64(sampler.ItemCount()), a.aggregatorHost), metricCache, usedTimestamps, v3Format) + data_model.SimpleItemCounter(float64(sampler.ItemCount()), a.aggregatorHost), metricCache, usedTimestamps, v3Format) // report budget used budgetKey := a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingBudget, [16]int32{0, historicTag})