Write top tag into CH
for v2 we always write string
for v3 we write mapping if it's available
razmser committed Jan 23, 2025
1 parent 8c3204f commit b11916d
Showing 1 changed file with 25 additions and 24 deletions.
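
The idea in the commit message, sketched as standalone Go before the diff: v2 rows always carry the string form of the string-top tag, while v3 rows write the mapped value when one exists and fall back to the raw string otherwise. This is a minimal illustration, not the aggregator's code: the TagUnion shape (fields I and S) is inferred from the diff, appendTopV2/appendTopV3 are hypothetical names, and appendString uses a single length byte in place of the real rowbinary encoding.

package main

import (
	"encoding/binary"
	"fmt"
)

// TagUnion is assumed here to pair a mapped tag value with its raw string,
// matching the fields (I, S) used by the diff below.
type TagUnion struct {
	I int32  // mapped value; 0 means "no mapping available"
	S string // raw string; kept for v2 compatibility
}

// appendString stands in for rowbinary.AppendString; a single length byte
// replaces the real length prefix, which is enough for this illustration.
func appendString(res []byte, s string) []byte {
	res = append(res, byte(len(s)))
	return append(res, s...)
}

// v2: the string form of the top tag is always written.
func appendTopV2(res []byte, top TagUnion) []byte {
	return appendString(res, top.S)
}

// v3: the mapping is preferred when available; the raw string is written
// only when there is no mapping (mirrors the new branch in appendKeysNewFormat).
func appendTopV3(res []byte, top TagUnion) []byte {
	if top.I > 0 || len(top.S) == 0 {
		res = binary.LittleEndian.AppendUint32(res, uint32(top.I))
		return appendString(res, "")
	}
	res = binary.LittleEndian.AppendUint32(res, 0)
	return appendString(res, top.S)
}

func main() {
	fmt.Println(appendTopV3(nil, TagUnion{I: 42}))        // mapping 42, then an empty string
	fmt.Println(appendTopV3(nil, TagUnion{S: "firefox"})) // zero mapping, then the raw string
	fmt.Println(appendTopV2(nil, TagUnion{S: "firefox"})) // v2 always writes the string form
}

A mapped tag thus costs a fixed 4 bytes plus an empty string, while an unmapped one falls back to the raw string with a zero ID, which is the same trade-off the new appendKeysNewFormat branch below makes.
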
49 changes: 25 additions & 24 deletions internal/aggregator/aggregator_insert.go
@@ -120,9 +120,9 @@ func (p *metricIndexCache) skips(metricID int32) (skipMaxHost bool, skipMinHost
return false, false, false
}

-func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}, v3Format bool, rnd *rand.Rand, stag string) []byte {
+func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}, v3Format bool, top data_model.TagUnion) []byte {
if v3Format {
-return appendKeysNewFormat(res, k, metricCache, usedTimestamps, stag)
+return appendKeysNewFormat(res, k, metricCache, usedTimestamps, top)
}
appendTag := func(res []byte, v uint32) []byte {
res = binary.LittleEndian.AppendUint32(res, v)
@@ -152,7 +152,7 @@ func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, us
return res
}

-func appendKeysNewFormat(res []byte, k *data_model.Key, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}, stop string) []byte {
+func appendKeysNewFormat(res []byte, k *data_model.Key, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}, top data_model.TagUnion) []byte {
appendTag := func(res []byte, k *data_model.Key, i int) []byte {
if i >= len(k.Tags) { // temporary while we are in transition between 16 and 48 tags
res = binary.LittleEndian.AppendUint32(res, 0)
@@ -182,11 +182,12 @@ func appendKeysNewFormat(res []byte, k *data_model.Key, metricCache *metricIndex
res = appendTag(res, k, ki)
}
// write string top
if stop != "" {
res = binary.LittleEndian.AppendUint32(res, 0)
res = rowbinary.AppendString(res, stop)
if top.I > 0 || len(top.S) == 0 { // if we have both I and S use prefer I (we keep S to v2 compat)
res = binary.LittleEndian.AppendUint32(res, uint32(top.I))
res = rowbinary.AppendString(res, "")
} else {
-res = appendTag(res, k, format.StringTopTagIndexV3)
+res = binary.LittleEndian.AppendUint32(res, 0)
+res = rowbinary.AppendString(res, top.S)
}
return res
}
@@ -230,21 +231,21 @@ func appendBadge(rng *rand.Rand, res []byte, k *data_model.Key, v data_model.Ite
format.TagValueIDSrcIngestionStatusWarnTimestampClampedPast,
format.TagValueIDSrcIngestionStatusWarnTimestampClampedFuture,
format.TagValueIDSrcIngestionStatusWarnMapInvalidRawTagValue:
-return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeIngestionWarnings, k.Tags[1]}}, "", v, metricCache, usedTimestamps, v3Format)
+return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeIngestionWarnings, k.Tags[1]}}, v, metricCache, usedTimestamps, v3Format)
}
-return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeIngestionErrors, k.Tags[1]}}, "", v, metricCache, usedTimestamps, v3Format)
+return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeIngestionErrors, k.Tags[1]}}, v, metricCache, usedTimestamps, v3Format)
case format.BuiltinMetricIDAgentSamplingFactor:
-return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAgentSamplingFactor, k.Tags[1]}}, "", v, metricCache, usedTimestamps, v3Format)
+return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAgentSamplingFactor, k.Tags[1]}}, v, metricCache, usedTimestamps, v3Format)
case format.BuiltinMetricIDAggSamplingFactor:
-return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAggSamplingFactor, k.Tags[4]}}, "", v, metricCache, usedTimestamps, v3Format)
+return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAggSamplingFactor, k.Tags[4]}}, v, metricCache, usedTimestamps, v3Format)
case format.BuiltinMetricIDAggMappingCreated:
if k.Tags[5] == format.TagValueIDAggMappingCreatedStatusOK ||
k.Tags[5] == format.TagValueIDAggMappingCreatedStatusCreated {
return res
}
-return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAggMappingErrors, k.Tags[4]}}, "", v, metricCache, usedTimestamps, v3Format)
+return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeAggMappingErrors, k.Tags[4]}}, v, metricCache, usedTimestamps, v3Format)
case format.BuiltinMetricIDAggBucketReceiveDelaySec:
-return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeContributors, 0}}, "", v, metricCache, usedTimestamps, v3Format)
+return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [16]int32{0, format.TagValueIDBadgeContributors, 0}}, v, metricCache, usedTimestamps, v3Format)
}
return res
}
@@ -259,13 +260,13 @@ func appendAggregates(res []byte, c float64, mi float64, ma float64, su float64,
return append(res, tmp[:]...)
}

-func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, skey string, v data_model.ItemValue, cache *metricIndexCache, usedTimestamps map[uint32]struct{}, newFormat bool) []byte {
+func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v data_model.ItemValue, cache *metricIndexCache, usedTimestamps map[uint32]struct{}, newFormat bool) []byte {
count := v.Count()
if count <= 0 { // We have lots of built-in counters which are normally 0
return res
}
// for explanation of insert logic, see multiValueMarshal below
-res = appendKeys(res, key, cache, usedTimestamps, newFormat, nil /* don't replace mappings with strings from builtin metrics */, skey)
+res = appendKeys(res, key, cache, usedTimestamps, newFormat, data_model.TagUnion{})
skipMaxHost, skipMinHost, skipSumSquare := cache.skips(key.Metric)
if v.ValueSet {
res = appendAggregates(res, count, v.ValueMin, v.ValueMax, v.ValueSum, zeroIfTrue(v.ValueSumSquare, skipSumSquare))
@@ -276,7 +277,7 @@ func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, skey strin
res = rowbinary.AppendEmptyCentroids(res)
res = rowbinary.AppendEmptyUnique(res)
if !newFormat {
-res = rowbinary.AppendString(res, skey)
+res = rowbinary.AppendString(res, "")
}

// min_host_legacy, max_host_legacy
@@ -332,10 +333,10 @@ func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, skey strin
}

func appendSimpleValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v float64, count float64, hostTag int32, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}, newFormat bool) []byte {
-return appendValueStat(rng, res, key, "", data_model.SimpleItemValue(v, count, hostTag), metricCache, usedTimestamps, newFormat)
+return appendValueStat(rng, res, key, data_model.SimpleItemValue(v, count, hostTag), metricCache, usedTimestamps, newFormat)
}

-func multiValueMarshal(rng *rand.Rand, metricID int32, cache *metricIndexCache, res []byte, value *data_model.MultiValue, skey string, sf float64, v3Format bool) []byte {
+func multiValueMarshal(rng *rand.Rand, metricID int32, cache *metricIndexCache, res []byte, value *data_model.MultiValue, top data_model.TagUnion, sf float64, v3Format bool) []byte {
skipMaxHost, skipMinHost, skipSumSquare := cache.skips(metricID)
counter := value.Value.Count() * sf
if value.Value.ValueSet {
@@ -349,7 +350,7 @@ func multiValueMarshal(rng *rand.Rand, metricID int32, cache *metricIndexCache,
res = rowbinary.AppendCentroids(res, value.ValueTDigest, sf)
res = value.HLL.MarshallAppend(res)
if !v3Format {
-res = rowbinary.AppendString(res, skey)
+res = rowbinary.AppendString(res, top.S)
}
// min_host_legacy, max_host_legacy
if value.Value.ValueSet {
@@ -444,9 +445,9 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,

resPos := len(res)
if !item.Tail.Empty() { // only tail
-res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, rnd, "")
+res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, data_model.TagUnion{})

-res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, &item.Tail, "", sf, v3Format)
+res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, &item.Tail, data_model.TagUnion{}, sf, v3Format)

if item.Key.Metric < 0 {
is.builtin += len(res) - resPos
@@ -469,8 +470,8 @@
continue
}
// We have no badges for string tops
-res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, rnd, key.S) // TODO - insert I
-res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, key.S, sf, v3Format) // TODO - insert I
+res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, key) // TODO - insert I
+res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, key, sf, v3Format) // TODO - insert I
}
if item.Key.Metric < 0 {
is.builtin += len(res) - resPos
@@ -591,7 +592,7 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
res = appendSimpleValueStat(rnd, res, a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingEngineTime, [16]int32{0, 5, 0, 0, historicTag}),
float64(sampler.TimeMetricMeta()), 1, a.aggregatorHost, metricCache, usedTimestamps, v3Format)
res = appendValueStat(rnd, res, a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingEngineKeys, [16]int32{0, 0, 0, 0, historicTag}),
"", data_model.SimpleItemCounter(float64(sampler.ItemCount()), a.aggregatorHost), metricCache, usedTimestamps, v3Format)
data_model.SimpleItemCounter(float64(sampler.ItemCount()), a.aggregatorHost), metricCache, usedTimestamps, v3Format)

// report budget used
budgetKey := a.aggKey(recentTime, format.BuiltinMetricIDAggSamplingBudget, [16]int32{0, historicTag})
