Skip to content

Commit

Permalink
string tops go to tag 47 on new conveyor, they are also mapped now
Browse files Browse the repository at this point in the history
  • Loading branch information
hrissan committed Jan 22, 2025
1 parent a360c99 commit 5a0a9ff
Show file tree
Hide file tree
Showing 14 changed files with 119 additions and 84 deletions.
24 changes: 12 additions & 12 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (s *Agent) ApplyMetric(m tlstatshouse.MetricBytes, h data_model.MappedMetri
if len(m.Unique) == 1 { // very common case, optimize
uniqueShard := int(m.Unique[0] % int64(notSkippedShards)) // TODO - optimize %
shard2 := s.Shards[skipShards+uniqueShard]
shard2.ApplyUnique(&h.Key, keyHash, h.SValue, m.Unique, m.Counter, h.HostTag, h.MetricMeta)
shard2.ApplyUnique(&h.Key, keyHash, h.TopValue, m.Unique, m.Counter, h.HostTag.I, h.MetricMeta) // TODO - h.HostTag.S
return
}
uniqueValuesCache := shard.getUniqueValuesCache(notSkippedShards) // TOO - better reuse without lock?
Expand All @@ -642,18 +642,18 @@ func (s *Agent) ApplyMetric(m tlstatshouse.MetricBytes, h data_model.MappedMetri
continue
}
shard2 := s.Shards[skipShards+uniqueShard]
shard2.ApplyUnique(&h.Key, keyHash, h.SValue, vv, m.Counter*float64(len(vv))/float64(len(m.Unique)), h.HostTag, h.MetricMeta)
shard2.ApplyUnique(&h.Key, keyHash, h.TopValue, vv, m.Counter*float64(len(vv))/float64(len(m.Unique)), h.HostTag.I, h.MetricMeta) // TODO - h.HostTag.S
}
return
}
shard.ApplyUnique(&h.Key, keyHash, h.SValue, m.Unique, m.Counter, h.HostTag, h.MetricMeta)
shard.ApplyUnique(&h.Key, keyHash, h.TopValue, m.Unique, m.Counter, h.HostTag.I, h.MetricMeta) // TODO - h.HostTag.S
return
}
if len(m.Histogram) != 0 || len(m.Value) != 0 {
shard.ApplyValues(&h.Key, keyHash, h.SValue, m.Histogram, m.Value, m.Counter, h.HostTag, h.MetricMeta)
shard.ApplyValues(&h.Key, keyHash, h.TopValue, m.Histogram, m.Value, m.Counter, h.HostTag.I, h.MetricMeta) // TODO - h.HostTag.S
return
}
shard.ApplyCounter(&h.Key, keyHash, h.SValue, m.Counter, h.HostTag, h.MetricMeta)
shard.ApplyCounter(&h.Key, keyHash, h.TopValue, m.Counter, h.HostTag.I, h.MetricMeta) // TODO - h.HostTag.S
}

// count should be > 0 and not NaN
Expand Down Expand Up @@ -686,7 +686,7 @@ func (s *Agent) AddCounterHostStringBytes(key *data_model.Key, str []byte, count
return
}
shard := s.Shards[shardId]
shard.AddCounterHostStringBytes(key, keyHash, str, count, hostTagId, metricInfo)
shard.AddCounterHostStringBytes(key, keyHash, data_model.TagUnionBytes{S: str, I: 0}, count, hostTagId, metricInfo)
}

func (s *Agent) AddValueCounterHost(key *data_model.Key, value float64, counter float64, hostTagId int32, metricInfo *format.MetricMetaValue) {
Expand All @@ -707,7 +707,7 @@ func (s *Agent) AddValueCounter(key *data_model.Key, value float64, counter floa
s.AddValueCounterHost(key, value, counter, 0, metricInfo)
}

func (s *Agent) AddValueArrayCounterHostStringBytes(key *data_model.Key, values []float64, mult float64, hostTagId int32, str []byte, metricInfo *format.MetricMetaValue) {
func (s *Agent) AddValueArrayCounterHostStringBytes(key *data_model.Key, values []float64, mult float64, hostTagId int32, topValue data_model.TagUnionBytes, metricInfo *format.MetricMetaValue) {
if len(values) == 0 || mult < 0 {
return
}
Expand All @@ -717,10 +717,10 @@ func (s *Agent) AddValueArrayCounterHostStringBytes(key *data_model.Key, values
return
}
shard := s.Shards[shardId]
shard.AddValueArrayCounterHostStringBytes(key, keyHash, values, mult, hostTagId, str, metricInfo)
shard.AddValueArrayCounterHostStringBytes(key, keyHash, values, mult, hostTagId, topValue, metricInfo)
}

func (s *Agent) AddValueCounterHostStringBytes(key *data_model.Key, value float64, counter float64, hostTagId int32, str []byte, metricInfo *format.MetricMetaValue) {
func (s *Agent) AddValueCounterHostString(key *data_model.Key, value float64, counter float64, hostTagId int32, topValue data_model.TagUnion, metricInfo *format.MetricMetaValue) {
if counter <= 0 {
return
}
Expand All @@ -730,7 +730,7 @@ func (s *Agent) AddValueCounterHostStringBytes(key *data_model.Key, value float6
return
}
shard := s.Shards[shardId]
shard.AddValueCounterHostStringBytes(key, keyHash, value, counter, hostTagId, str, metricInfo)
shard.AddValueCounterHostString(key, keyHash, value, counter, hostTagId, topValue, metricInfo)
}

func (s *Agent) MergeItemValue(key *data_model.Key, item *data_model.ItemValue, metricInfo *format.MetricMetaValue) {
Expand All @@ -746,7 +746,7 @@ func (s *Agent) MergeItemValue(key *data_model.Key, item *data_model.ItemValue,
shard.MergeItemValue(key, keyHash, item, metricInfo)
}

func (s *Agent) AddUniqueHostStringBytes(key *data_model.Key, hostTagId int32, str []byte, hashes []int64, count float64, metricInfo *format.MetricMetaValue) {
func (s *Agent) AddUniqueHostStringBytes(key *data_model.Key, hostTagId int32, topValue data_model.TagUnionBytes, hashes []int64, count float64, metricInfo *format.MetricMetaValue) {
if len(hashes) == 0 || count < 0 {
return
}
Expand All @@ -756,7 +756,7 @@ func (s *Agent) AddUniqueHostStringBytes(key *data_model.Key, hostTagId int32, s
return
}
shard := s.Shards[shardId]
shard.ApplyUnique(key, keyHash, str, hashes, count, hostTagId, metricInfo)
shard.ApplyUnique(key, keyHash, topValue, hashes, count, hostTagId, metricInfo)
}

func (s *Agent) AggKey(time uint32, metricID int32, keys [format.MaxTags]int32) *data_model.Key {
Expand Down
34 changes: 22 additions & 12 deletions internal/agent/agent_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,41 @@ func (s *Agent) mapAllTags(h *data_model.MappedMetricHeader, metric *tlstatshous
if tagIDKey == 0 { // that tag is not in metric meta
continue
}
var tagValue data_model.TagUnionBytes
switch {
case tagMeta.SkipMapping:
h.SetSTag(tagMeta.Index, string(v.Value), tagIDKey)
case tagMeta.Index == format.StringTopTagIndex:
h.SValue = v.Value
if h.IsSKeySet {
h.TagSetTwiceKey = tagIDKey
}
h.IsSKeySet = true
case len(v.Value) == 0: // this case is also valid for raw values
h.SetTag(tagMeta.Index, 0, tagIDKey) // we interpret "1" => "vasya", "1" => "petya" as second one overriding the first, but generating a warning
case tagMeta.Raw:
id, ok := format.ContainsRawTagValue(mem.B(v.Value))
if !ok {
h.InvalidRawValue = v.Value
h.InvalidRawTagKey = tagIDKey
continue
}
h.SetTag(tagMeta.Index, id, tagIDKey)
tagValue.I = id
default:
id, found := s.mappingsCache.GetValueBytes(uint32(h.ReceiveTime.Unix()), v.Value)
if found {
h.SetTag(tagMeta.Index, id, tagIDKey)
tagValue.I = id
} else {
h.SetSTag(tagMeta.Index, string(v.Value), tagIDKey)
tagValue.S = v.Value
}
}
if tagMeta.Index == format.StringTopTagIndex || tagMeta.Index == format.StringTopTagIndexV3 {
// "_s" is alternative/legacy name for "47". We always have "top" function set for this tag.
// TODO - after old conveyor removed, we can simplify this code by setting tagMeta.Index to 47 for "_s"
// also we will remove IsSKeySet and use IsTagSet[47] automatically instead
h.TopValue = tagValue
if h.IsSKeySet {
h.TagSetTwiceKey = tagIDKey
}
h.IsSKeySet = true
continue
}
if tagValue.I != 0 {
h.SetTag(tagMeta.Index, tagValue.I, tagIDKey)
} else {
h.SetSTag(tagMeta.Index, tagValue.S, tagIDKey) // TODO - remove allocation here
}
}
}

Expand All @@ -75,6 +83,8 @@ func (s *Agent) mapEnvironmentTag(h *data_model.MappedMetricHeader, v *tl.Dictio
id, found := s.mappingsCache.GetValueBytes(uint32(h.ReceiveTime.Unix()), v.Value)
if found {
h.Key.Tags[0] = id
} else {
h.Key.STags[0] = string(v.Value) // TODO - remove allocation here
}
}

Expand Down
32 changes: 16 additions & 16 deletions internal/agent/agent_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (s *Shard) CreateBuiltInItemValue(key *data_model.Key) *BuiltInItemValue {
return result
}

func (s *Shard) ApplyUnique(key *data_model.Key, keyHash uint64, str []byte, hashes []int64, count float64, hostTag int32, metricInfo *format.MetricMetaValue) {
func (s *Shard) ApplyUnique(key *data_model.Key, keyHash uint64, topValue data_model.TagUnionBytes, hashes []int64, count float64, hostTag int32, metricInfo *format.MetricMetaValue) {
if count == 0 {
count = float64(len(hashes))
}
Expand All @@ -208,11 +208,11 @@ func (s *Shard) ApplyUnique(key *data_model.Key, keyHash uint64, str []byte, has
}
resolutionShard := s.resolutionShardFromHashLocked(key, keyHash, metricInfo)
item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
mv := item.MapStringTopBytes(s.rng, str, count)
mv := item.MapStringTopBytes(s.rng, topValue, count)
mv.ApplyUnique(s.rng, hashes, count, hostTag)
}

func (s *Shard) ApplyValues(key *data_model.Key, keyHash uint64, str []byte, histogram [][2]float64, values []float64, count float64, hostTag int32, metricInfo *format.MetricMetaValue) {
func (s *Shard) ApplyValues(key *data_model.Key, keyHash uint64, topValue data_model.TagUnionBytes, histogram [][2]float64, values []float64, count float64, hostTag int32, metricInfo *format.MetricMetaValue) {
totalCount := float64(len(values))
for _, kv := range histogram {
totalCount += kv[1] // all counts are validated to be >= 0
Expand All @@ -230,11 +230,11 @@ func (s *Shard) ApplyValues(key *data_model.Key, keyHash uint64, str []byte, his
}
resolutionShard := s.resolutionShardFromHashLocked(key, keyHash, metricInfo)
item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
mv := item.MapStringTopBytes(s.rng, str, count)
mv := item.MapStringTopBytes(s.rng, topValue, count)
mv.ApplyValues(s.rng, histogram, values, count, totalCount, hostTag, data_model.AgentPercentileCompression, metricInfo != nil && metricInfo.HasPercentiles)
}

func (s *Shard) ApplyCounter(key *data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue) {
func (s *Shard) ApplyCounter(key *data_model.Key, keyHash uint64, topValue data_model.TagUnionBytes, count float64, hostTag int32, metricInfo *format.MetricMetaValue) {
if count <= 0 {
return
}
Expand All @@ -245,7 +245,7 @@ func (s *Shard) ApplyCounter(key *data_model.Key, keyHash uint64, str []byte, co
}
resolutionShard := s.resolutionShardFromHashLocked(key, keyHash, metricInfo)
item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
item.MapStringTopBytes(s.rng, str, count).AddCounterHost(s.rng, count, hostTag)
item.MapStringTopBytes(s.rng, topValue, count).AddCounterHost(s.rng, count, hostTag)
}

func (s *Shard) AddCounterHost(key *data_model.Key, keyHash uint64, count float64, hostTagId int32, metricInfo *format.MetricMetaValue) {
Expand All @@ -259,26 +259,26 @@ func (s *Shard) AddCounterHost(key *data_model.Key, keyHash uint64, count float6
item.Tail.AddCounterHost(s.rng, count, hostTagId)
}

func (s *Shard) AddCounterHostStringBytes(key *data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue) {
func (s *Shard) AddCounterHostStringBytes(key *data_model.Key, keyHash uint64, topValue data_model.TagUnionBytes, count float64, hostTag int32, metricInfo *format.MetricMetaValue) {
s.mu.Lock()
defer s.mu.Unlock()
if s.shouldDiscardIncomingData() {
return
}
resolutionShard := s.resolutionShardFromHashLocked(key, keyHash, metricInfo)
item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
item.MapStringTopBytes(s.rng, str, count).AddCounterHost(s.rng, count, hostTag)
item.MapStringTopBytes(s.rng, topValue, count).AddCounterHost(s.rng, count, hostTag)
}

func (s *Shard) AddValueCounterHostStringBytes(key *data_model.Key, keyHash uint64, value float64, count float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue) {
func (s *Shard) AddValueCounterHostString(key *data_model.Key, keyHash uint64, value float64, count float64, hostTag int32, topValue data_model.TagUnion, metricInfo *format.MetricMetaValue) {
s.mu.Lock()
defer s.mu.Unlock()
if s.shouldDiscardIncomingData() {
return
}
resolutionShard := s.resolutionShardFromHashLocked(key, keyHash, metricInfo)
item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
item.MapStringTopBytes(s.rng, str, count).AddValueCounterHost(s.rng, value, count, hostTag)
item.MapStringTop(s.rng, topValue, count).AddValueCounterHost(s.rng, value, count, hostTag)
}

func (s *Shard) AddValueCounterHost(key *data_model.Key, keyHash uint64, value float64, counter float64, hostTag int32, metricInfo *format.MetricMetaValue) {
Expand Down Expand Up @@ -311,7 +311,7 @@ func (s *Shard) AddValueArrayCounterHost(key *data_model.Key, keyHash uint64, va
}
}

func (s *Shard) AddValueArrayCounterHostStringBytes(key *data_model.Key, keyHash uint64, values []float64, mult float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue) {
func (s *Shard) AddValueArrayCounterHostStringBytes(key *data_model.Key, keyHash uint64, values []float64, mult float64, hostTag int32, topValue data_model.TagUnionBytes, metricInfo *format.MetricMetaValue) {
s.mu.Lock()
defer s.mu.Unlock()
if s.shouldDiscardIncomingData() {
Expand All @@ -321,9 +321,9 @@ func (s *Shard) AddValueArrayCounterHostStringBytes(key *data_model.Key, keyHash
item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
count := float64(len(values)) * mult
if metricInfo != nil && metricInfo.HasPercentiles {
item.MapStringTopBytes(s.rng, str, count).AddValueArrayHostPercentile(s.rng, values, mult, hostTag, data_model.AgentPercentileCompression)
item.MapStringTopBytes(s.rng, topValue, count).AddValueArrayHostPercentile(s.rng, values, mult, hostTag, data_model.AgentPercentileCompression)
} else {
item.MapStringTopBytes(s.rng, str, count).Value.AddValueArrayHost(s.rng, values, mult, hostTag)
item.MapStringTopBytes(s.rng, topValue, count).Value.AddValueArrayHost(s.rng, values, mult, hostTag)
}
}

Expand Down Expand Up @@ -405,7 +405,7 @@ func (s *Shard) addBuiltInsLocked() {
writeJournalVersion := func(version int64, hash string, hashTag int32, count float64) {
key := s.agent.AggKey(s.CurrentTime, format.BuiltinMetricIDJournalVersions, [format.MaxTags]int32{0, s.agent.componentTag, 0, 0, 0, int32(version), hashTag})
item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil, nil)
item.MapStringTop(s.rng, hash, count).AddCounterHost(s.rng, count, 0)
item.MapStringTop(s.rng, data_model.TagUnion{S: hash, I: 0}, count).AddCounterHost(s.rng, count, 0)
}
if s.agent.metricStorage != nil { // nil only on ingress proxy for now
metricJournalVersion := s.agent.metricStorage.Version()
Expand Down Expand Up @@ -473,10 +473,10 @@ func (s *Shard) addBuiltInsHeartbeatsLocked(resolutionShard *data_model.MetricsB

key := s.agent.AggKey(nowUnix, format.BuiltinMetricIDHeartbeatVersion, [format.MaxTags]int32{0, s.agent.componentTag, s.agent.heartBeatEventType})
item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil, nil)
item.MapStringTop(s.rng, build.Commit(), count).AddValueCounterHost(s.rng, uptimeSec, count, 0)
item.MapStringTop(s.rng, data_model.TagUnion{S: build.Commit(), I: 0}, count).AddValueCounterHost(s.rng, uptimeSec, count, 0)

// we send format.BuiltinMetricIDHeartbeatArgs only. Args1, Args2, Args3 are deprecated
key = s.agent.AggKey(nowUnix, format.BuiltinMetricIDHeartbeatArgs, [format.MaxTags]int32{0, s.agent.componentTag, s.agent.heartBeatEventType, s.agent.argsHash, 0, 0, 0, 0, 0, s.agent.argsLen})
item, _ = resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil, nil)
item.MapStringTop(s.rng, s.agent.args, count).AddValueCounterHost(s.rng, uptimeSec, count, 0)
item.MapStringTop(s.rng, data_model.TagUnion{S: s.agent.args, I: 0}, count).AddValueCounterHost(s.rng, uptimeSec, count, 0)
}
8 changes: 4 additions & 4 deletions internal/agent/agent_shard_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func bucketToSourceBucket2TL(bucket *data_model.MetricsBucket, perm []int, sampl
sizeBuf = item.Write(sizeBuf[:0])

var top []tlstatshouse.TopElement
for skey, value := range v.Top {
el := tlstatshouse.TopElement{Key: skey}
for key, value := range v.Top {
el := tlstatshouse.TopElement{Key: key.S} // TODO - send I
value.MultiValueToTL(&el.Value, v.SF, &el.FieldsMask, &marshalBuf)
top = append(top, el)
sizeBuf = el.Write(sizeBuf[:0])
Expand Down Expand Up @@ -221,8 +221,8 @@ func bucketToSourceBucket3TL(bucket *data_model.MetricsBucket, perm []int, sampl
sizeBuf = item.Write(sizeBuf[:0])

var top []tlstatshouse.TopElement
for skey, value := range v.Top {
el := tlstatshouse.TopElement{Key: skey}
for key, value := range v.Top {
el := tlstatshouse.TopElement{Key: key.S} // TODO - send I
value.MultiValueToTL(&el.Value, v.SF, &el.FieldsMask, &marshalBuf)
top = append(top, el)
sizeBuf = el.Write(sizeBuf[:0])
Expand Down
2 changes: 1 addition & 1 deletion internal/aggregator/aggregator_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
}
s := aggBucket.lockShard(&lockedShard, sID, &measurementLocks)
mi, created := s.GetOrCreateMultiItem(&k, data_model.AggregatorStringTopCapacity, nil, keyBytes)
mi.MergeWithTLMultiItem(rng, &item, hostTagId)
mi.MergeWithTLMultiItem(rng, &item, hostTagId) // TODO - try to map strings of top elements, as we do with SKeys above
if created {
if !args.IsSetSpare() { // Data from spares should not affect cardinality estimations
newKeys = append(newKeys, k)
Expand Down
6 changes: 3 additions & 3 deletions internal/aggregator/aggregator_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,13 +464,13 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket,
}
}
resPos = len(res)
for skey, value := range item.Top {
for key, value := range item.Top {
if value.Empty() { // must be never, but check is cheap
continue
}
// We have no badges for string tops
res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, rnd, skey)
res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, skey, sf, v3Format)
res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, rnd, key.S) // TODO - insert I
res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, key.S, sf, v3Format) // TODO - insert I
}
if item.Key.Metric < 0 {
is.builtin += len(res) - resPos
Expand Down
10 changes: 5 additions & 5 deletions internal/aggregator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func generateStats(simID int, journal *metajournal.MetricsStorage, s *agent.Agen
value := rng.Float64() * 100
s.AddValueCounter(&kValue2, value, 1, metricInfo2)
s.AddValueCounter(&kValue3, value, 1, metricInfo3)
s.AddValueArrayCounterHostStringBytes(&kValue10, []float64{value}, 1, 0, sKey, metricInfo10)
s.AddValueArrayCounterHostStringBytes(&kValue11, []float64{value}, 1, 0, sKey, metricInfo11)
s.AddValueArrayCounterHostStringBytes(&kValue10, []float64{value}, 1, 0, data_model.TagUnionBytes{S: sKey, I: 0}, metricInfo10)
s.AddValueArrayCounterHostStringBytes(&kValue11, []float64{value}, 1, 0, data_model.TagUnionBytes{S: sKey, I: 0}, metricInfo11)
}
}
total = rng.Intn(totalRange)
Expand All @@ -261,9 +261,9 @@ func generateStats(simID int, journal *metajournal.MetricsStorage, s *agent.Agen
for ci := 0; ci < cc; ci++ {
const D = 1000000
r := D + int64(rng.Intn(D))*int64(rng.Intn(D))
s.AddUniqueHostStringBytes(&kUnique4, 0, nil, []int64{r}, 1, metricInfo4)
s.AddUniqueHostStringBytes(&kUnique7, 0, nil, []int64{r}, 1, metricInfo7)
s.AddUniqueHostStringBytes(&kUnique8, 0, sKey, []int64{r}, 1, metricInfo8)
s.AddUniqueHostStringBytes(&kUnique4, 0, data_model.TagUnionBytes{}, []int64{r}, 1, metricInfo4)
s.AddUniqueHostStringBytes(&kUnique7, 0, data_model.TagUnionBytes{}, []int64{r}, 1, metricInfo7)
s.AddUniqueHostStringBytes(&kUnique8, 0, data_model.TagUnionBytes{S: sKey, I: 0}, []int64{r}, 1, metricInfo8)
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/aggregator/tags_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewTagsMapper(agg *Aggregator, sh2 *agent.Agent, metricStorage *metajournal
case format.TagValueIDAggMappingCreatedStatusCreated:
agg.appendInternalLog("map_tag", strconv.Itoa(int(extra.AgentEnv)), "created", askedKey, extra.Metric, strconv.Itoa(int(metricID)), strconv.Itoa(int(extra.TagIDKey)), strconv.Itoa(int(keyValue)))
// if askedKey is created, it is valid and safe to write
ms.sh2.AddValueCounterHostStringBytes(key, float64(keyValue), 1, extra.Host, []byte(askedKey), meta)
ms.sh2.AddValueCounterHostString(key, float64(keyValue), 1, extra.Host, data_model.TagUnion{S: askedKey, I: 0}, meta)
}
}
return pcache.Int32ToValue(keyValue), d, err
Expand Down
Loading

0 comments on commit 5a0a9ff

Please sign in to comment.