From 5a0a9ff4d4862a6e795bac57844ed6bbae499abf Mon Sep 17 00:00:00 2001 From: Grigory Buteyko Date: Wed, 22 Jan 2025 21:48:44 +0300 Subject: [PATCH] string tops go to tag 47 on new conveyor, they are also mapped now --- internal/agent/agent.go | 24 +++++++------- internal/agent/agent_mapping.go | 34 +++++++++++++------- internal/agent/agent_shard.go | 32 +++++++++---------- internal/agent/agent_shard_send.go | 8 ++--- internal/aggregator/aggregator_handlers.go | 2 +- internal/aggregator/aggregator_insert.go | 6 ++-- internal/aggregator/simulator.go | 10 +++--- internal/aggregator/tags_mapper.go | 2 +- internal/aggregator/tags_mapper2.go | 3 +- internal/data_model/bucket.go | 35 +++++++++++++-------- internal/data_model/mapped_metric_header.go | 24 +++++++++----- internal/data_model/transfer.go | 18 ++++++++--- internal/format/format.go | 1 - internal/mapping/pipeline_v2.go | 4 +-- 14 files changed, 119 insertions(+), 84 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index c0dd23961..6cfe21829 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -628,7 +628,7 @@ func (s *Agent) ApplyMetric(m tlstatshouse.MetricBytes, h data_model.MappedMetri if len(m.Unique) == 1 { // very common case, optimize uniqueShard := int(m.Unique[0] % int64(notSkippedShards)) // TODO - optimize % shard2 := s.Shards[skipShards+uniqueShard] - shard2.ApplyUnique(&h.Key, keyHash, h.SValue, m.Unique, m.Counter, h.HostTag, h.MetricMeta) + shard2.ApplyUnique(&h.Key, keyHash, h.TopValue, m.Unique, m.Counter, h.HostTag.I, h.MetricMeta) // TODO - h.HostTag.S return } uniqueValuesCache := shard.getUniqueValuesCache(notSkippedShards) // TOO - better reuse without lock? @@ -642,18 +642,18 @@ func (s *Agent) ApplyMetric(m tlstatshouse.MetricBytes, h data_model.MappedMetri continue } shard2 := s.Shards[skipShards+uniqueShard] - shard2.ApplyUnique(&h.Key, keyHash, h.SValue, vv, m.Counter*float64(len(vv))/float64(len(m.Unique)), h.HostTag, h.MetricMeta) + shard2.ApplyUnique(&h.Key, keyHash, h.TopValue, vv, m.Counter*float64(len(vv))/float64(len(m.Unique)), h.HostTag.I, h.MetricMeta) // TODO - h.HostTag.S } return } - shard.ApplyUnique(&h.Key, keyHash, h.SValue, m.Unique, m.Counter, h.HostTag, h.MetricMeta) + shard.ApplyUnique(&h.Key, keyHash, h.TopValue, m.Unique, m.Counter, h.HostTag.I, h.MetricMeta) // TODO - h.HostTag.S return } if len(m.Histogram) != 0 || len(m.Value) != 0 { - shard.ApplyValues(&h.Key, keyHash, h.SValue, m.Histogram, m.Value, m.Counter, h.HostTag, h.MetricMeta) + shard.ApplyValues(&h.Key, keyHash, h.TopValue, m.Histogram, m.Value, m.Counter, h.HostTag.I, h.MetricMeta) // TODO - h.HostTag.S return } - shard.ApplyCounter(&h.Key, keyHash, h.SValue, m.Counter, h.HostTag, h.MetricMeta) + shard.ApplyCounter(&h.Key, keyHash, h.TopValue, m.Counter, h.HostTag.I, h.MetricMeta) // TODO - h.HostTag.S } // count should be > 0 and not NaN @@ -686,7 +686,7 @@ func (s *Agent) AddCounterHostStringBytes(key *data_model.Key, str []byte, count return } shard := s.Shards[shardId] - shard.AddCounterHostStringBytes(key, keyHash, str, count, hostTagId, metricInfo) + shard.AddCounterHostStringBytes(key, keyHash, data_model.TagUnionBytes{S: str, I: 0}, count, hostTagId, metricInfo) } func (s *Agent) AddValueCounterHost(key *data_model.Key, value float64, counter float64, hostTagId int32, metricInfo *format.MetricMetaValue) { @@ -707,7 +707,7 @@ func (s *Agent) AddValueCounter(key *data_model.Key, value float64, counter floa s.AddValueCounterHost(key, value, counter, 0, metricInfo) } -func (s *Agent) AddValueArrayCounterHostStringBytes(key *data_model.Key, values []float64, mult float64, hostTagId int32, str []byte, metricInfo *format.MetricMetaValue) { +func (s *Agent) AddValueArrayCounterHostStringBytes(key *data_model.Key, values []float64, mult float64, hostTagId int32, topValue data_model.TagUnionBytes, metricInfo *format.MetricMetaValue) { if len(values) == 0 || mult < 0 { return } @@ -717,10 +717,10 @@ func (s *Agent) AddValueArrayCounterHostStringBytes(key *data_model.Key, values return } shard := s.Shards[shardId] - shard.AddValueArrayCounterHostStringBytes(key, keyHash, values, mult, hostTagId, str, metricInfo) + shard.AddValueArrayCounterHostStringBytes(key, keyHash, values, mult, hostTagId, topValue, metricInfo) } -func (s *Agent) AddValueCounterHostStringBytes(key *data_model.Key, value float64, counter float64, hostTagId int32, str []byte, metricInfo *format.MetricMetaValue) { +func (s *Agent) AddValueCounterHostString(key *data_model.Key, value float64, counter float64, hostTagId int32, topValue data_model.TagUnion, metricInfo *format.MetricMetaValue) { if counter <= 0 { return } @@ -730,7 +730,7 @@ func (s *Agent) AddValueCounterHostStringBytes(key *data_model.Key, value float6 return } shard := s.Shards[shardId] - shard.AddValueCounterHostStringBytes(key, keyHash, value, counter, hostTagId, str, metricInfo) + shard.AddValueCounterHostString(key, keyHash, value, counter, hostTagId, topValue, metricInfo) } func (s *Agent) MergeItemValue(key *data_model.Key, item *data_model.ItemValue, metricInfo *format.MetricMetaValue) { @@ -746,7 +746,7 @@ func (s *Agent) MergeItemValue(key *data_model.Key, item *data_model.ItemValue, shard.MergeItemValue(key, keyHash, item, metricInfo) } -func (s *Agent) AddUniqueHostStringBytes(key *data_model.Key, hostTagId int32, str []byte, hashes []int64, count float64, metricInfo *format.MetricMetaValue) { +func (s *Agent) AddUniqueHostStringBytes(key *data_model.Key, hostTagId int32, topValue data_model.TagUnionBytes, hashes []int64, count float64, metricInfo *format.MetricMetaValue) { if len(hashes) == 0 || count < 0 { return } @@ -756,7 +756,7 @@ func (s *Agent) AddUniqueHostStringBytes(key *data_model.Key, hostTagId int32, s return } shard := s.Shards[shardId] - shard.ApplyUnique(key, keyHash, str, hashes, count, hostTagId, metricInfo) + shard.ApplyUnique(key, keyHash, topValue, hashes, count, hostTagId, metricInfo) } func (s *Agent) AggKey(time uint32, metricID int32, keys [format.MaxTags]int32) *data_model.Key { diff --git a/internal/agent/agent_mapping.go b/internal/agent/agent_mapping.go index 8f1ba7d2a..1d56b23b8 100644 --- a/internal/agent/agent_mapping.go +++ b/internal/agent/agent_mapping.go @@ -36,17 +36,9 @@ func (s *Agent) mapAllTags(h *data_model.MappedMetricHeader, metric *tlstatshous if tagIDKey == 0 { // that tag is not in metric meta continue } + var tagValue data_model.TagUnionBytes switch { - case tagMeta.SkipMapping: - h.SetSTag(tagMeta.Index, string(v.Value), tagIDKey) - case tagMeta.Index == format.StringTopTagIndex: - h.SValue = v.Value - if h.IsSKeySet { - h.TagSetTwiceKey = tagIDKey - } - h.IsSKeySet = true case len(v.Value) == 0: // this case is also valid for raw values - h.SetTag(tagMeta.Index, 0, tagIDKey) // we interpret "1" => "vasya", "1" => "petya" as second one overriding the first, but generating a warning case tagMeta.Raw: id, ok := format.ContainsRawTagValue(mem.B(v.Value)) if !ok { @@ -54,15 +46,31 @@ func (s *Agent) mapAllTags(h *data_model.MappedMetricHeader, metric *tlstatshous h.InvalidRawTagKey = tagIDKey continue } - h.SetTag(tagMeta.Index, id, tagIDKey) + tagValue.I = id default: id, found := s.mappingsCache.GetValueBytes(uint32(h.ReceiveTime.Unix()), v.Value) if found { - h.SetTag(tagMeta.Index, id, tagIDKey) + tagValue.I = id } else { - h.SetSTag(tagMeta.Index, string(v.Value), tagIDKey) + tagValue.S = v.Value } } + if tagMeta.Index == format.StringTopTagIndex || tagMeta.Index == format.StringTopTagIndexV3 { + // "_s" is alternative/legacy name for "47". We always have "top" function set for this tag. + // TODO - after old conveyor removed, we can simplify this code by setting tagMeta.Index to 47 for "_s" + // also we will remove IsSKeySet and use IsTagSet[47] automatically instead + h.TopValue = tagValue + if h.IsSKeySet { + h.TagSetTwiceKey = tagIDKey + } + h.IsSKeySet = true + continue + } + if tagValue.I != 0 { + h.SetTag(tagMeta.Index, tagValue.I, tagIDKey) + } else { + h.SetSTag(tagMeta.Index, tagValue.S, tagIDKey) // TODO - remove allocation here + } } } @@ -75,6 +83,8 @@ func (s *Agent) mapEnvironmentTag(h *data_model.MappedMetricHeader, v *tl.Dictio id, found := s.mappingsCache.GetValueBytes(uint32(h.ReceiveTime.Unix()), v.Value) if found { h.Key.Tags[0] = id + } else { + h.Key.STags[0] = string(v.Value) // TODO - remove allocation here } } diff --git a/internal/agent/agent_shard.go b/internal/agent/agent_shard.go index 24993152f..fe9a4f19e 100644 --- a/internal/agent/agent_shard.go +++ b/internal/agent/agent_shard.go @@ -194,7 +194,7 @@ func (s *Shard) CreateBuiltInItemValue(key *data_model.Key) *BuiltInItemValue { return result } -func (s *Shard) ApplyUnique(key *data_model.Key, keyHash uint64, str []byte, hashes []int64, count float64, hostTag int32, metricInfo *format.MetricMetaValue) { +func (s *Shard) ApplyUnique(key *data_model.Key, keyHash uint64, topValue data_model.TagUnionBytes, hashes []int64, count float64, hostTag int32, metricInfo *format.MetricMetaValue) { if count == 0 { count = float64(len(hashes)) } @@ -208,11 +208,11 @@ func (s *Shard) ApplyUnique(key *data_model.Key, keyHash uint64, str []byte, has } resolutionShard := s.resolutionShardFromHashLocked(key, keyHash, metricInfo) item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil) - mv := item.MapStringTopBytes(s.rng, str, count) + mv := item.MapStringTopBytes(s.rng, topValue, count) mv.ApplyUnique(s.rng, hashes, count, hostTag) } -func (s *Shard) ApplyValues(key *data_model.Key, keyHash uint64, str []byte, histogram [][2]float64, values []float64, count float64, hostTag int32, metricInfo *format.MetricMetaValue) { +func (s *Shard) ApplyValues(key *data_model.Key, keyHash uint64, topValue data_model.TagUnionBytes, histogram [][2]float64, values []float64, count float64, hostTag int32, metricInfo *format.MetricMetaValue) { totalCount := float64(len(values)) for _, kv := range histogram { totalCount += kv[1] // all counts are validated to be >= 0 @@ -230,11 +230,11 @@ func (s *Shard) ApplyValues(key *data_model.Key, keyHash uint64, str []byte, his } resolutionShard := s.resolutionShardFromHashLocked(key, keyHash, metricInfo) item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil) - mv := item.MapStringTopBytes(s.rng, str, count) + mv := item.MapStringTopBytes(s.rng, topValue, count) mv.ApplyValues(s.rng, histogram, values, count, totalCount, hostTag, data_model.AgentPercentileCompression, metricInfo != nil && metricInfo.HasPercentiles) } -func (s *Shard) ApplyCounter(key *data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue) { +func (s *Shard) ApplyCounter(key *data_model.Key, keyHash uint64, topValue data_model.TagUnionBytes, count float64, hostTag int32, metricInfo *format.MetricMetaValue) { if count <= 0 { return } @@ -245,7 +245,7 @@ func (s *Shard) ApplyCounter(key *data_model.Key, keyHash uint64, str []byte, co } resolutionShard := s.resolutionShardFromHashLocked(key, keyHash, metricInfo) item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil) - item.MapStringTopBytes(s.rng, str, count).AddCounterHost(s.rng, count, hostTag) + item.MapStringTopBytes(s.rng, topValue, count).AddCounterHost(s.rng, count, hostTag) } func (s *Shard) AddCounterHost(key *data_model.Key, keyHash uint64, count float64, hostTagId int32, metricInfo *format.MetricMetaValue) { @@ -259,7 +259,7 @@ func (s *Shard) AddCounterHost(key *data_model.Key, keyHash uint64, count float6 item.Tail.AddCounterHost(s.rng, count, hostTagId) } -func (s *Shard) AddCounterHostStringBytes(key *data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue) { +func (s *Shard) AddCounterHostStringBytes(key *data_model.Key, keyHash uint64, topValue data_model.TagUnionBytes, count float64, hostTag int32, metricInfo *format.MetricMetaValue) { s.mu.Lock() defer s.mu.Unlock() if s.shouldDiscardIncomingData() { @@ -267,10 +267,10 @@ func (s *Shard) AddCounterHostStringBytes(key *data_model.Key, keyHash uint64, s } resolutionShard := s.resolutionShardFromHashLocked(key, keyHash, metricInfo) item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil) - item.MapStringTopBytes(s.rng, str, count).AddCounterHost(s.rng, count, hostTag) + item.MapStringTopBytes(s.rng, topValue, count).AddCounterHost(s.rng, count, hostTag) } -func (s *Shard) AddValueCounterHostStringBytes(key *data_model.Key, keyHash uint64, value float64, count float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue) { +func (s *Shard) AddValueCounterHostString(key *data_model.Key, keyHash uint64, value float64, count float64, hostTag int32, topValue data_model.TagUnion, metricInfo *format.MetricMetaValue) { s.mu.Lock() defer s.mu.Unlock() if s.shouldDiscardIncomingData() { @@ -278,7 +278,7 @@ func (s *Shard) AddValueCounterHostStringBytes(key *data_model.Key, keyHash uint } resolutionShard := s.resolutionShardFromHashLocked(key, keyHash, metricInfo) item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil) - item.MapStringTopBytes(s.rng, str, count).AddValueCounterHost(s.rng, value, count, hostTag) + item.MapStringTop(s.rng, topValue, count).AddValueCounterHost(s.rng, value, count, hostTag) } func (s *Shard) AddValueCounterHost(key *data_model.Key, keyHash uint64, value float64, counter float64, hostTag int32, metricInfo *format.MetricMetaValue) { @@ -311,7 +311,7 @@ func (s *Shard) AddValueArrayCounterHost(key *data_model.Key, keyHash uint64, va } } -func (s *Shard) AddValueArrayCounterHostStringBytes(key *data_model.Key, keyHash uint64, values []float64, mult float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue) { +func (s *Shard) AddValueArrayCounterHostStringBytes(key *data_model.Key, keyHash uint64, values []float64, mult float64, hostTag int32, topValue data_model.TagUnionBytes, metricInfo *format.MetricMetaValue) { s.mu.Lock() defer s.mu.Unlock() if s.shouldDiscardIncomingData() { @@ -321,9 +321,9 @@ func (s *Shard) AddValueArrayCounterHostStringBytes(key *data_model.Key, keyHash item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo, nil) count := float64(len(values)) * mult if metricInfo != nil && metricInfo.HasPercentiles { - item.MapStringTopBytes(s.rng, str, count).AddValueArrayHostPercentile(s.rng, values, mult, hostTag, data_model.AgentPercentileCompression) + item.MapStringTopBytes(s.rng, topValue, count).AddValueArrayHostPercentile(s.rng, values, mult, hostTag, data_model.AgentPercentileCompression) } else { - item.MapStringTopBytes(s.rng, str, count).Value.AddValueArrayHost(s.rng, values, mult, hostTag) + item.MapStringTopBytes(s.rng, topValue, count).Value.AddValueArrayHost(s.rng, values, mult, hostTag) } } @@ -405,7 +405,7 @@ func (s *Shard) addBuiltInsLocked() { writeJournalVersion := func(version int64, hash string, hashTag int32, count float64) { key := s.agent.AggKey(s.CurrentTime, format.BuiltinMetricIDJournalVersions, [format.MaxTags]int32{0, s.agent.componentTag, 0, 0, 0, int32(version), hashTag}) item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil, nil) - item.MapStringTop(s.rng, hash, count).AddCounterHost(s.rng, count, 0) + item.MapStringTop(s.rng, data_model.TagUnion{S: hash, I: 0}, count).AddCounterHost(s.rng, count, 0) } if s.agent.metricStorage != nil { // nil only on ingress proxy for now metricJournalVersion := s.agent.metricStorage.Version() @@ -473,10 +473,10 @@ func (s *Shard) addBuiltInsHeartbeatsLocked(resolutionShard *data_model.MetricsB key := s.agent.AggKey(nowUnix, format.BuiltinMetricIDHeartbeatVersion, [format.MaxTags]int32{0, s.agent.componentTag, s.agent.heartBeatEventType}) item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil, nil) - item.MapStringTop(s.rng, build.Commit(), count).AddValueCounterHost(s.rng, uptimeSec, count, 0) + item.MapStringTop(s.rng, data_model.TagUnion{S: build.Commit(), I: 0}, count).AddValueCounterHost(s.rng, uptimeSec, count, 0) // we send format.BuiltinMetricIDHeartbeatArgs only. Args1, Args2, Args3 are deprecated key = s.agent.AggKey(nowUnix, format.BuiltinMetricIDHeartbeatArgs, [format.MaxTags]int32{0, s.agent.componentTag, s.agent.heartBeatEventType, s.agent.argsHash, 0, 0, 0, 0, 0, s.agent.argsLen}) item, _ = resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil, nil) - item.MapStringTop(s.rng, s.agent.args, count).AddValueCounterHost(s.rng, uptimeSec, count, 0) + item.MapStringTop(s.rng, data_model.TagUnion{S: s.agent.args, I: 0}, count).AddValueCounterHost(s.rng, uptimeSec, count, 0) } diff --git a/internal/agent/agent_shard_send.go b/internal/agent/agent_shard_send.go index 9c0ff66a8..f45a9d87c 100644 --- a/internal/agent/agent_shard_send.go +++ b/internal/agent/agent_shard_send.go @@ -143,8 +143,8 @@ func bucketToSourceBucket2TL(bucket *data_model.MetricsBucket, perm []int, sampl sizeBuf = item.Write(sizeBuf[:0]) var top []tlstatshouse.TopElement - for skey, value := range v.Top { - el := tlstatshouse.TopElement{Key: skey} + for key, value := range v.Top { + el := tlstatshouse.TopElement{Key: key.S} // TODO - send I value.MultiValueToTL(&el.Value, v.SF, &el.FieldsMask, &marshalBuf) top = append(top, el) sizeBuf = el.Write(sizeBuf[:0]) @@ -221,8 +221,8 @@ func bucketToSourceBucket3TL(bucket *data_model.MetricsBucket, perm []int, sampl sizeBuf = item.Write(sizeBuf[:0]) var top []tlstatshouse.TopElement - for skey, value := range v.Top { - el := tlstatshouse.TopElement{Key: skey} + for key, value := range v.Top { + el := tlstatshouse.TopElement{Key: key.S} // TODO - send I value.MultiValueToTL(&el.Value, v.SF, &el.FieldsMask, &marshalBuf) top = append(top, el) sizeBuf = el.Write(sizeBuf[:0]) diff --git a/internal/aggregator/aggregator_handlers.go b/internal/aggregator/aggregator_handlers.go index aa7c21462..9525d1e58 100644 --- a/internal/aggregator/aggregator_handlers.go +++ b/internal/aggregator/aggregator_handlers.go @@ -482,7 +482,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl } s := aggBucket.lockShard(&lockedShard, sID, &measurementLocks) mi, created := s.GetOrCreateMultiItem(&k, data_model.AggregatorStringTopCapacity, nil, keyBytes) - mi.MergeWithTLMultiItem(rng, &item, hostTagId) + mi.MergeWithTLMultiItem(rng, &item, hostTagId) // TODO - try to map strings of top elements, as we do with SKeys above if created { if !args.IsSetSpare() { // Data from spares should not affect cardinality estimations newKeys = append(newKeys, k) diff --git a/internal/aggregator/aggregator_insert.go b/internal/aggregator/aggregator_insert.go index cc90c9eeb..399b6a466 100644 --- a/internal/aggregator/aggregator_insert.go +++ b/internal/aggregator/aggregator_insert.go @@ -464,13 +464,13 @@ func (a *Aggregator) RowDataMarshalAppendPositions(buckets []*aggregatorBucket, } } resPos = len(res) - for skey, value := range item.Top { + for key, value := range item.Top { if value.Empty() { // must be never, but check is cheap continue } // We have no badges for string tops - res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, rnd, skey) - res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, skey, sf, v3Format) + res = appendKeys(res, &item.Key, metricCache, usedTimestamps, v3Format, rnd, key.S) // TODO - insert I + res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, key.S, sf, v3Format) // TODO - insert I } if item.Key.Metric < 0 { is.builtin += len(res) - resPos diff --git a/internal/aggregator/simulator.go b/internal/aggregator/simulator.go index 106307241..742754d94 100644 --- a/internal/aggregator/simulator.go +++ b/internal/aggregator/simulator.go @@ -236,8 +236,8 @@ func generateStats(simID int, journal *metajournal.MetricsStorage, s *agent.Agen value := rng.Float64() * 100 s.AddValueCounter(&kValue2, value, 1, metricInfo2) s.AddValueCounter(&kValue3, value, 1, metricInfo3) - s.AddValueArrayCounterHostStringBytes(&kValue10, []float64{value}, 1, 0, sKey, metricInfo10) - s.AddValueArrayCounterHostStringBytes(&kValue11, []float64{value}, 1, 0, sKey, metricInfo11) + s.AddValueArrayCounterHostStringBytes(&kValue10, []float64{value}, 1, 0, data_model.TagUnionBytes{S: sKey, I: 0}, metricInfo10) + s.AddValueArrayCounterHostStringBytes(&kValue11, []float64{value}, 1, 0, data_model.TagUnionBytes{S: sKey, I: 0}, metricInfo11) } } total = rng.Intn(totalRange) @@ -261,9 +261,9 @@ func generateStats(simID int, journal *metajournal.MetricsStorage, s *agent.Agen for ci := 0; ci < cc; ci++ { const D = 1000000 r := D + int64(rng.Intn(D))*int64(rng.Intn(D)) - s.AddUniqueHostStringBytes(&kUnique4, 0, nil, []int64{r}, 1, metricInfo4) - s.AddUniqueHostStringBytes(&kUnique7, 0, nil, []int64{r}, 1, metricInfo7) - s.AddUniqueHostStringBytes(&kUnique8, 0, sKey, []int64{r}, 1, metricInfo8) + s.AddUniqueHostStringBytes(&kUnique4, 0, data_model.TagUnionBytes{}, []int64{r}, 1, metricInfo4) + s.AddUniqueHostStringBytes(&kUnique7, 0, data_model.TagUnionBytes{}, []int64{r}, 1, metricInfo7) + s.AddUniqueHostStringBytes(&kUnique8, 0, data_model.TagUnionBytes{S: sKey, I: 0}, []int64{r}, 1, metricInfo8) } } diff --git a/internal/aggregator/tags_mapper.go b/internal/aggregator/tags_mapper.go index abb7ca3dd..af7cb38a2 100644 --- a/internal/aggregator/tags_mapper.go +++ b/internal/aggregator/tags_mapper.go @@ -74,7 +74,7 @@ func NewTagsMapper(agg *Aggregator, sh2 *agent.Agent, metricStorage *metajournal case format.TagValueIDAggMappingCreatedStatusCreated: agg.appendInternalLog("map_tag", strconv.Itoa(int(extra.AgentEnv)), "created", askedKey, extra.Metric, strconv.Itoa(int(metricID)), strconv.Itoa(int(extra.TagIDKey)), strconv.Itoa(int(keyValue))) // if askedKey is created, it is valid and safe to write - ms.sh2.AddValueCounterHostStringBytes(key, float64(keyValue), 1, extra.Host, []byte(askedKey), meta) + ms.sh2.AddValueCounterHostString(key, float64(keyValue), 1, extra.Host, data_model.TagUnion{S: askedKey, I: 0}, meta) } } return pcache.Int32ToValue(keyValue), d, err diff --git a/internal/aggregator/tags_mapper2.go b/internal/aggregator/tags_mapper2.go index e7247c208..13fa145a7 100644 --- a/internal/aggregator/tags_mapper2.go +++ b/internal/aggregator/tags_mapper2.go @@ -12,6 +12,7 @@ import ( "time" "github.com/vkcom/statshouse/internal/agent" + "github.com/vkcom/statshouse/internal/data_model" "github.com/vkcom/statshouse/internal/format" "github.com/vkcom/statshouse/internal/metajournal" "github.com/vkcom/statshouse/internal/pcache" @@ -165,7 +166,7 @@ func (ms *tagsMapper2) createTag(str string, extra format.CreateMappingExtra) in key.WithAgentEnvRouteArch(extra.AgentEnv, extra.Route, extra.BuildArch) if err == nil && c == format.TagValueIDAggMappingCreatedStatusCreated { // if str is created, it is valid and safe to write - ms.sh2.AddValueCounterHostStringBytes(key, float64(keyValue), 1, extra.Host, []byte(str), meta) + ms.sh2.AddValueCounterHostString(key, float64(keyValue), 1, extra.Host, data_model.TagUnion{S: str, I: 0}, meta) } else { ms.sh2.AddValueCounterHost(key, 0, 1, extra.Host, meta) } diff --git a/internal/data_model/bucket.go b/internal/data_model/bucket.go index 41931828e..77d803828 100644 --- a/internal/data_model/bucket.go +++ b/internal/data_model/bucket.go @@ -23,6 +23,15 @@ import ( const DefaultStringTopCapacity = 100 // if capacity is 0, this one will be used instead type ( + TagUnion struct { + S string + I int32 + } + TagUnionBytes struct { + S []byte + I int32 + } + // Time Series Key, will be optimized to single human-readable string Key struct { Timestamp uint32 @@ -53,7 +62,7 @@ type ( // All our items are technically string tops, but most have empty Top map MultiItem struct { Key Key - Top map[string]*MultiValue + Top map[TagUnion]*MultiValue Tail MultiValue // elements not in top are collected here sampleFactorLog2 int Capacity int // algorithm supports changing on the fly, <2 means DefaultStringTopCapacity @@ -306,14 +315,14 @@ func (b *MultiItemMap) DeleteMultiItem(key *Key) { // we do not clean keysBuffer, it has same lifetime as b and should be reused } -func (s *MultiItem) MapStringTop(rng *rand.Rand, str string, count float64) *MultiValue { - if len(str) == 0 { +func (s *MultiItem) MapStringTop(rng *rand.Rand, tag TagUnion, count float64) *MultiValue { + if len(tag.S) == 0 && tag.I == 0 { return &s.Tail } if s.Top == nil { - s.Top = map[string]*MultiValue{} + s.Top = map[TagUnion]*MultiValue{} } - c, ok := s.Top[str] + c, ok := s.Top[tag] if ok { return c } @@ -329,18 +338,18 @@ func (s *MultiItem) MapStringTop(rng *rand.Rand, str string, count float64) *Mul s.resample(rng) } c = &MultiValue{} - s.Top[str] = c + s.Top[tag] = c return c } -func (s *MultiItem) MapStringTopBytes(rng *rand.Rand, str []byte, count float64) *MultiValue { - if len(str) == 0 { +func (s *MultiItem) MapStringTopBytes(rng *rand.Rand, tag TagUnionBytes, count float64) *MultiValue { + if len(tag.S) == 0 && tag.I == 0 { return &s.Tail } if s.Top == nil { - s.Top = map[string]*MultiValue{} + s.Top = map[TagUnion]*MultiValue{} } - c, ok := s.Top[string(str)] + c, ok := s.Top[TagUnion{S: string(tag.S), I: tag.I}] if ok { return c } @@ -356,7 +365,7 @@ func (s *MultiItem) MapStringTopBytes(rng *rand.Rand, str []byte, count float64) s.resample(rng) } c = &MultiValue{} - s.Top[string(str)] = c + s.Top[TagUnion{S: string(tag.S), I: tag.I}] = c return c } @@ -377,7 +386,7 @@ func (s *MultiItem) resample(rng *rand.Rand) { } type multiItemPair struct { - k string + k TagUnion v *MultiValue } @@ -416,7 +425,7 @@ func (s *MultiItem) Merge(rng *rand.Rand, s2 *MultiItem) { func (s *MultiItem) RowBinarySizeEstimate() int { size := s.Tail.RowBinarySizeEstimate() for k, v := range s.Top { - size += len(k) + v.RowBinarySizeEstimate() + size += 4 + len(k.S) + v.RowBinarySizeEstimate() } return size } diff --git a/internal/data_model/mapped_metric_header.go b/internal/data_model/mapped_metric_header.go index da0dc7f12..62197ed3a 100644 --- a/internal/data_model/mapped_metric_header.go +++ b/internal/data_model/mapped_metric_header.go @@ -26,8 +26,8 @@ type MappedMetricHeader struct { ReceiveTime time.Time // Saved at mapping start and used where we need time.Now. This is different to MetricBatch.T, which is sent by clients MetricMeta *format.MetricMetaValue Key Key - SValue []byte // reference to memory inside tlstatshouse.MetricBytes. - HostTag int32 + TopValue TagUnionBytes // reference to memory inside tlstatshouse.MetricBytes. + HostTag TagUnionBytes // reference to memory inside tlstatshouse.MetricBytes. CheckedTagIndex int // we check tags one by one, remembering position here, between invocations of mapTags ValuesChecked bool // infs, nans, etc. This might be expensive, so done only once @@ -54,7 +54,7 @@ type MappedMetricHeader struct { func (h *MappedMetricHeader) SetTag(index int, id int32, tagIDKey int32) { if index == format.HostTagIndex { - h.HostTag = id + h.HostTag.I = id if h.IsHKeySet { h.TagSetTwiceKey = tagIDKey } @@ -68,12 +68,20 @@ func (h *MappedMetricHeader) SetTag(index int, id int32, tagIDKey int32) { } } -func (h *MappedMetricHeader) SetSTag(index int, value string, tagIDKey int32) { - h.Key.SetSTag(index, value) - if h.IsTagSet[index] { - h.TagSetTwiceKey = tagIDKey +func (h *MappedMetricHeader) SetSTag(index int, value []byte, tagIDKey int32) { + if index == format.HostTagIndex { + h.HostTag.S = value + if h.IsHKeySet { + h.TagSetTwiceKey = tagIDKey + } + h.IsHKeySet = true + } else { + h.Key.SetSTag(index, string(value)) + if h.IsTagSet[index] { + h.TagSetTwiceKey = tagIDKey + } + h.IsTagSet[index] = true } - h.IsTagSet[index] = true } func (h *MappedMetricHeader) SetInvalidString(ingestionStatus int32, tagIDKey int32, invalidString []byte) { diff --git a/internal/data_model/transfer.go b/internal/data_model/transfer.go index 74cefe24a..e9c4e6610 100644 --- a/internal/data_model/transfer.go +++ b/internal/data_model/transfer.go @@ -175,7 +175,7 @@ func (s *MultiValue) MultiValueToTL(item *tlstatshouse.MultiValue, sampleFactor } } -func (s *ItemValue) MergeWithTLItem2(rng *rand.Rand, s2 *tlstatshouse.MultiValueBytes, fields_mask uint32) { +func CounterFromStatshouseMultiValue(s2 *tlstatshouse.MultiValueBytes, fields_mask uint32) (float64, bool) { counter := float64(0) if s2.IsSetCounterEq1(fields_mask) { counter = 1 @@ -184,11 +184,19 @@ func (s *ItemValue) MergeWithTLItem2(rng *rand.Rand, s2 *tlstatshouse.MultiValue counter = s2.Counter } if counter <= 0 || math.IsNaN(counter) { // sanity check/check for empty String Top tail - return + return -1, false } if counter > math.MaxFloat32 { // agents do similar check, but this is so cheap, we repeat on aggregators. counter = math.MaxFloat32 } + return counter, true +} + +func (s *ItemValue) MergeWithTLItem2(rng *rand.Rand, s2 *tlstatshouse.MultiValueBytes, fields_mask uint32) { + counter, ok := CounterFromStatshouseMultiValue(s2, fields_mask) + if !ok { // sanity check/check for empty String Top tail + return + } s.AddCounterHost(rng, counter, s2.MaxCounterHostTag) if !s2.IsSetValueSet(fields_mask) { return @@ -215,8 +223,8 @@ func (s *ItemValue) MergeWithTLItem2(rng *rand.Rand, s2 *tlstatshouse.MultiValue func (s *MultiItem) MergeWithTLMultiItem(rng *rand.Rand, s2 *tlstatshouse.MultiItemBytes, hostTagId int32) { for _, v := range s2.Top { - mi := s.MapStringTopBytes(rng, v.Key, v.Value.Counter) - v.Key, _ = format.AppendValidStringValue(v.Key[:0], v.Key) // TODO - report this error via builtin metrics + mi := s.MapStringTopBytes(rng, TagUnionBytes{S: v.Key, I: 0}, v.Value.Counter) // TODO - pass I + v.Key, _ = format.AppendValidStringValue(v.Key[:0], v.Key) // TODO - report this error via builtin metrics // we want to validate all incoming strings. In case of encoding error, v.Key will be truncated to 0 mi.MergeWithTL2(rng, &v.Value, v.FieldsMask, hostTagId, AggregatorPercentileCompression) } @@ -226,7 +234,7 @@ func (s *MultiItem) MergeWithTLMultiItem(rng *rand.Rand, s2 *tlstatshouse.MultiI func (s *MultiItem) TLSizeEstimate() int { size := s.Tail.TLSizeEstimate() for k, v := range s.Top { - size += 4 + len(k) + 3 + v.TLSizeEstimate() + size += 4 + len(k.S) + 3 + v.TLSizeEstimate() } return size } diff --git a/internal/format/format.go b/internal/format/format.go index 20de1b2e6..5e3daf8fc 100644 --- a/internal/format/format.go +++ b/internal/format/format.go @@ -111,7 +111,6 @@ const ( const ( LegacyStringTopTagID = "skey" legacyTagIDPrefix = "key" - legacyEnvTagID = legacyTagIDPrefix + "0" legacyMetricKindStringTop = "stop" // converted into counter during RestoreMetricInfo ) diff --git a/internal/mapping/pipeline_v2.go b/internal/mapping/pipeline_v2.go index 9ec76f5bd..e5c6c3a48 100644 --- a/internal/mapping/pipeline_v2.go +++ b/internal/mapping/pipeline_v2.go @@ -78,9 +78,9 @@ func (mp *mapPipelineV2) mapTags(h *data_model.MappedMetricHeader, metric *tlsta } switch { case tagMeta.SkipMapping: - h.SetSTag(tagMeta.Index, string(v.Value), tagIDKey) + h.SetSTag(tagMeta.Index, v.Value, tagIDKey) case tagMeta.Index == format.StringTopTagIndex: - h.SValue = v.Value + h.TopValue.S = v.Value if h.IsSKeySet { h.TagSetTwiceKey = tagIDKey }