Skip to content

Commit

Permalink
feat: Flatten OTLP kv lists
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeGoldsmith committed Feb 21, 2024
1 parent 476a3fd commit 92c8d7c
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 105 deletions.
81 changes: 47 additions & 34 deletions otlp/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const (
gRPCAcceptEncodingHeader = "grpc-accept-encoding"
defaultServiceName = "unknown_service"
unknownLogSource = "unknown_log_source"

// maxDepth is the maximum depth of a nested kvlist attribute that will be flattened.
// If the depth is exceeded, the attribute should be added as a JSON string.
maxDepth = 5
)

// fieldSizeMax is the maximum size of a field that will be accepted by honeycomb.
Expand Down Expand Up @@ -254,30 +258,24 @@ func getValueFromMetadata(md metadata.MD, key string) string {
return ""
}

func addAttributesToMap(attrs map[string]interface{}, attributes []*common.KeyValue) {
// AddAttributesToMap adds attributes to a map, extracting the underlying attribute data type.
// Supported types are string, bool, double, int, bytes, array, and kvlist.
// kvlist attributes are flattened to a depth of (maxDepth), if the depth is exceeded, the attribute is added as a JSON string.
// Bytes and array values are added as JSON strings.
func AddAttributesToMap(attrs map[string]interface{}, attributes []*common.KeyValue) {
for _, attr := range attributes {
// ignore entries if the key is empty or value is nil
if attr.Key == "" || attr.Value == nil {
continue
}
if val, truncatedBytes := getValue(attr.Value); val != nil {
attrs[attr.Key] = val
if truncatedBytes != 0 {
// if we trim a field, add telemetry about it; because we trim at 64K and
// a whole span can't be more than 100K, this can't happen more than once
// for a single span. If we ever change those limits, this will need to
// become additive.
attrs["meta.truncated_bytes"] = val
attrs["meta.truncated_field"] = attr.Key
}
}
addAttributeToMap(attrs, attr.Key, attr.Value, 0)
}
}

func getResourceAttributes(resource *resource.Resource) map[string]interface{} {
attrs := map[string]interface{}{}
if resource != nil {
addAttributesToMap(attrs, resource.Attributes)
AddAttributesToMap(attrs, resource.Attributes)
}
return attrs
}
Expand All @@ -294,7 +292,7 @@ func getScopeAttributes(scope *common.InstrumentationScope) map[string]interface
if scope.Version != "" {
attrs["library.version"] = scope.Version
}
addAttributesToMap(attrs, scope.Attributes)
AddAttributesToMap(attrs, scope.Attributes)
}
return attrs
}
Expand Down Expand Up @@ -402,33 +400,48 @@ func getMarshallableValue(value *common.AnyValue) interface{} {
return nil
}

// This function returns a value that can be handled by Honeycomb -- it must be one of:
// string, int, bool, float. All other values are converted to strings containing JSON.
func getValue(value *common.AnyValue) (result interface{}, truncatedBytes int) {
// addAttributeToMap adds an attribute to a map, extracting the underlying attribute data type.
// Supported types are string, bool, double, int, bytes, array, and kvlist.
// kvlist attributes are flattened to a depth of (maxDepth), if the depth is exceeded, the attribute is added as a JSON string.
// Bytes and array values are added as JSON strings.
func addAttributeToMap(result map[string]interface{}, key string, value *common.AnyValue, depth int) {
switch value.Value.(type) {
case *common.AnyValue_StringValue:
return value.GetStringValue(), 0
result[key] = value.GetStringValue()
case *common.AnyValue_BoolValue:
return value.GetBoolValue(), 0
result[key] = value.GetBoolValue()
case *common.AnyValue_DoubleValue:
return value.GetDoubleValue(), 0
result[key] = value.GetDoubleValue()
case *common.AnyValue_IntValue:
return value.GetIntValue(), 0
// These types are all be marshalled to a string after conversion to Honeycomb-safe values.
// We use our limitedWriter to ensure that the string can't be bigger than the allowable,
// and it also minimizes allocations.
// Note that an Encoder emits JSON with a trailing newline because it's intended for use
// in streaming. This is correct but sometimes surprising and the tests need to expect it.
case *common.AnyValue_ArrayValue, *common.AnyValue_KvlistValue, *common.AnyValue_BytesValue:
arr := getMarshallableValue(value)
w := newLimitedWriter(fieldSizeMax)
enc := json.NewEncoder(w)
err := enc.Encode(arr)
if err == nil {
return w.String(), w.truncatedBytes
result[key] = value.GetIntValue()
case *common.AnyValue_BytesValue, *common.AnyValue_ArrayValue:
addAttributeToMapAsJson(result, key, value)
case *common.AnyValue_KvlistValue:
for _, entry := range value.GetKvlistValue().Values {
k := key + "." + entry.Key
if depth < maxDepth {
addAttributeToMap(result, k, entry.Value, depth+1)
} else {
addAttributeToMapAsJson(result, k, entry.Value)
}
}
}
return nil, 0
}

// addAttributeToMapAsJson adds an attribute to a map as a JSON string.
// Uses limitedWriter to ensure that the string can't be bigger than the maximum field size and
// helps reduce allocation and copying.
// Note that an Encoder emits JSON with a trailing newline because it's intended for use
// in streaming. This is correct but sometimes surprising and the tests need to expect it.
func addAttributeToMapAsJson(attrs map[string]interface{}, key string, value *common.AnyValue) int {
val := getMarshallableValue(value)
w := newLimitedWriter(fieldSizeMax)
if err := json.NewEncoder(w).Encode(val); err != nil {
// TODO: log error or report error when we have a way to do so
return 0
}
attrs[key] = w.String()
return w.truncatedBytes
}

func parseOtlpRequestBody(body io.ReadCloser, contentType string, contentEncoding string, request protoreflect.ProtoMessage) error {
Expand Down
83 changes: 38 additions & 45 deletions otlp/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"io"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -52,48 +50,43 @@ func TestParseHttpHeadersIntoRequestInfo(t *testing.T) {

func TestAddAttributesToMap(t *testing.T) {
testCases := []struct {
key string
expected interface{}
attribute *common.KeyValue
}{
{
key: "str-attr",
expected: "str-value",
expected: map[string]interface{}{"str-attr": "str-value"},
attribute: &common.KeyValue{
Key: "str-attr", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "str-value"}},
},
},
{
key: "int-attr",
expected: int64(123),
expected: map[string]interface{}{"int-attr": int64(123)},
attribute: &common.KeyValue{
Key: "int-attr", Value: &common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: 123}},
},
},
{
key: "double-attr",
expected: float64(12.3),
expected: map[string]interface{}{"double-attr": float64(12.3)},
attribute: &common.KeyValue{
Key: "double-attr", Value: &common.AnyValue{Value: &common.AnyValue_DoubleValue{DoubleValue: 12.3}},
},
},
{
key: "bool-attr",
expected: true,
expected: map[string]interface{}{"bool-attr": true},
attribute: &common.KeyValue{
Key: "bool-attr", Value: &common.AnyValue{Value: &common.AnyValue_BoolValue{BoolValue: true}},
},
},
{
key: "empty-key",
expected: nil,
expected: map[string]interface{}{},
attribute: &common.KeyValue{
Key: "", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "str-value"}},
},
},
{
key: "array-attr",
expected: "[\"one\",true,3]\n",
expected: map[string]interface{}{
"array-attr": "[\"one\",true,3]\n",
},
attribute: &common.KeyValue{
Key: "array-attr", Value: &common.AnyValue{Value: &common.AnyValue_ArrayValue{ArrayValue: &common.ArrayValue{
Values: []*common.AnyValue{
Expand All @@ -107,16 +100,15 @@ func TestAddAttributesToMap(t *testing.T) {
// that functionality is more completely tested by Test_getValue(). The case of a nested map will fail
// badly in the way this test is structured, so we don't do maps at all here.
{
key: "nil-value-attr",
expected: nil,
expected: map[string]interface{}{},
attribute: &common.KeyValue{Key: "kv-attr", Value: nil},
},
}

for _, tc := range testCases {
attrs := map[string]interface{}{}
addAttributesToMap(attrs, []*common.KeyValue{tc.attribute})
assert.Equal(t, tc.expected, attrs[tc.key])
AddAttributesToMap(attrs, []*common.KeyValue{tc.attribute})
assert.Equal(t, tc.expected, attrs)
}
}

Expand Down Expand Up @@ -334,25 +326,34 @@ func Test_getValue(t *testing.T) {
value *common.AnyValue
want interface{}
}{
{"int64", &common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: 123}}, int64(123)},
{"bool", &common.AnyValue{Value: &common.AnyValue_BoolValue{BoolValue: true}}, true},
{"float64", &common.AnyValue{Value: &common.AnyValue_DoubleValue{DoubleValue: 123}}, float64(123)},
{"bytes as b64", &common.AnyValue{Value: &common.AnyValue_BytesValue{BytesValue: []byte{10, 20, 30}}}, `"ChQe"` + "\n"},
{"string", &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "foo"}}, map[string]interface{}{"body": "foo"}},
{"int64", &common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: 123}}, map[string]interface{}{"body": int64(123)}},
{"bool", &common.AnyValue{Value: &common.AnyValue_BoolValue{BoolValue: true}}, map[string]interface{}{"body": true}},
{"float64", &common.AnyValue{Value: &common.AnyValue_DoubleValue{DoubleValue: 123}}, map[string]interface{}{"body": float64(123)}},
{"bytes as b64", &common.AnyValue{Value: &common.AnyValue_BytesValue{BytesValue: []byte{10, 20, 30}}}, map[string]interface{}{"body": `"ChQe"` + "\n"}},
{"array as mixed-type string", &common.AnyValue{Value: &common.AnyValue_ArrayValue{
ArrayValue: &common.ArrayValue{Values: []*common.AnyValue{
{Value: &common.AnyValue_IntValue{IntValue: 123}},
{Value: &common.AnyValue_DoubleValue{DoubleValue: 45.6}},
{Value: &common.AnyValue_StringValue{StringValue: "hi mom"}},
}},
}}, `[123,45.6,"hi mom"]` + "\n"},
}}, map[string]interface{}{
"body": "[123,45.6,\"hi mom\"]\n",
}},
{"map as mixed-type string", &common.AnyValue{
Value: &common.AnyValue_KvlistValue{KvlistValue: &common.KeyValueList{
Values: []*common.KeyValue{
{Key: "foo", Value: &common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: 123}}},
{Key: "bar", Value: &common.AnyValue{Value: &common.AnyValue_DoubleValue{DoubleValue: 45.6}}},
{Key: "mom", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "hi mom"}}},
},
}}}, `{"foo":123,"bar":45.6,"mom":"hi mom"}` + "\n"},
}}},
map[string]interface{}{
"body.foo": int64(123),
"body.bar": float64(45.6),
"body.mom": "hi mom",
},
},
{"nested map as mixed-type string", &common.AnyValue{
Value: &common.AnyValue_KvlistValue{KvlistValue: &common.KeyValueList{
Values: []*common.KeyValue{
Expand All @@ -367,29 +368,21 @@ func Test_getValue(t *testing.T) {
},
}}}},
},
}}}, `{"bar":45.6,"foo":123,"nest":{"bar":45.6,"foo":123,"mom":"hi mom"}}`},
}}},
map[string]interface{}{
"body.foo": int64(123),
"body.bar": float64(45.6),
"body.nest.bar": float64(45.6),
"body.nest.foo": int64(123),
"body.nest.mom": "hi mom",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, truncated := getValue(tt.value)
if truncated != 0 {
t.Errorf("getValue() returned %v for truncatedBytes, should be 0", truncated)
}
if s, ok := got.(string); ok && strings.HasPrefix(s, "{") {
// it's a string wrapping an object, and might be out of order, so convert them both to objects
// and compare them as unmarshalled objects
var g, w map[string]any
json.Unmarshal([]byte(s), &g)
json.Unmarshal([]byte(tt.want.(string)), &w)
if !reflect.DeepEqual(g, w) {
t.Errorf("getValue() unmarshalled = %#v, want %#v", g, w)
t.Errorf("getValue() marshalled = %v (%T), want %v (%T)", got, got, tt.want, tt.want)
}
} else {
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getValue() = %v (%T), want %v (%T)", got, got, tt.want, tt.want)
}
}
attrs := map[string]interface{}{}
addAttributeToMap(attrs, "body", tt.value, 0)
assert.Equal(t, tt.want, attrs)
})
}
}
Expand Down
14 changes: 6 additions & 8 deletions otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ func TranslateLogsRequest(request *collectorLogs.ExportLogsServiceRequest, ri Re
attrs["severity_text"] = log.SeverityText
}
if log.Body != nil {
if val, truncatedBytes := getValue(log.Body); val != nil {
attrs["body"] = val
if truncatedBytes != 0 {
// if we trim the body, add telemetry about it
attrs["meta.truncated_bytes"] = truncatedBytes
attrs["meta.truncated_field"] = "body"
}
// convert the log body to attributes, includes flattening kv pairs into multiple attributes
addAttributeToMap(attrs, "body", log.Body, 0)
// if the body attribute is not set, add the whole body as a json string
if _, ok := attrs["body"]; !ok {
addAttributeToMapAsJson(attrs, "body", log.Body)
}
}

Expand All @@ -74,7 +72,7 @@ func TranslateLogsRequest(request *collectorLogs.ExportLogsServiceRequest, ri Re
attrs[k] = v
}
if log.Attributes != nil {
addAttributesToMap(attrs, log.Attributes)
AddAttributesToMap(attrs, log.Attributes)
}

// Now we need to wrap the eventAttrs in an event so we can specify the timestamp
Expand Down
Loading

0 comments on commit 92c8d7c

Please sign in to comment.