Change MapKeyItemMultiItem signature to GetOrCreateMultiItem
razmser committed Nov 11, 2024
1 parent 58fe98f commit ed90552
Showing 6 changed files with 71 additions and 70 deletions.
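The change is mechanical: the optional *bool out-parameter of MapKeyItemMultiItem becomes a second return value of GetOrCreateMultiItem. A before/after summary of the signature and a typical call site, assembled from the diff below:

// Before: *bool out-parameter reports whether the item was created.
func (b *MultiItemMap) MapKeyItemMultiItem(key Key, stringTopCapacity int, metricInfo *format.MetricMetaValue, created *bool) *MultiItem

created := false
mi := s.MapKeyItemMultiItem(k, data_model.AggregatorStringTopCapacity, nil, &created)

// After: creation is reported as an idiomatic second return value.
func (b *MultiItemMap) GetOrCreateMultiItem(key Key, stringTopCapacity int, metricInfo *format.MetricMetaValue) (item *MultiItem, created bool)

mi, created := s.GetOrCreateMultiItem(k, data_model.AggregatorStringTopCapacity, nil)

Call sites that do not care about the flag simply discard it (item, _ := ...), and the one caller that does, in aggregator_handlers.go, drops its separate created variable.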
76 changes: 38 additions & 38 deletions internal/agent/agent_shard.go
@@ -209,8 +209,8 @@ func (s *Shard) ApplyUnique(key data_model.Key, keyHash uint64, str []byte, hash
return
}
resolutionShard := s.resolutionShardFromHashLocked(&key, keyHash, metricInfo)
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
-mv := mi.MapStringTopBytes(s.rng, str, count)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo)
+mv := item.MapStringTopBytes(s.rng, str, count)
mv.ApplyUnique(s.rng, hashes, count, hostTag)
}

@@ -231,8 +231,8 @@ func (s *Shard) ApplyValues(key data_model.Key, keyHash uint64, str []byte, hist
return
}
resolutionShard := s.resolutionShardFromHashLocked(&key, keyHash, metricInfo)
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
-mv := mi.MapStringTopBytes(s.rng, str, count)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo)
+mv := item.MapStringTopBytes(s.rng, str, count)
mv.ApplyValues(s.rng, histogram, values, count, totalCount, hostTag, data_model.AgentPercentileCompression, metricInfo != nil && metricInfo.HasPercentiles)
}

@@ -246,8 +246,8 @@ func (s *Shard) ApplyCounter(key data_model.Key, keyHash uint64, str []byte, cou
return
}
resolutionShard := s.resolutionShardFromHashLocked(&key, keyHash, metricInfo)
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
-mi.MapStringTopBytes(s.rng, str, count).AddCounterHost(s.rng, count, hostTag)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo)
+item.MapStringTopBytes(s.rng, str, count).AddCounterHost(s.rng, count, hostTag)
}

func (s *Shard) AddCounterHost(key data_model.Key, keyHash uint64, count float64, hostTagId int32, metricInfo *format.MetricMetaValue) {
@@ -257,8 +257,8 @@ func (s *Shard) AddCounterHost(key data_model.Key, keyHash uint64, count float64
return
}
resolutionShard := s.resolutionShardFromHashLocked(&key, keyHash, metricInfo)
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
-mi.Tail.AddCounterHost(s.rng, count, hostTagId)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo)
+item.Tail.AddCounterHost(s.rng, count, hostTagId)
}

func (s *Shard) AddCounterHostStringBytes(key data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue) {
@@ -268,8 +268,8 @@ func (s *Shard) AddCounterHostStringBytes(key data_model.Key, keyHash uint64, st
return
}
resolutionShard := s.resolutionShardFromHashLocked(&key, keyHash, metricInfo)
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
-mi.MapStringTopBytes(s.rng, str, count).AddCounterHost(s.rng, count, hostTag)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo)
+item.MapStringTopBytes(s.rng, str, count).AddCounterHost(s.rng, count, hostTag)
}

func (s *Shard) AddValueCounterHostStringBytes(key data_model.Key, keyHash uint64, value float64, count float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue) {
@@ -279,8 +279,8 @@ func (s *Shard) AddValueCounterHostStringBytes(key data_model.Key, keyHash uint6
return
}
resolutionShard := s.resolutionShardFromHashLocked(&key, keyHash, metricInfo)
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
-mi.MapStringTopBytes(s.rng, str, count).AddValueCounterHost(s.rng, value, count, hostTag)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo)
+item.MapStringTopBytes(s.rng, str, count).AddValueCounterHost(s.rng, value, count, hostTag)
}

func (s *Shard) AddValueCounterHost(key data_model.Key, keyHash uint64, value float64, counter float64, hostTag int32, metricInfo *format.MetricMetaValue) {
@@ -290,11 +290,11 @@ func (s *Shard) AddValueCounterHost(key data_model.Key, keyHash uint64, value fl
return
}
resolutionShard := s.resolutionShardFromHashLocked(&key, keyHash, metricInfo)
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo)
if metricInfo != nil && metricInfo.HasPercentiles {
-mi.Tail.AddValueCounterHostPercentile(s.rng, value, counter, hostTag, data_model.AgentPercentileCompression)
+item.Tail.AddValueCounterHostPercentile(s.rng, value, counter, hostTag, data_model.AgentPercentileCompression)
} else {
-mi.Tail.Value.AddValueCounterHost(s.rng, value, counter, hostTag)
+item.Tail.Value.AddValueCounterHost(s.rng, value, counter, hostTag)
}
}

@@ -305,11 +305,11 @@ func (s *Shard) AddValueArrayCounterHost(key data_model.Key, keyHash uint64, val
return
}
resolutionShard := s.resolutionShardFromHashLocked(&key, keyHash, metricInfo)
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo)
if metricInfo != nil && metricInfo.HasPercentiles {
-mi.Tail.AddValueArrayHostPercentile(s.rng, values, mult, hostTag, data_model.AgentPercentileCompression)
+item.Tail.AddValueArrayHostPercentile(s.rng, values, mult, hostTag, data_model.AgentPercentileCompression)
} else {
-mi.Tail.Value.AddValueArrayHost(s.rng, values, mult, hostTag)
+item.Tail.Value.AddValueArrayHost(s.rng, values, mult, hostTag)
}
}

@@ -320,24 +320,24 @@ func (s *Shard) AddValueArrayCounterHostStringBytes(key data_model.Key, keyHash
return
}
resolutionShard := s.resolutionShardFromHashLocked(&key, keyHash, metricInfo)
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo)
count := float64(len(values)) * mult
if metricInfo != nil && metricInfo.HasPercentiles {
-mi.MapStringTopBytes(s.rng, str, count).AddValueArrayHostPercentile(s.rng, values, mult, hostTag, data_model.AgentPercentileCompression)
+item.MapStringTopBytes(s.rng, str, count).AddValueArrayHostPercentile(s.rng, values, mult, hostTag, data_model.AgentPercentileCompression)
} else {
-mi.MapStringTopBytes(s.rng, str, count).Value.AddValueArrayHost(s.rng, values, mult, hostTag)
+item.MapStringTopBytes(s.rng, str, count).Value.AddValueArrayHost(s.rng, values, mult, hostTag)
}
}

-func (s *Shard) MergeItemValue(key data_model.Key, keyHash uint64, item *data_model.ItemValue, metricInfo *format.MetricMetaValue) {
+func (s *Shard) MergeItemValue(key data_model.Key, keyHash uint64, itemValue *data_model.ItemValue, metricInfo *format.MetricMetaValue) {
s.mu.Lock()
defer s.mu.Unlock()
if s.stopReceivingIncomingData {
return
}
resolutionShard := s.resolutionShardFromHashLocked(&key, keyHash, metricInfo)
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, metricInfo, nil)
-mi.Tail.Value.Merge(s.rng, item)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, metricInfo)
+item.Tail.Value.Merge(s.rng, itemValue)
}

func (s *Shard) addBuiltInsLocked(nowUnix uint32) {
@@ -348,8 +348,8 @@ func (s *Shard) addBuiltInsLocked(nowUnix uint32) {
for _, v := range s.BuiltInItemValues {
v.mu.Lock()
if v.value.Count() > 0 {
-mi := resolutionShard.MapKeyItemMultiItem(v.key, s.config.StringTopCapacity, nil, nil)
-mi.Tail.Value.Merge(s.rng, &v.value)
+item, _ := resolutionShard.GetOrCreateMultiItem(v.key, s.config.StringTopCapacity, nil)
+item.Tail.Value.Merge(s.rng, &v.value)
v.value = data_model.ItemValue{} // Moving below 'if' would reset Counter if <0. Will complicate debugging, so no.
}
v.mu.Unlock()
@@ -366,8 +366,8 @@ func (s *Shard) addBuiltInsLocked(nowUnix uint32) {
// standard metrics do not allow this, but heartbeats are magic.
writeJournalVersion := func(version int64, hash string, hashTag int32, count float64) {
key := s.agent.AggKey(resolutionShard.Time, format.BuiltinMetricIDJournalVersions, [format.MaxTags]int32{0, s.agent.componentTag, 0, 0, 0, int32(version), hashTag})
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, nil, nil)
-mi.MapStringTop(s.rng, hash, count).AddCounterHost(s.rng, count, 0)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil)
+item.MapStringTop(s.rng, hash, count).AddCounterHost(s.rng, count, 0)
}
if s.agent.metricStorage != nil { // nil only on ingress proxy for now
metricJournalVersion := s.agent.metricStorage.Version()
@@ -401,12 +401,12 @@ func (s *Shard) addBuiltInsLocked(nowUnix uint32) {
sysTime := float64(s.agent.rUsage.Stime.Nano()-prevRUsage.Stime.Nano()) / float64(time.Second)

key := s.agent.AggKey(resolutionShard.Time, format.BuiltinMetricIDUsageCPU, [format.MaxTags]int32{0, s.agent.componentTag, format.TagValueIDCPUUsageUser})
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, nil, nil)
-mi.Tail.AddValueCounterHost(s.rng, userTime, 1, 0)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil)
+item.Tail.AddValueCounterHost(s.rng, userTime, 1, 0)

key = s.agent.AggKey(resolutionShard.Time, format.BuiltinMetricIDUsageCPU, [format.MaxTags]int32{0, s.agent.componentTag, format.TagValueIDCPUUsageSys})
-mi = resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, nil, nil)
-mi.Tail.AddValueCounterHost(s.rng, sysTime, 1, 0)
+item, _ = resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil)
+item.Tail.AddValueCounterHost(s.rng, sysTime, 1, 0)

if nowUnix%60 != 0 {
// IF we sample once per minute, we do it right before sending to reduce latency
@@ -423,8 +423,8 @@ func (s *Shard) addBuiltInsLocked(nowUnix uint32) {
}

key = s.agent.AggKey(resolutionShard.Time, format.BuiltinMetricIDUsageMemory, [format.MaxTags]int32{0, s.agent.componentTag})
-mi = resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, nil, nil)
-mi.Tail.AddValueCounterHost(s.rng, rss, 60, 0)
+item, _ = resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil)
+item.Tail.AddValueCounterHost(s.rng, rss, 60, 0)

s.addBuiltInsHeartbeatsLocked(resolutionShard, nowUnix, 60) // heartbeat once per minute
}
@@ -433,11 +433,11 @@ func (s *Shard) addBuiltInsHeartbeatsLocked(resolutionShard *data_model.MetricsB
uptimeSec := float64(nowUnix - s.agent.startTimestamp)

key := s.agent.AggKey(resolutionShard.Time, format.BuiltinMetricIDHeartbeatVersion, [format.MaxTags]int32{0, s.agent.componentTag, s.agent.heartBeatEventType})
-mi := resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, nil, nil)
-mi.MapStringTop(s.rng, build.Commit(), count).AddValueCounterHost(s.rng, uptimeSec, count, 0)
+item, _ := resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil)
+item.MapStringTop(s.rng, build.Commit(), count).AddValueCounterHost(s.rng, uptimeSec, count, 0)

// we send format.BuiltinMetricIDHeartbeatArgs only. Args1, Args2, Args3 are deprecated
key = s.agent.AggKey(resolutionShard.Time, format.BuiltinMetricIDHeartbeatArgs, [format.MaxTags]int32{0, s.agent.componentTag, s.agent.heartBeatEventType, s.agent.argsHash, 0, 0, 0, 0, 0, s.agent.argsLen})
-mi = resolutionShard.MapKeyItemMultiItem(key, s.config.StringTopCapacity, nil, nil)
-mi.MapStringTop(s.rng, s.agent.args, count).AddValueCounterHost(s.rng, uptimeSec, count, 0)
+item, _ = resolutionShard.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil)
+item.MapStringTop(s.rng, s.agent.args, count).AddValueCounterHost(s.rng, uptimeSec, count, 0)
}
20 changes: 10 additions & 10 deletions internal/agent/agent_shard_send.go
@@ -65,7 +65,7 @@ func (s *Shard) flushBuckets(now time.Time) {
Metric: format.BuiltinMetricIDTimingErrors,
Tags: [format.MaxTags]int32{0, format.TagValueIDTimingMissedSecondsAgent},
}
-mi := b.MapKeyItemMultiItem(key, s.config.StringTopCapacity, nil, nil)
+mi, _ := b.GetOrCreateMultiItem(key, s.config.StringTopCapacity, nil)
mi.Tail.AddValueCounterHost(s.rng, float64(currentTimeRounded+uint32(r)-b.Time), 1, 0) // values record jumps f more than 1 second
}
b.Time = currentTimeRounded + uint32(r)
@@ -240,7 +240,7 @@ func (s *Shard) mergeBuckets(rng *rand.Rand, bucket *data_model.MetricsBucket, b
}
for _, b := range buckets {
for _, item := range b.MultiItems {
-mi := bucket.MapKeyItemMultiItem(item.Key, stringTopCapacity, nil, nil)
+mi, _ := bucket.GetOrCreateMultiItem(item.Key, stringTopCapacity, nil)
mi.Merge(rng, item)
}
}
@@ -295,25 +295,25 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, rnd *rand.Rand) [
for _, v := range sampler.MetricGroups {
// keep bytes
key := data_model.Key{Metric: format.BuiltinMetricIDSrcSamplingSizeBytes, Tags: [format.MaxTags]int32{0, s.agent.componentTag, format.TagValueIDSamplingDecisionKeep, v.NamespaceID, v.GroupID, v.MetricID}}
-mi := bucket.MapKeyItemMultiItem(key, config.StringTopCapacity, nil, nil)
-mi.Tail.Value.Merge(rnd, &v.SumSizeKeep)
+item, _ := bucket.GetOrCreateMultiItem(key, config.StringTopCapacity, nil)
+item.Tail.Value.Merge(rnd, &v.SumSizeKeep)
// discard bytes
key = data_model.Key{Metric: format.BuiltinMetricIDSrcSamplingSizeBytes, Tags: [format.MaxTags]int32{0, s.agent.componentTag, format.TagValueIDSamplingDecisionDiscard, v.NamespaceID, v.GroupID, v.MetricID}}
-mi = bucket.MapKeyItemMultiItem(key, config.StringTopCapacity, nil, nil)
-mi.Tail.Value.Merge(rnd, &v.SumSizeDiscard)
+item, _ = bucket.GetOrCreateMultiItem(key, config.StringTopCapacity, nil)
+item.Tail.Value.Merge(rnd, &v.SumSizeDiscard)
// budget
key = data_model.Key{Metric: format.BuiltinMetricIDSrcSamplingGroupBudget, Tags: [format.MaxTags]int32{0, s.agent.componentTag, v.NamespaceID, v.GroupID}}
-item := bucket.MapKeyItemMultiItem(key, config.StringTopCapacity, nil, nil)
+item, _ = bucket.GetOrCreateMultiItem(key, config.StringTopCapacity, nil)
item.Tail.Value.AddValue(v.Budget())
}
// report budget used
budgetKey := data_model.Key{Metric: format.BuiltinMetricIDSrcSamplingBudget, Tags: [format.MaxTags]int32{0, s.agent.componentTag}}
-budgetItem := bucket.MapKeyItemMultiItem(budgetKey, config.StringTopCapacity, nil, nil)
+budgetItem, _ := bucket.GetOrCreateMultiItem(budgetKey, config.StringTopCapacity, nil)
budgetItem.Tail.Value.AddValue(float64(remainingBudget))
// metric count
key := data_model.Key{Metric: format.BuiltinMetricIDSrcSamplingMetricCount, Tags: [format.MaxTags]int32{0, s.agent.componentTag}}
-mi := bucket.MapKeyItemMultiItem(key, config.StringTopCapacity, nil, nil)
-mi.Tail.Value.AddValueCounterHost(rnd, float64(sampler.MetricCount), 1, 0)
+item, _ := bucket.GetOrCreateMultiItem(key, config.StringTopCapacity, nil)
+item.Tail.Value.AddValueCounterHost(rnd, float64(sampler.MetricCount), 1, 0)
return sampler.SampleFactors
}

12 changes: 7 additions & 5 deletions internal/aggregator/aggregator_handlers.go
@@ -353,8 +353,7 @@ func (a *Aggregator) handleSendSourceBucket2(_ context.Context, hctx *rpc.Handle
}
}
s := aggBucket.lockShard(&lockedShard, sID)
-created := false
-mi := s.MapKeyItemMultiItem(k, data_model.AggregatorStringTopCapacity, nil, &created)
+mi, created := s.GetOrCreateMultiItem(k, data_model.AggregatorStringTopCapacity, nil)
mi.MergeWithTLMultiItem(rng, &item, hostTagId)
if created {
if !args.IsSetSpare() { // Data from spares should not affect cardinality estimations
@@ -391,7 +390,8 @@ func (a *Aggregator) handleSendSourceBucket2(_ context.Context, hctx *rpc.Handle
getMultiItem := func(t uint32, m int32, keys [16]int32) *data_model.MultiItem {
key := a.aggKey(t, m, keys)
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
-return s.MapKeyItemMultiItem(key, data_model.AggregatorStringTopCapacity, nil, nil)
+mi, _ := s.GetOrCreateMultiItem(key, data_model.AggregatorStringTopCapacity, nil)
+return mi
}
getMultiItem(args.Time, format.BuiltinMetricIDAggSizeCompressed, [16]int32{0, 0, 0, 0, conveyor, spare}).Tail.AddValueCounterHost(rng, float64(len(hctx.Request)), 1, hostTagId)

@@ -439,7 +439,8 @@ func (a *Aggregator) handleSendSourceBucket2(_ context.Context, hctx *rpc.Handle
}

ingestionStatus := func(env int32, metricID int32, status int32, value float32) {
-s.MapKeyItemMultiItem((data_model.Key{Timestamp: args.Time, Metric: format.BuiltinMetricIDIngestionStatus, Tags: [16]int32{env, metricID, status}}).WithAgentEnvRouteArch(agentEnv, route, buildArch), data_model.AggregatorStringTopCapacity, nil, nil).Tail.AddCounterHost(rng, float64(value), hostTagId)
+mi, _ := s.GetOrCreateMultiItem((data_model.Key{Timestamp: args.Time, Metric: format.BuiltinMetricIDIngestionStatus, Tags: [16]int32{env, metricID, status}}).WithAgentEnvRouteArch(agentEnv, route, buildArch), data_model.AggregatorStringTopCapacity, nil)
+mi.Tail.AddCounterHost(rng, float64(value), hostTagId)
}
for _, v := range bucket.IngestionStatusOk {
// We do not split by aggregator, because this metric is taking already too much space - about 1% of all data
@@ -510,7 +511,8 @@ func (a *Aggregator) handleSendKeepAlive2(_ context.Context, hctx *rpc.HandlerCo
// Counters can contain this metrics while # of contributors is 0. We compensate by adding small fixed budget.
key := a.aggKey(aggBucket.time, format.BuiltinMetricIDAggKeepAlive, [16]int32{})
key = key.WithAgentEnvRouteArch(agentEnv, route, buildArch)
-s.MapKeyItemMultiItem(key, data_model.AggregatorStringTopCapacity, nil, nil).Tail.AddCounterHost(rng, 1, host)
+mi, _ := s.GetOrCreateMultiItem(key, data_model.AggregatorStringTopCapacity, nil)
+mi.Tail.AddCounterHost(rng, 1, host)
aggBucket.lockShard(&lockedShard, -1)

return errHijack
10 changes: 4 additions & 6 deletions internal/data_model/bucket.go
@@ -254,24 +254,22 @@ func (b *MetricsBucket) Empty() bool {
return len(b.MultiItems) == 0
}

-func (b *MultiItemMap) MapKeyItemMultiItem(key Key, stringTopCapacity int, metricInfo *format.MetricMetaValue, created *bool) *MultiItem {
+func (b *MultiItemMap) GetOrCreateMultiItem(key Key, stringTopCapacity int, metricInfo *format.MetricMetaValue) (item *MultiItem, created bool) {
if b.MultiItems == nil {
b.MultiItems = make(map[string]*MultiItem)
}
var keyBytes []byte
b.keysBuffer, keyBytes = key.Marshal(b.keysBuffer)
keyString := unsafe.String(unsafe.SliceData(keyBytes), len(keyBytes))
item, ok := b.MultiItems[keyString]
-if created != nil {
-*created = !ok
-}
+created = !ok
if ok {
b.keysBuffer = b.keysBuffer[:len(b.keysBuffer)-len(keyBytes)]
-return item
+return
}
item = &MultiItem{Key: key, Capacity: stringTopCapacity, SF: 1, MetricMeta: metricInfo}
b.MultiItems[keyString] = item
-return item
+return
}

func (b *MultiItemMap) DeleteMultiItem(key *Key) {
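The lookup above relies on a zero-copy trick worth spelling out: the key bytes are appended to a reusable keysBuffer, viewed as a string via unsafe.String for the map probe (no allocation), reclaimed when the item already exists, and kept alive as the map key only when a new item is inserted. Below is a minimal standalone sketch of the same pattern, with a hypothetical itemMap of counters standing in for MultiItems; it assumes Go 1.20+ for unsafe.String and unsafe.SliceData.

package main

import (
	"fmt"
	"unsafe"
)

// itemMap mimics MultiItemMap: byte-slice keys, a string-keyed map,
// and a shared buffer that backs the keys of inserted items.
type itemMap struct {
	items      map[string]*int
	keysBuffer []byte
}

// getOrCreate probes the map without copying the key into a new string.
func (m *itemMap) getOrCreate(key []byte) (item *int, created bool) {
	if m.items == nil {
		m.items = make(map[string]*int)
	}
	start := len(m.keysBuffer)
	m.keysBuffer = append(m.keysBuffer, key...)
	kb := m.keysBuffer[start:]
	// keyString aliases kb, so the map lookup allocates nothing.
	keyString := unsafe.String(unsafe.SliceData(kb), len(kb))
	if it, ok := m.items[keyString]; ok {
		m.keysBuffer = m.keysBuffer[:start] // item existed; reclaim the buffer space
		return it, false
	}
	// New entry: keyString keeps the appended bytes alive as the map key.
	// If keysBuffer later reallocates, stored keys still reference the old
	// backing array, which stays reachable through the map; safe, just subtle.
	item = new(int)
	m.items[keyString] = item
	return item, true
}

func main() {
	var m itemMap
	a, created := m.getOrCreate([]byte("cpu_usage"))
	fmt.Println(created) // true
	b, again := m.getOrCreate([]byte("cpu_usage"))
	fmt.Println(again, a == b) // false true
}

The invariant that makes this safe is that buffer positions holding a stored key are never rewritten: the buffer is truncated only on the "already exists" branch, exactly as in GetOrCreateMultiItem above.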
3 changes: 2 additions & 1 deletion internal/data_model/estimator.go
@@ -109,7 +109,8 @@ func (e *Estimator) ReportHourCardinality(rng *rand.Rand, time uint32, miMap *Mu
// we cannot implement this, so we multiply by # of shards, expecting uniform load (which is wrong if skip shards option is given to agents)
// so avg() of this metric shows full estimate
key := AggKey((time/60)*60, format.BuiltinMetricIDAggHourCardinality, [format.MaxTags]int32{0, 0, 0, 0, k}, aggregatorHost, shardKey, replicaKey)
-miMap.MapKeyItemMultiItem(key, AggregatorStringTopCapacity, nil, nil).Tail.AddValueCounterHost(rng, cardinality, 1, aggregatorHost)
+item, _ := miMap.GetOrCreateMultiItem(key, AggregatorStringTopCapacity, nil)
+item.Tail.AddValueCounterHost(rng, cardinality, 1, aggregatorHost)
}
}

