Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tiny refacors #1639

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions internal/aggregator/aggregator_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ func (a *Aggregator) handleGetConfig2(_ context.Context, args tlstatshouse.GetCo

if args.Cluster != a.config.Cluster {
key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigWrongCluster})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, host, format.BuiltinMetricMetaAutoConfig)
return tlstatshouse.GetConfigResult{}, fmt.Errorf("statshouse misconfiguration! cluster requested %q does not match actual cluster connected %q", args.Cluster, a.config.Cluster)
}
key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigOK})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, host, format.BuiltinMetricMetaAutoConfig)
return a.getConfigResult(), nil
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl

if a.configR.DenyOldAgents && args.BuildCommitTs < format.LeastAllowedAgentCommitTs {
key := a.aggKey(nowUnix, format.BuiltinMetricIDAggOutdatedAgents, [16]int32{0, 0, 0, 0, ownerTagId, 0, int32(addrIPV4)})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, hostTagId, format.BuiltinMetricMetaAggOutdatedAgents)
return "agent is too old please update", nil
}
Expand All @@ -239,7 +239,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
if err := a.checkShardConfiguration(args.Header.ShardReplica, args.Header.ShardReplicaTotal); err != nil {
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigErrorSend, args.Header.ShardReplica, args.Header.ShardReplicaTotal})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, hostTagId, format.BuiltinMetricMetaAutoConfig)
return "", err // TODO - return code so clients will print into log and discard data
}
Expand Down Expand Up @@ -272,15 +272,15 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
if roundedToOurTime > newestTime {
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingFutureBucketHistoric})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddValueCounterHost(key, float64(args.Time)-float64(newestTime), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
// We discard, because otherwise clients will flood aggregators with this data
return "historic bucket time is too far in the future", nil
}
if oldestTime >= data_model.MaxHistoricWindow && roundedToOurTime < oldestTime-data_model.MaxHistoricWindow {
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingLongWindowThrownAggregator})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddValueCounterHost(key, float64(newestTime)-float64(args.Time), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
return "Successfully discarded historic bucket beyond historic window", nil
}
Expand All @@ -305,15 +305,15 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
if roundedToOurTime > newestTime { // AgentShard too far in a future
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingFutureBucketRecent})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddValueCounterHost(key, float64(args.Time)-float64(newestTime), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
// We discard, because otherwise clients will flood aggregators with this data
return "bucket time is too far in the future", nil
}
if roundedToOurTime < oldestTime {
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingLateRecent})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddValueCounterHost(key, float64(newestTime)-float64(args.Time), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
return "", &rpc.Error{
Code: data_model.RPCErrorMissedRecentConveyor,
Expand Down Expand Up @@ -344,7 +344,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
defer aggBucket.sendMu.RUnlock()

lockedShard := -1
var newKeys []*data_model.Key
var newKeys []data_model.Key
var usedMetrics []int32

// We do not want to decompress under lock, so we decompress before ifs, then rarely throw away decompressed data.
Expand All @@ -371,7 +371,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
clampedTimestampsMetrics[clampedKey{k.Tags[0], k.Metric, clampedTag}]++
}
if k.Metric < 0 && !format.HardwareMetric(k.Metric) {
k = k.WithAgentEnvRouteArch(agentEnv, route, buildArch)
k.WithAgentEnvRouteArch(agentEnv, route, buildArch)
if k.Metric == format.BuiltinMetricIDAgentHeartbeatVersion {
// Remap legacy metric to a new one
k.Metric = format.BuiltinMetricIDHeartbeatVersion
Expand Down Expand Up @@ -406,7 +406,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
}
}
s := aggBucket.lockShard(&lockedShard, sID)
mi, created := s.GetOrCreateMultiItem(k, data_model.AggregatorStringTopCapacity, nil)
mi, created := s.GetOrCreateMultiItem(&k, data_model.AggregatorStringTopCapacity, nil)
mi.MergeWithTLMultiItem(rng, &item, hostTagId)
if created {
if !args.IsSetSpare() { // Data from spares should not affect cardinality estimations
Expand Down Expand Up @@ -446,7 +446,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
s := aggBucket.lockShard(&lockedShard, 0)
getMultiItem := func(t uint32, m int32, keys [16]int32) *data_model.MultiItem {
key := a.aggKey(t, m, keys)
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
mi, _ := s.GetOrCreateMultiItem(key, data_model.AggregatorStringTopCapacity, nil)
return mi
}
Expand All @@ -469,15 +469,13 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
if args.QueueSizeDiskUnsent > 0 {
getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSize, [16]int32{0, format.TagValueIDHistoricQueueDiskUnsent}).Tail.AddValueCounterHost(rng, float64(args.QueueSizeDiskUnsent), 1, hostTagId)
}
queueSizeDiskSent := float64(args.QueueSizeDisk) - float64(args.QueueSizeDiskUnsent)
if queueSizeDiskSent > 0 {
if queueSizeDiskSent := float64(args.QueueSizeDisk) - float64(args.QueueSizeDiskUnsent); queueSizeDiskSent > 0 {
getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSize, [16]int32{0, format.TagValueIDHistoricQueueDiskSent}).Tail.AddValueCounterHost(rng, float64(queueSizeDiskSent), 1, hostTagId)
}
if args.QueueSizeDiskSumUnsent > 0 {
getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSizeSum, [16]int32{0, format.TagValueIDHistoricQueueDiskUnsent}).Tail.AddValueCounterHost(rng, float64(args.QueueSizeDiskSumUnsent), 1, hostTagId)
}
queueSizeDiskSumSent := float64(args.QueueSizeDiskSum) - float64(args.QueueSizeDiskSumUnsent)
if queueSizeDiskSumSent > 0 {
if queueSizeDiskSumSent := float64(args.QueueSizeDiskSum) - float64(args.QueueSizeDiskSumUnsent); queueSizeDiskSumSent > 0 {
getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSizeSum, [16]int32{0, format.TagValueIDHistoricQueueDiskSent}).Tail.AddValueCounterHost(rng, float64(queueSizeDiskSumSent), 1, hostTagId)
}

Expand All @@ -497,7 +495,9 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
}

ingestionStatus := func(env int32, metricID int32, status int32, value float32) {
mi, _ := s.GetOrCreateMultiItem((&data_model.Key{Timestamp: args.Time, Metric: format.BuiltinMetricIDIngestionStatus, Tags: [16]int32{env, metricID, status}}).WithAgentEnvRouteArch(agentEnv, route, buildArch), data_model.AggregatorStringTopCapacity, nil)
key := data_model.Key{Timestamp: args.Time, Metric: format.BuiltinMetricIDIngestionStatus, Tags: [16]int32{env, metricID, status}}
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
mi, _ := s.GetOrCreateMultiItem(&key, data_model.AggregatorStringTopCapacity, nil)
mi.Tail.AddCounterHost(rng, float64(value), hostTagId)
}
for _, v := range bucket.IngestionStatusOk {
Expand Down Expand Up @@ -559,7 +559,7 @@ func (a *Aggregator) handleSendKeepAliveAny(hctx *rpc.HandlerContext, args tlsta
if err := a.checkShardConfiguration(args.Header.ShardReplica, args.Header.ShardReplicaTotal); err != nil {
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigErrorKeepAlive, args.Header.ShardReplica, args.Header.ShardReplicaTotal})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, host, format.BuiltinMetricMetaAutoConfig)
return err
}
Expand Down Expand Up @@ -588,8 +588,8 @@ func (a *Aggregator) handleSendKeepAliveAny(hctx *rpc.HandlerContext, args tlsta
s := aggBucket.lockShard(&lockedShard, 0)
// Counters can contain this metrics while # of contributors is 0. We compensate by adding small fixed budget.
key := a.aggKey(aggBucket.time, format.BuiltinMetricIDAggKeepAlive, [16]int32{})
keyWithAgentEnv := key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
mi, _ := s.GetOrCreateMultiItem(keyWithAgentEnv, data_model.AggregatorStringTopCapacity, nil)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
mi, _ := s.GetOrCreateMultiItem(key, data_model.AggregatorStringTopCapacity, nil)
mi.Tail.AddCounterHost(rng, 1, host)
aggBucket.lockShard(&lockedShard, -1)

Expand Down
4 changes: 2 additions & 2 deletions internal/aggregator/tags_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewTagsMapper(agg *Aggregator, sh2 *agent.Agent, metricStorage *metajournal
keyValue, c, d, err := loader.GetTagMapping(ctx, askedKey, metricName, extra.Create)
key := ms.sh2.AggKey(0, format.BuiltinMetricIDAggMappingCreated, [16]int32{extra.ClientEnv, 0, 0, 0, metricID, c, extra.TagIDKey})
meta := format.BuiltinMetricMetaAggMappingCreated
key = key.WithAgentEnvRouteArch(extra.AgentEnv, extra.Route, extra.BuildArch)
key.WithAgentEnvRouteArch(extra.AgentEnv, extra.Route, extra.BuildArch)

if err != nil {
// TODO - write to actual log from time to time
Expand Down Expand Up @@ -181,7 +181,7 @@ func (ms *TagsMapper) handleCreateTagMapping(_ context.Context, hctx *rpc.Handle
route = int32(format.TagValueIDRouteIngressProxy)
}
key := ms.sh2.AggKey(0, format.BuiltinMetricIDAggMapping, [16]int32{0, 0, 0, 0, format.TagValueIDAggMappingMetaMetrics, format.TagValueIDAggMappingStatusOKCached})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
meta := format.BuiltinMetricMetaAggMapping

r := ms.tagValue.GetCached(now, args.Key)
Expand Down
3 changes: 1 addition & 2 deletions internal/data_model/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,14 @@ func (k *Key) Marshal(buffer []byte) (updatedBuffer []byte, newKey []byte) {
return
}

func (k *Key) WithAgentEnvRouteArch(agentEnvTag int32, routeTag int32, buildArchTag int32) *Key {
func (k *Key) WithAgentEnvRouteArch(agentEnvTag int32, routeTag int32, buildArchTag int32) {
// when aggregator receives metric from an agent inside another aggregator, those keys are already set,
// so we simply keep them. AgentEnvTag or RouteTag are always non-zero in this case.
if k.Tags[format.AgentEnvTag] == 0 {
k.Tags[format.AgentEnvTag] = agentEnvTag
k.Tags[format.RouteTag] = routeTag
k.Tags[format.BuildArchTag] = buildArchTag
}
return k
}

func AggKey(t uint32, m int32, k [format.MaxTags]int32, hostTagId int32, shardTag int32, replicaTag int32) *Key {
Expand Down
2 changes: 1 addition & 1 deletion internal/data_model/estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (e *Estimator) Init(window int, maxCardinality int) {
e.halfHour = map[uint32]map[int32]*ChUnique{}
}

func (e *Estimator) UpdateWithKeys(time uint32, keys []*Key) {
func (e *Estimator) UpdateWithKeys(time uint32, keys []Key) {
e.mu.Lock()
defer e.mu.Unlock()
ah, bh := e.createEstimatorsLocked(time)
Expand Down
11 changes: 4 additions & 7 deletions internal/data_model/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ func (k *Key) TagSlice() []int32 {
return result[:i]
}

func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimestamp uint32, newestTime uint32) (key *Key, shardID int, clampedTimestampTag int32) {
func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimestamp uint32, newestTime uint32) (key Key, shardID int, clampedTimestampTag int32) {
// We use high byte of fieldsmask to pass shardID to aggregator, otherwise it is too much work for CPU
shardID = int(item.FieldsMask >> 24)
key = &Key{}
key.Timestamp = bucketTimestamp
if item.IsSetT() {
key.Timestamp = item.T
Expand All @@ -45,14 +44,12 @@ func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimesta
key.Timestamp = bucketTimestamp - BelieveTimestampWindow
clampedTimestampTag = format.TagValueIDSrcIngestionStatusWarnTimestampClampedPast
}
// above checks can be moved below }, but they will always be NOP as bucketTimestamp is both <= newestTime and in beleive window
// above checks can be moved below }, but they will always be NOP as bucketTimestamp is both <= newestTime and in believe window
}
key.Metric = item.Metric
copy(key.Tags[:], item.Keys)
if item.IsSetSkeys() {
for i := range item.Skeys {
key.SetSTag(i, string(item.Skeys[i]))
}
for i, k := range item.Skeys {
key.SetSTag(i, string(k))
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion internal/data_model/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestKeySizeEstimationEdgeCases(t *testing.T) {
}

// Helper function to convert Key to MultiItemBytes and back
func roundTripKey(key Key, bucketTimestamp uint32, newestTime uint32) *Key {
func roundTripKey(key Key, bucketTimestamp uint32, newestTime uint32) Key {
// Convert Key to MultiItem
item := key.TLMultiItemFromKey(bucketTimestamp)

Expand Down
Loading