
Commit

tiny refactors
hrissan committed Jan 8, 2025
1 parent 9aa45f5 commit e314edb
Showing 6 changed files with 29 additions and 33 deletions.
40 changes: 20 additions & 20 deletions internal/aggregator/aggregator_handlers.go
@@ -95,12 +95,12 @@ func (a *Aggregator) handleGetConfig2(_ context.Context, args tlstatshouse.GetCo

if args.Cluster != a.config.Cluster {
key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigWrongCluster})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, host, format.BuiltinMetricMetaAutoConfig)
return tlstatshouse.GetConfigResult{}, fmt.Errorf("statshouse misconfiguration! cluster requested %q does not match actual cluster connected %q", args.Cluster, a.config.Cluster)
}
key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigOK})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, host, format.BuiltinMetricMetaAutoConfig)
return a.getConfigResult(), nil
}
@@ -230,7 +230,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl

if a.configR.DenyOldAgents && args.BuildCommitTs < format.LeastAllowedAgentCommitTs {
key := a.aggKey(nowUnix, format.BuiltinMetricIDAggOutdatedAgents, [16]int32{0, 0, 0, 0, ownerTagId, 0, int32(addrIPV4)})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, hostTagId, format.BuiltinMetricMetaAggOutdatedAgents)
return "agent is too old please update", nil
}
@@ -239,7 +239,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
if err := a.checkShardConfiguration(args.Header.ShardReplica, args.Header.ShardReplicaTotal); err != nil {
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigErrorSend, args.Header.ShardReplica, args.Header.ShardReplicaTotal})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, hostTagId, format.BuiltinMetricMetaAutoConfig)
return "", err // TODO - return code so clients will print into log and discard data
}
@@ -272,15 +272,15 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
if roundedToOurTime > newestTime {
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingFutureBucketHistoric})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddValueCounterHost(key, float64(args.Time)-float64(newestTime), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
// We discard, because otherwise clients will flood aggregators with this data
return "historic bucket time is too far in the future", nil
}
if oldestTime >= data_model.MaxHistoricWindow && roundedToOurTime < oldestTime-data_model.MaxHistoricWindow {
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingLongWindowThrownAggregator})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddValueCounterHost(key, float64(newestTime)-float64(args.Time), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
return "Successfully discarded historic bucket beyond historic window", nil
}
@@ -305,15 +305,15 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
if roundedToOurTime > newestTime { // AgentShard too far in a future
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingFutureBucketRecent})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddValueCounterHost(key, float64(args.Time)-float64(newestTime), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
// We discard, because otherwise clients will flood aggregators with this data
return "bucket time is too far in the future", nil
}
if roundedToOurTime < oldestTime {
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDTimingErrors, [16]int32{0, format.TagValueIDTimingLateRecent})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddValueCounterHost(key, float64(newestTime)-float64(args.Time), 1, hostTagId, format.BuiltinMetricMetaTimingErrors)
return "", &rpc.Error{
Code: data_model.RPCErrorMissedRecentConveyor,
@@ -344,7 +344,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
defer aggBucket.sendMu.RUnlock()

lockedShard := -1
var newKeys []*data_model.Key
var newKeys []data_model.Key
var usedMetrics []int32

// We do not want to decompress under lock, so we decompress before ifs, then rarely throw away decompressed data.
@@ -371,7 +371,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
clampedTimestampsMetrics[clampedKey{k.Tags[0], k.Metric, clampedTag}]++
}
if k.Metric < 0 && !format.HardwareMetric(k.Metric) {
k = k.WithAgentEnvRouteArch(agentEnv, route, buildArch)
k.WithAgentEnvRouteArch(agentEnv, route, buildArch)
if k.Metric == format.BuiltinMetricIDAgentHeartbeatVersion {
// Remap legacy metric to a new one
k.Metric = format.BuiltinMetricIDHeartbeatVersion
@@ -406,7 +406,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
}
}
s := aggBucket.lockShard(&lockedShard, sID)
mi, created := s.GetOrCreateMultiItem(k, data_model.AggregatorStringTopCapacity, nil)
mi, created := s.GetOrCreateMultiItem(&k, data_model.AggregatorStringTopCapacity, nil)
mi.MergeWithTLMultiItem(rng, &item, hostTagId)
if created {
if !args.IsSetSpare() { // Data from spares should not affect cardinality estimations
@@ -446,7 +446,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
s := aggBucket.lockShard(&lockedShard, 0)
getMultiItem := func(t uint32, m int32, keys [16]int32) *data_model.MultiItem {
key := a.aggKey(t, m, keys)
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
mi, _ := s.GetOrCreateMultiItem(key, data_model.AggregatorStringTopCapacity, nil)
return mi
}
@@ -469,15 +469,13 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
if args.QueueSizeDiskUnsent > 0 {
getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSize, [16]int32{0, format.TagValueIDHistoricQueueDiskUnsent}).Tail.AddValueCounterHost(rng, float64(args.QueueSizeDiskUnsent), 1, hostTagId)
}
queueSizeDiskSent := float64(args.QueueSizeDisk) - float64(args.QueueSizeDiskUnsent)
if queueSizeDiskSent > 0 {
if queueSizeDiskSent := float64(args.QueueSizeDisk) - float64(args.QueueSizeDiskUnsent); queueSizeDiskSent > 0 {
getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSize, [16]int32{0, format.TagValueIDHistoricQueueDiskSent}).Tail.AddValueCounterHost(rng, float64(queueSizeDiskSent), 1, hostTagId)
}
if args.QueueSizeDiskSumUnsent > 0 {
getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSizeSum, [16]int32{0, format.TagValueIDHistoricQueueDiskUnsent}).Tail.AddValueCounterHost(rng, float64(args.QueueSizeDiskSumUnsent), 1, hostTagId)
}
queueSizeDiskSumSent := float64(args.QueueSizeDiskSum) - float64(args.QueueSizeDiskSumUnsent)
if queueSizeDiskSumSent > 0 {
if queueSizeDiskSumSent := float64(args.QueueSizeDiskSum) - float64(args.QueueSizeDiskSumUnsent); queueSizeDiskSumSent > 0 {
getMultiItem(args.Time, format.BuiltinMetricIDAgentHistoricQueueSizeSum, [16]int32{0, format.TagValueIDHistoricQueueDiskSent}).Tail.AddValueCounterHost(rng, float64(queueSizeDiskSumSent), 1, hostTagId)
}

@@ -497,7 +495,9 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
}

ingestionStatus := func(env int32, metricID int32, status int32, value float32) {
mi, _ := s.GetOrCreateMultiItem((&data_model.Key{Timestamp: args.Time, Metric: format.BuiltinMetricIDIngestionStatus, Tags: [16]int32{env, metricID, status}}).WithAgentEnvRouteArch(agentEnv, route, buildArch), data_model.AggregatorStringTopCapacity, nil)
key := data_model.Key{Timestamp: args.Time, Metric: format.BuiltinMetricIDIngestionStatus, Tags: [16]int32{env, metricID, status}}
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
mi, _ := s.GetOrCreateMultiItem(&key, data_model.AggregatorStringTopCapacity, nil)
mi.Tail.AddCounterHost(rng, float64(value), hostTagId)
}
for _, v := range bucket.IngestionStatusOk {
@@ -559,7 +559,7 @@ func (a *Aggregator) handleSendKeepAliveAny(hctx *rpc.HandlerContext, args tlsta
if err := a.checkShardConfiguration(args.Header.ShardReplica, args.Header.ShardReplicaTotal); err != nil {
a.mu.Unlock()
key := a.aggKey(nowUnix, format.BuiltinMetricIDAutoConfig, [16]int32{0, 0, 0, 0, format.TagValueIDAutoConfigErrorKeepAlive, args.Header.ShardReplica, args.Header.ShardReplicaTotal})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
a.sh2.AddCounterHost(key, 1, host, format.BuiltinMetricMetaAutoConfig)
return err
}
@@ -588,8 +588,8 @@ func (a *Aggregator) handleSendKeepAliveAny(hctx *rpc.HandlerContext, args tlsta
s := aggBucket.lockShard(&lockedShard, 0)
// Counters can contain this metrics while # of contributors is 0. We compensate by adding small fixed budget.
key := a.aggKey(aggBucket.time, format.BuiltinMetricIDAggKeepAlive, [16]int32{})
keyWithAgentEnv := key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
mi, _ := s.GetOrCreateMultiItem(keyWithAgentEnv, data_model.AggregatorStringTopCapacity, nil)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
mi, _ := s.GetOrCreateMultiItem(key, data_model.AggregatorStringTopCapacity, nil)
mi.Tail.AddCounterHost(rng, 1, host)
aggBucket.lockShard(&lockedShard, -1)

4 changes: 2 additions & 2 deletions internal/aggregator/tags_mapper.go
@@ -59,7 +59,7 @@ func NewTagsMapper(agg *Aggregator, sh2 *agent.Agent, metricStorage *metajournal
keyValue, c, d, err := loader.GetTagMapping(ctx, askedKey, metricName, extra.Create)
key := ms.sh2.AggKey(0, format.BuiltinMetricIDAggMappingCreated, [16]int32{extra.ClientEnv, 0, 0, 0, metricID, c, extra.TagIDKey})
meta := format.BuiltinMetricMetaAggMappingCreated
key = key.WithAgentEnvRouteArch(extra.AgentEnv, extra.Route, extra.BuildArch)
key.WithAgentEnvRouteArch(extra.AgentEnv, extra.Route, extra.BuildArch)

if err != nil {
// TODO - write to actual log from time to time
@@ -181,7 +181,7 @@ func (ms *TagsMapper) handleCreateTagMapping(_ context.Context, hctx *rpc.Handle
route = int32(format.TagValueIDRouteIngressProxy)
}
key := ms.sh2.AggKey(0, format.BuiltinMetricIDAggMapping, [16]int32{0, 0, 0, 0, format.TagValueIDAggMappingMetaMetrics, format.TagValueIDAggMappingStatusOKCached})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
meta := format.BuiltinMetricMetaAggMapping

r := ms.tagValue.GetCached(now, args.Key)
3 changes: 1 addition & 2 deletions internal/data_model/bucket.go
@@ -132,15 +132,14 @@ func (k *Key) Marshal(buffer []byte) (updatedBuffer []byte, newKey []byte) {
return
}

func (k *Key) WithAgentEnvRouteArch(agentEnvTag int32, routeTag int32, buildArchTag int32) *Key {
func (k *Key) WithAgentEnvRouteArch(agentEnvTag int32, routeTag int32, buildArchTag int32) {
// when aggregator receives metric from an agent inside another aggregator, those keys are already set,
// so we simply keep them. AgentEnvTag or RouteTag are always non-zero in this case.
if k.Tags[format.AgentEnvTag] == 0 {
k.Tags[format.AgentEnvTag] = agentEnvTag
k.Tags[format.RouteTag] = routeTag
k.Tags[format.BuildArchTag] = buildArchTag
}
return k
}

func AggKey(t uint32, m int32, k [format.MaxTags]int32, hostTagId int32, shardTag int32, replicaTag int32) *Key {
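Most hunks in this diff follow from the signature change in bucket.go above: WithAgentEnvRouteArch no longer returns *Key for chaining and instead mutates the receiver in place. The sketch below illustrates the new call pattern; the Key struct and the tag-index constants are simplified stand-ins for illustration only, not the real data_model.Key or format.AgentEnvTag / format.RouteTag / format.BuildArchTag.

```go
package main

import "fmt"

// Simplified stand-in for data_model.Key; the real struct carries more state
// (string tags, etc.) than shown here.
type Key struct {
	Timestamp uint32
	Metric    int32
	Tags      [16]int32
}

// Illustrative tag indices only; the real code indexes Tags with format.* constants.
const (
	agentEnvTag  = 13
	routeTag     = 14
	buildArchTag = 15
)

// New shape after this commit: mutate in place, return nothing.
func (k *Key) WithAgentEnvRouteArch(agentEnv, route, buildArch int32) {
	// Keep tags that an upstream aggregator already set.
	if k.Tags[agentEnvTag] == 0 {
		k.Tags[agentEnvTag] = agentEnv
		k.Tags[routeTag] = route
		k.Tags[buildArchTag] = buildArch
	}
}

func main() {
	key := Key{Timestamp: 1736294400, Metric: -42}
	// Before: key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
	// After:  a plain statement, no reassignment of the returned pointer.
	key.WithAgentEnvRouteArch(1, 2, 3)
	fmt.Println(key.Tags[agentEnvTag], key.Tags[routeTag], key.Tags[buildArchTag]) // 1 2 3
}
```

Call sites that previously reassigned the result become one-line statements, which is why most hunks in aggregator_handlers.go and tags_mapper.go are single-line drops of the `key = ...` assignment.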
2 changes: 1 addition & 1 deletion internal/data_model/estimator.go
@@ -48,7 +48,7 @@ func (e *Estimator) Init(window int, maxCardinality int) {
e.halfHour = map[uint32]map[int32]*ChUnique{}
}

func (e *Estimator) UpdateWithKeys(time uint32, keys []*Key) {
func (e *Estimator) UpdateWithKeys(time uint32, keys []Key) {
e.mu.Lock()
defer e.mu.Unlock()
ah, bh := e.createEstimatorsLocked(time)
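UpdateWithKeys now takes []Key instead of []*Key, matching the switch to value semantics in the handler above (`var newKeys []data_model.Key`). A minimal sketch of that shape, again with a simplified stand-in Key: appending by value fills one contiguous backing array, so iteration avoids per-key heap allocations and pointer chasing, at the cost of copying the fixed-size struct on append.

```go
package main

import "fmt"

// Simplified stand-in for data_model.Key.
type Key struct {
	Timestamp uint32
	Metric    int32
	Tags      [16]int32
}

// Value-slice variant, as after this commit: the estimator reads keys straight
// out of one contiguous backing array.
func updateWithKeys(seen map[int32]int, keys []Key) {
	for i := range keys { // index loop avoids copying each element a second time
		seen[keys[i].Metric]++
	}
}

func main() {
	var newKeys []Key // mirrors `var newKeys []data_model.Key` in the handler
	for i := 0; i < 3; i++ {
		newKeys = append(newKeys, Key{Timestamp: 1736294400, Metric: int32(-10 - i)})
	}
	seen := map[int32]int{}
	updateWithKeys(seen, newKeys)
	fmt.Println(len(seen)) // 3
}
```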
11 changes: 4 additions & 7 deletions internal/data_model/transfer.go
@@ -28,10 +28,9 @@ func (k *Key) TagSlice() []int32 {
return result[:i]
}

func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimestamp uint32, newestTime uint32) (key *Key, shardID int, clampedTimestampTag int32) {
func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimestamp uint32, newestTime uint32) (key Key, shardID int, clampedTimestampTag int32) {
// We use high byte of fieldsmask to pass shardID to aggregator, otherwise it is too much work for CPU
shardID = int(item.FieldsMask >> 24)
key = &Key{}
key.Timestamp = bucketTimestamp
if item.IsSetT() {
key.Timestamp = item.T
Expand All @@ -45,14 +44,12 @@ func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimesta
key.Timestamp = bucketTimestamp - BelieveTimestampWindow
clampedTimestampTag = format.TagValueIDSrcIngestionStatusWarnTimestampClampedPast
}
// above checks can be moved below }, but they will always be NOP as bucketTimestamp is both <= newestTime and in beleive window
// above checks can be moved below }, but they will always be NOP as bucketTimestamp is both <= newestTime and in believe window
}
key.Metric = item.Metric
copy(key.Tags[:], item.Keys)
if item.IsSetSkeys() {
for i := range item.Skeys {
key.SetSTag(i, string(item.Skeys[i]))
}
for i, k := range item.Skeys {
key.SetSTag(i, string(k))
}
return
}
2 changes: 1 addition & 1 deletion internal/data_model/transfer_test.go
@@ -176,7 +176,7 @@ func TestKeySizeEstimationEdgeCases(t *testing.T) {
}

// Helper function to convert Key to MultiItemBytes and back
func roundTripKey(key Key, bucketTimestamp uint32, newestTime uint32) *Key {
func roundTripKey(key Key, bucketTimestamp uint32, newestTime uint32) Key {
// Convert Key to MultiItem
item := key.TLMultiItemFromKey(bucketTimestamp)

