Skip to content

Commit

Permalink
new original hash
Browse files Browse the repository at this point in the history
  • Loading branch information
hrissan committed Jan 22, 2025
1 parent 5a0a9ff commit 2d0d07f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 6 deletions.
4 changes: 4 additions & 0 deletions internal/agent/agent_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (s *Agent) mapAllTags(h *data_model.MappedMetricHeader, metric *tlstatshous
}
if tagMeta.Index == format.StringTopTagIndex || tagMeta.Index == format.StringTopTagIndexV3 {
// "_s" is alternative/legacy name for "47". We always have "top" function set for this tag.
// This tag is not part of resolution hash, so not placed into OriginalTagValues
// TODO - after old conveyor removed, we can simplify this code by setting tagMeta.Index to 47 for "_s"
// also we will remove IsSKeySet and use IsTagSet[47] automatically instead
h.TopValue = tagValue
Expand All @@ -71,6 +72,9 @@ func (s *Agent) mapAllTags(h *data_model.MappedMetricHeader, metric *tlstatshous
} else {
h.SetSTag(tagMeta.Index, tagValue.S, tagIDKey) // TODO - remove allocation here
}
if tagMeta.Index != format.HostTagIndex { // This tag is not part of resolution hash, so not placed into OriginalTagValues
h.OriginalTagValues[tagMeta.Index] = v.Value
}
}
}

Expand Down
54 changes: 51 additions & 3 deletions internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
package agent

import (
"encoding/binary"
"fmt"
"os"
"sync"
"testing"
"time"
Expand All @@ -20,6 +22,49 @@ import (
"github.com/vkcom/statshouse/internal/format"
)

// sideEffect accumulates values produced inside the benchmark loops below
// (presumably so the compiler cannot discard the measured calls as dead code).
var sideEffect uint64

// Benchmark_Original_Hash measures MappedMetricHeader.OriginalHash on a header with
// five populated original tag values, reusing a single scratch buffer between iterations.
//
// cpu: 13th Gen Intel(R) Core(TM) i7-1360P
// Benchmark_Original_Hash-16 10591867 109.1 ns/op 0 B/op 0 allocs/op
func Benchmark_Original_Hash(b *testing.B) {
	var header data_model.MappedMetricHeader
	header.MetricMeta = &format.MetricMetaValue{MetricID: 1}

	tags := &header.OriginalTagValues
	tags[0] = []byte("production")
	tags[1] = []byte(os.Args[0])
	tags[2] = []byte("short")
	tags[3] = []byte("tags")
	tags[14] = []byte("AAAA")
	counter := tags[14] // 4-byte value overwritten on every iteration so the input changes

	var scratch []byte
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		binary.LittleEndian.PutUint32(counter, uint32(i))
		var sum uint64
		scratch, sum = header.OriginalHash(scratch)
		sideEffect += sum
	}
}

// Benchmark_Original_Marshal measures MappedMetricHeader.OriginalMarshalAppend on a header
// with five populated original tag values, appending into a reused, truncated buffer.
//
// cpu: 13th Gen Intel(R) Core(TM) i7-1360P
// Benchmark_Original_Marshal-16 12579855 88.82 ns/op 0 B/op 0 allocs/op
func Benchmark_Original_Marshal(b *testing.B) {
	var header data_model.MappedMetricHeader
	header.MetricMeta = &format.MetricMetaValue{MetricID: 1}

	tags := &header.OriginalTagValues
	tags[0] = []byte("production")
	tags[1] = []byte(os.Args[0])
	tags[2] = []byte("short")
	tags[3] = []byte("tags")
	tags[14] = []byte("AAAA")
	counter := tags[14] // 4-byte value overwritten on every iteration so the input changes

	var scratch []byte
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		binary.LittleEndian.PutUint32(counter, uint32(i))
		scratch = header.OriginalMarshalAppend(scratch[:0])
		sideEffect += uint64(len(scratch))
	}
}

func Benchmark_Hash(b *testing.B) {
var k data_model.Key
var result uint64
Expand All @@ -31,13 +76,13 @@ func Benchmark_Hash(b *testing.B) {
}
}

// cpu: Apple M3 Pro
// Benchmark_Hash-12 41562019 28.94 ns/op 0 B/op 0 allocs/op
// Benchmark_XXHash-12 35386051 30.75 ns/op 0 B/op 0 allocs/op
// cpu: 13th Gen Intel(R) Core(TM) i7-1360P
// Benchmark_XXHash-16 10919467 102.8 ns/op 0 B/op 0 allocs/op
func Benchmark_XXHash(b *testing.B) {
var k data_model.Key
var hash, result uint64
var buf []byte
k.STags[5] = "really"
b.ReportAllocs()
for i := 0; i < b.N; i++ {
k.Tags[14]++
Expand All @@ -47,10 +92,13 @@ func Benchmark_XXHash(b *testing.B) {
}
}

// cpu: 13th Gen Intel(R) Core(TM) i7-1360P
// Benchmark_Marshal-16 12839154 82.68 ns/op 0 B/op 0 allocs/op
func Benchmark_Marshal(b *testing.B) {
var k data_model.Key
var result int
var buf []byte
k.STags[5] = "really"
b.ReportAllocs()
for i := 0; i < b.N; i++ {
k.Tags[14]++
Expand Down
4 changes: 1 addition & 3 deletions internal/data_model/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ func (k *Key) Hash() uint64 {

// XXHash marshals the key into b and returns the marshalled buffer together with the
// 64-bit xxhash of everything after its first 4 bytes.
//
// The one-shot xxhash.Sum64 is used instead of building a streaming digest: the input
// is a single contiguous slice, so both produce the same sum.
func (k *Key) XXHash(b []byte) ([]byte, uint64) {
	b, _ = k.Marshal(b)           // the second result of Marshal is not needed here
	return b, xxhash.Sum64(b[4:]) // skip timestamp in first 4 bytes
}

func SimpleItemValue(value float64, count float64, hostTagId int32) ItemValue {
Expand Down
30 changes: 30 additions & 0 deletions internal/data_model/mapped_metric_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
package data_model

import (
"encoding/binary"
"time"

"github.com/cespare/xxhash/v2"
"github.com/vkcom/statshouse/internal/data_model/gen2/tlstatshouse"
"github.com/vkcom/statshouse/internal/format"
)
Expand All @@ -32,6 +34,10 @@ type MappedMetricHeader struct {
CheckedTagIndex int // we check tags one by one, remembering position here, between invocations of mapTags
ValuesChecked bool // infs, nans, etc. This might be expensive, so done only once

OriginalTagValues [format.NewMaxTags][]byte
// original string values as sent by the user. A hash of those is stable between agents, independent of
// mappings, so it is used as a resolution hash to deterministically place the same rows into the same resolution buckets

IsTagSet [format.MaxTags]bool // report setting tags more than once.
IsSKeySet bool
IsHKeySet bool
Expand Down Expand Up @@ -89,3 +95,27 @@ func (h *MappedMetricHeader) SetInvalidString(ingestionStatus int32, tagIDKey in
h.IngestionTagKey = tagIDKey
h.InvalidString = invalidString
}

// OriginalMarshalAppend appends a serialization of the metric ID and the original tag
// values to buffer and returns the extended slice.
//
// Layout: [metric_id] [tagsCount] [tag0] [0] [tag1] [0] [tag2] [0] ...
//   - metric_id is written as 4 little-endian bytes, tagsCount as a single byte.
//   - The timestamp is not part of the output, hence not part of the hash.
//   - The metric is included so agents can use this marshalling in the future as a key to MultiItems.
//   - The trailing run of empty tags is omitted, so the number of tags can be increased
//     without changing the hash.
//   - tagsCount is explicit, so more fields can be added to the right of the tags if needed.
//   - Tags are separated by a zero byte rather than prefixed with a length, so tag length
//     can grow beyond 1 byte without using varint.
func (h *MappedMetricHeader) OriginalMarshalAppend(buffer []byte) []byte {
	// Walk back from the end to find how many leading tags must be written.
	used := len(h.OriginalTagValues)
	for used > 0 && len(h.OriginalTagValues[used-1]) == 0 {
		used--
	}

	buffer = binary.LittleEndian.AppendUint32(buffer, uint32(h.MetricMeta.MetricID))
	buffer = append(buffer, byte(used))
	for i := 0; i < used; i++ {
		// Empty tags inside the used prefix still contribute their terminator.
		buffer = append(append(buffer, h.OriginalTagValues[i]...), 0)
	}
	return buffer
}

// OriginalHash marshals the header with OriginalMarshalAppend into scratch (truncated to
// zero length first, so its capacity is reused) and returns the marshalled bytes together
// with their 64-bit xxhash.
func (h *MappedMetricHeader) OriginalHash(scratch []byte) ([]byte, uint64) {
	marshalled := h.OriginalMarshalAppend(scratch[:0])
	sum := xxhash.Sum64(marshalled)
	return marshalled, sum
}

0 comments on commit 2d0d07f

Please sign in to comment.