diff --git a/internal/aggregator/aggregator_handlers.go b/internal/aggregator/aggregator_handlers.go
index 39e466c3c..85e79e8f5 100644
--- a/internal/aggregator/aggregator_handlers.go
+++ b/internal/aggregator/aggregator_handlers.go
@@ -95,12 +95,12 @@ func (a *Aggregator) handleGetConfig2(_ context.Context, args tlstatshouse.GetCo
 	if args.Cluster != a.config.Cluster {
 		key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigWrongCluster})
-		key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+		key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 		a.sh2.AddCounterHost(key, 1, host, format.BuiltinMetricMetaAutoConfig)
 		return tlstatshouse.GetConfigResult{}, fmt.Errorf("statshouse misconfiguration! cluster requested %q does not match actual cluster connected %q", args.Cluster, a.config.Cluster)
 	}
 	key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigOK})
-	key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+	key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 	a.sh2.AddCounterHost(key, 1, host, format.BuiltinMetricMetaAutoConfig)
 	return a.getConfigResult(), nil
 }
@@ -230,7 +230,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 	if a.configR.DenyOldAgents && args.BuildCommitTs < format.LeastAllowedAgentCommitTs {
 		key := a.aggKey(nowUnix, format.BuiltinMetricIDAggOutdatedAgents, [16]int32{0, 0, 0, 0, ownerTagId, 0, int32(addrIPV4)})
-		key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+		key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 		a.sh2.AddCounterHost(key, 1, hostTagId, format.BuiltinMetricMetaAggOutdatedAgents)
 		return "agent is too old please update", nil
 	}
@@ -239,7 +239,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 	if err := a.checkShardConfiguration(args.Header.ShardReplica, args.Header.ShardReplicaTotal); err != nil {
 		a.mu.Unlock()
 		key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigErrorSend, args.Header.ShardReplica, args.Header.ShardReplicaTotal})
-		key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+		key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 		a.sh2.AddCounterHost(key, 1, hostTagId, format.BuiltinMetricMetaAutoConfig)
 		return "", err // TODO - return code so clients will print into log and discard data
 	}
@@ -272,7 +272,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 		if roundedToOurTime > newestTime {
 			a.mu.Unlock()
 			key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingFutureBucketHistoric})
-			key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+			key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 			a.sh2.AddValueCounterHost(key, float64(args.Time)-float64(newestTime), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
 			// We discard, because otherwise clients will flood aggregators with this data
 			return "historic bucket time is too far in the future", nil
@@ -280,7 +280,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 		if oldestTime >= data_model.MaxHistoricWindow && roundedToOurTime < oldestTime-data_model.MaxHistoricWindow {
 			a.mu.Unlock()
 			key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingLongWindowThrownAggregator})
-			key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+			key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 			a.sh2.AddValueCounterHost(key, float64(newestTime)-float64(args.Time), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
 			return "Successfully discarded historic bucket beyond historic window", nil
 		}
@@ -305,7 +305,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 		if roundedToOurTime > newestTime { // AgentShard too far in a future
 			a.mu.Unlock()
 			key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingFutureBucketRecent})
-			key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+			key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 			a.sh2.AddValueCounterHost(key, float64(args.Time)-float64(newestTime), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
 			// We discard, because otherwise clients will flood aggregators with this data
 			return "bucket time is too far in the future", nil
@@ -313,7 +313,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 		if roundedToOurTime < oldestTime {
 			a.mu.Unlock()
 			key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingLateRecent})
-			key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+			key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 			a.sh2.AddValueCounterHost(key, float64(newestTime)-float64(args.Time), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
 			return "", &rpc.Error{
 				Code: data_model.RPCErrorMissedRecentConveyor,
@@ -344,7 +344,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 	defer aggBucket.sendMu.RUnlock()
 	lockedShard := -1
-	var newKeys []*data_model.Key
+	var newKeys []data_model.Key
 	var usedMetrics []int32
 	// We do not want to decompress under lock, so we decompress before ifs, then rarely throw away decompressed data.
@@ -371,7 +371,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 			clampedTimestampsMetrics[clampedKey{k.Tags[0], k.Metric, clampedTag}]++
 		}
 		if k.Metric < 0 && !format.HardwareMetric(k.Metric) {
-			k = k.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+			k.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 			if k.Metric == format.BuiltinMetricIDAgentHeartbeatVersion {
 				// Remap legacy metric to a new one
 				k.Metric = format.BuiltinMetricIDHeartbeatVersion
@@ -406,7 +406,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 			}
 		}
 		s := aggBucket.lockShard(&lockedShard, sID)
-		mi, created := s.GetOrCreateMultiItem(k, data_model.AggregatorStringTopCapacity, nil)
+		mi, created := s.GetOrCreateMultiItem(&k, data_model.AggregatorStringTopCapacity, nil)
 		mi.MergeWithTLMultiItem(rng, &item, hostTagId)
 		if created {
 			if !args.IsSetSpare() { // Data from spares should not affect cardinality estimations
@@ -446,7 +446,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 	s := aggBucket.lockShard(&lockedShard, 0)
 	getMultiItem := func(t uint32, m int32, keys [16]int32) *data_model.MultiItem {
 		key := a.aggKey(t, m, keys)
-		key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+		key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 		mi, _ := s.GetOrCreateMultiItem(key, data_model.AggregatorStringTopCapacity, nil)
 		return mi
 	}
@@ -469,15 +469,13 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 	if args.QueueSizeDiskUnsent > 0 {
 		getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSize, [16]int32{0, format.TagValueIDHistoricQueueDiskUnsent}).Tail.AddValueCounterHost(rng, float64(args.QueueSizeDiskUnsent), 1, hostTagId)
 	}
-	queueSizeDiskSent := float64(args.QueueSizeDisk) - float64(args.QueueSizeDiskUnsent)
-	if queueSizeDiskSent > 0 {
+	if queueSizeDiskSent := float64(args.QueueSizeDisk) - float64(args.QueueSizeDiskUnsent); queueSizeDiskSent > 0 {
 		getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSize, [16]int32{0, format.TagValueIDHistoricQueueDiskSent}).Tail.AddValueCounterHost(rng, float64(queueSizeDiskSent), 1, hostTagId)
 	}
 	if args.QueueSizeDiskSumUnsent > 0 {
 		getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSizeSum, [16]int32{0, format.TagValueIDHistoricQueueDiskUnsent}).Tail.AddValueCounterHost(rng, float64(args.QueueSizeDiskSumUnsent), 1, hostTagId)
 	}
-	queueSizeDiskSumSent := float64(args.QueueSizeDiskSum) - float64(args.QueueSizeDiskSumUnsent)
-	if queueSizeDiskSumSent > 0 {
+	if queueSizeDiskSumSent := float64(args.QueueSizeDiskSum) - float64(args.QueueSizeDiskSumUnsent); queueSizeDiskSumSent > 0 {
 		getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSizeSum, [16]int32{0, format.TagValueIDHistoricQueueDiskSent}).Tail.AddValueCounterHost(rng, float64(queueSizeDiskSumSent), 1, hostTagId)
 	}
@@ -497,7 +495,9 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 	}
 	ingestionStatus := func(env int32, metricID int32, status int32, value float32) {
-		mi, _ := s.GetOrCreateMultiItem((&data_model.Key{Timestamp: args.Time, Metric: format.BuiltinMetricIDIngestionStatus, Tags: [16]int32{env, metricID, status}}).WithAgentEnvRouteArch(agentEnv, route, buildArch), data_model.AggregatorStringTopCapacity, nil)
+		key := data_model.Key{Timestamp: args.Time, Metric: format.BuiltinMetricIDIngestionStatus, Tags: [16]int32{env, metricID, status}}
+		key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+		mi, _ := s.GetOrCreateMultiItem(&key, data_model.AggregatorStringTopCapacity, nil)
 		mi.Tail.AddCounterHost(rng, float64(value), hostTagId)
 	}
 	for _, v := range bucket.IngestionStatusOk {
@@ -559,7 +559,7 @@ func (a *Aggregator) handleSendKeepAliveAny(hctx *rpc.HandlerContext, args tlsta
 	if err := a.checkShardConfiguration(args.Header.ShardReplica, args.Header.ShardReplicaTotal); err != nil {
 		a.mu.Unlock()
 		key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigErrorKeepAlive, args.Header.ShardReplica, args.Header.ShardReplicaTotal})
-		key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+		key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 		a.sh2.AddCounterHost(key, 1, host, format.BuiltinMetricMetaAutoConfig)
 		return err
 	}
@@ -588,8 +588,8 @@ func (a *Aggregator) handleSendKeepAliveAny(hctx *rpc.HandlerContext, args tlsta
 	s := aggBucket.lockShard(&lockedShard, 0)
 	// Counters can contain this metrics while # of contributors is 0. We compensate by adding small fixed budget.
 	key := a.aggKey(aggBucket.time, format.BuiltinMetricIDAggKeepAlive, [16]int32{})
-	keyWithAgentEnv := key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
-	mi, _ := s.GetOrCreateMultiItem(keyWithAgentEnv, data_model.AggregatorStringTopCapacity, nil)
+	key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+	mi, _ := s.GetOrCreateMultiItem(key, data_model.AggregatorStringTopCapacity, nil)
 	mi.Tail.AddCounterHost(rng, 1, host)
 	aggBucket.lockShard(&lockedShard, -1)
diff --git a/internal/aggregator/tags_mapper.go b/internal/aggregator/tags_mapper.go
index 47a0c31ca..a262b4be9 100644
--- a/internal/aggregator/tags_mapper.go
+++ b/internal/aggregator/tags_mapper.go
@@ -59,7 +59,7 @@ func NewTagsMapper(agg *Aggregator, sh2 *agent.Agent, metricStorage *metajournal
 		keyValue, c, d, err := loader.GetTagMapping(ctx, askedKey, metricName, extra.Create)
 		key := ms.sh2.AggKey(0, format.BuiltinMetricIDAggMappingCreated, [16]int32{extra.ClientEnv, 0, 0, 0, metricID, c, extra.TagIDKey})
 		meta := format.BuiltinMetricMetaAggMappingCreated
-		key = key.WithAgentEnvRouteArch(extra.AgentEnv, extra.Route, extra.BuildArch)
+		key.WithAgentEnvRouteArch(extra.AgentEnv, extra.Route, extra.BuildArch)
 		if err != nil {
 			// TODO - write to actual log from time to time
@@ -181,7 +181,7 @@ func (ms *TagsMapper) handleCreateTagMapping(_ context.Context, hctx *rpc.Handle
 		route = int32(format.TagValueIDRouteIngressProxy)
 	}
 	key := ms.sh2.AggKey(0, format.BuiltinMetricIDAggMapping, [16]int32{0, 0, 0, 0, format.TagValueIDAggMappingMetaMetrics, format.TagValueIDAggMappingStatusOKCached})
-	key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
+	key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
 	meta := format.BuiltinMetricMetaAggMapping
 	r := ms.tagValue.GetCached(now, args.Key)
diff --git a/internal/data_model/bucket.go b/internal/data_model/bucket.go
index 3d13caf62..36f4a6123 100644
--- a/internal/data_model/bucket.go
+++ b/internal/data_model/bucket.go
@@ -132,7 +132,7 @@ func (k *Key) Marshal(buffer []byte) (updatedBuffer []byte, newKey []byte) {
 	return
 }
 
-func (k *Key) WithAgentEnvRouteArch(agentEnvTag int32, routeTag int32, buildArchTag int32) *Key {
+func (k *Key) WithAgentEnvRouteArch(agentEnvTag int32, routeTag int32, buildArchTag int32) {
 	// when aggregator receives metric from an agent inside another aggregator, those keys are already set,
 	// so we simply keep them. AgentEnvTag or RouteTag are always non-zero in this case.
 	if k.Tags[format.AgentEnvTag] == 0 {
@@ -140,7 +140,6 @@ func (k *Key) WithAgentEnvRouteArch(agentEnvTag int32, routeTag int32, buildArch
 		k.Tags[format.RouteTag] = routeTag
 		k.Tags[format.BuildArchTag] = buildArchTag
 	}
-	return k
 }
 
 func AggKey(t uint32, m int32, k [format.MaxTags]int32, hostTagId int32, shardTag int32, replicaTag int32) *Key {
diff --git a/internal/data_model/estimator.go b/internal/data_model/estimator.go
index d484597b4..423ff676a 100644
--- a/internal/data_model/estimator.go
+++ b/internal/data_model/estimator.go
@@ -48,7 +48,7 @@ func (e *Estimator) Init(window int, maxCardinality int) {
 	e.halfHour = map[uint32]map[int32]*ChUnique{}
 }
 
-func (e *Estimator) UpdateWithKeys(time uint32, keys []*Key) {
+func (e *Estimator) UpdateWithKeys(time uint32, keys []Key) {
 	e.mu.Lock()
 	defer e.mu.Unlock()
 	ah, bh := e.createEstimatorsLocked(time)
diff --git a/internal/data_model/transfer.go b/internal/data_model/transfer.go
index 5898a306f..296b081f7 100644
--- a/internal/data_model/transfer.go
+++ b/internal/data_model/transfer.go
@@ -28,10 +28,9 @@ func (k *Key) TagSlice() []int32 {
 	return result[:i]
 }
 
-func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimestamp uint32, newestTime uint32) (key *Key, shardID int, clampedTimestampTag int32) {
+func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimestamp uint32, newestTime uint32) (key Key, shardID int, clampedTimestampTag int32) {
 	// We use high byte of fieldsmask to pass shardID to aggregator, otherwise it is too much work for CPU
 	shardID = int(item.FieldsMask >> 24)
-	key = &Key{}
 	key.Timestamp = bucketTimestamp
 	if item.IsSetT() {
 		key.Timestamp = item.T
@@ -45,14 +44,12 @@ func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimesta
 			key.Timestamp = bucketTimestamp - BelieveTimestampWindow
 			clampedTimestampTag = format.TagValueIDSrcIngestionStatusWarnTimestampClampedPast
 		}
-		// above checks can be moved below }, but they will always be NOP as bucketTimestamp is both <= newestTime and in beleive window
+		// above checks can be moved below }, but they will always be NOP as bucketTimestamp is both <= newestTime and in believe window
 	}
 	key.Metric = item.Metric
 	copy(key.Tags[:], item.Keys)
-	if item.IsSetSkeys() {
-		for i := range item.Skeys {
-			key.SetSTag(i, string(item.Skeys[i]))
-		}
+	for i, k := range item.Skeys {
+		key.SetSTag(i, string(k))
 	}
 	return
 }
diff --git a/internal/data_model/transfer_test.go b/internal/data_model/transfer_test.go
index e4c967d3b..97db9a273 100644
--- a/internal/data_model/transfer_test.go
+++ b/internal/data_model/transfer_test.go
@@ -176,7 +176,7 @@ func TestKeySizeEstimationEdgeCases(t *testing.T) {
 }
 
 // Helper function to convert Key to MultiItemBytes and back
-func roundTripKey(key Key, bucketTimestamp uint32, newestTime uint32) *Key {
+func roundTripKey(key Key, bucketTimestamp uint32, newestTime uint32) Key {
 	// Convert Key to MultiItem
 	item := key.TLMultiItemFromKey(bucketTimestamp)