Skip to content

Commit

Permalink
[exporter/elasticsearch] Preserve attribute names and metric names on…
Browse files Browse the repository at this point in the history
… prefix conflict in OTel mapping mode (#35651)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Metric names should be flattened and exported as is, even when one
metric name is a prefix of another. Same for attributes for all logs,
metrics and traces.


<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
carsonip authored Oct 17, 2024
1 parent e7ebc6e commit e9efb40
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Preserve attribute names and metric names on prefix conflict in OTel mapping mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35651]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: e.g. if there are attributes "a" and "a.b", they should be sent to Elasticsearch as is, instead of "a.value" and "a.b", in OTel mapping mode

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
127 changes: 127 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,35 @@ func TestExporterLogs(t *testing.T) {
assert.Equal(t, `{"some.scope.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})

t.Run("otel mode attribute key prefix conflict", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

mustSendLogs(t, exporter, newLogsWithAttributes(map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}))

rec.WaitItems(1)
doc := rec.Items()[0].Document
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})
}

func TestExporterMetrics(t *testing.T) {
Expand Down Expand Up @@ -1300,6 +1329,75 @@ func TestExporterMetrics(t *testing.T) {
assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("otel mode metric name conflict", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

metrics := pmetric.NewMetrics()
resourceMetric := metrics.ResourceMetrics().AppendEmpty()
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()

fooBarMetric := scopeMetric.Metrics().AppendEmpty()
fooBarMetric.SetName("foo.bar")
fooBarMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)

fooMetric := scopeMetric.Metrics().AppendEmpty()
fooMetric.SetName("foo")
fooMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)

fooBarBazMetric := scopeMetric.Metrics().AppendEmpty()
fooBarBazMetric.SetName("foo.bar.baz")
fooBarBazMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)

mustSendMetrics(t, exporter, metrics)

rec.WaitItems(1)
expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.foo.bar":"gauge_long","metrics.foo":"gauge_long","metrics.foo.bar.baz":"gauge_long"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
},
}

assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("otel mode attribute key prefix conflict", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

mustSendMetrics(t, exporter, newMetricsWithAttributes(map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}))

rec.WaitItems(1)
doc := rec.Items()[0].Document
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})

t.Run("publish summary", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
Expand Down Expand Up @@ -1600,6 +1698,35 @@ func TestExporterTraces(t *testing.T) {
assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
}
})

t.Run("otel mode attribute key prefix conflict", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

mustSendTraces(t, exporter, newTracesWithAttributes(map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}))

rec.WaitItems(1)
doc := rec.Items()[0].Document
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})
}

// TestExporterAuth verifies that the Elasticsearch exporter supports
Expand Down
32 changes: 17 additions & 15 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,12 @@ func (doc *Document) sort() {
// The filtering only keeps the last value for a key.
//
// Dedup ensure that keys are sorted.
func (doc *Document) Dedup() {
func (doc *Document) Dedup(appendValueOnConflict bool) {
// 1. Always ensure the fields are sorted, Dedup support requires
// Fields to be sorted.
doc.sort()

// 2. rename fields if a primitive value is overwritten by an object.
// 2. rename fields if a primitive value is overwritten by an object if appendValueOnConflict.
// For example the pair (path.x=1, path.x.a="test") becomes:
// (path.x.value=1, path.x.a="test").
//
Expand All @@ -227,16 +227,18 @@ func (doc *Document) Dedup() {
// field in favor of the `value` field in the document.
//
// This step removes potential conflicts when dedotting and serializing fields.
var renamed bool
for i := 0; i < len(doc.fields)-1; i++ {
key, nextKey := doc.fields[i].key, doc.fields[i+1].key
if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' {
renamed = true
doc.fields[i].key = key + ".value"
if appendValueOnConflict {
var renamed bool
for i := 0; i < len(doc.fields)-1; i++ {
key, nextKey := doc.fields[i].key, doc.fields[i+1].key
if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' {
renamed = true
doc.fields[i].key = key + ".value"
}
}
if renamed {
doc.sort()
}
}
if renamed {
doc.sort()
}

// 3. mark duplicates as 'ignore'
Expand All @@ -251,7 +253,7 @@ func (doc *Document) Dedup() {

// 4. fix objects that might be stored in arrays
for i := range doc.fields {
doc.fields[i].value.Dedup()
doc.fields[i].value.Dedup(appendValueOnConflict)
}
}

Expand Down Expand Up @@ -487,13 +489,13 @@ func (v *Value) sort() {
// Dedup recursively dedups keys in stored documents.
//
// NOTE: The value MUST be sorted.
func (v *Value) Dedup() {
func (v *Value) Dedup(appendValueOnConflict bool) {
switch v.kind {
case KindObject:
v.doc.Dedup()
v.doc.Dedup(appendValueOnConflict)
case KindArr:
for i := range v.arr {
v.arr[i].Dedup()
v.arr[i].Dedup(appendValueOnConflict)
}
}
}
Expand Down
40 changes: 29 additions & 11 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,18 @@ func TestObjectModel_CreateMap(t *testing.T) {

func TestObjectModel_Dedup(t *testing.T) {
tests := map[string]struct {
build func() Document
want Document
build func() Document
appendValueOnConflict bool
want Document
}{
"no duplicates": {
build: func() (doc Document) {
doc.AddInt("a", 1)
doc.AddInt("c", 3)
return doc
},
want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}},
},
"duplicate keys": {
build: func() (doc Document) {
Expand All @@ -104,7 +106,8 @@ func TestObjectModel_Dedup(t *testing.T) {
doc.AddInt("a", 2)
return doc
},
want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}},
},
"duplicate after flattening from map: namespace object at end": {
build: func() Document {
Expand All @@ -114,7 +117,8 @@ func TestObjectModel_Dedup(t *testing.T) {
am.PutEmptyMap("namespace").PutInt("a", 23)
return DocumentFromAttributes(am)
},
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}},
},
"duplicate after flattening from map: namespace object at beginning": {
build: func() Document {
Expand All @@ -124,7 +128,8 @@ func TestObjectModel_Dedup(t *testing.T) {
am.PutStr("toplevel", "test")
return DocumentFromAttributes(am)
},
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}},
},
"dedup in arrays": {
build: func() (doc Document) {
Expand All @@ -136,6 +141,7 @@ func TestObjectModel_Dedup(t *testing.T) {
doc.Add("arr", ArrValue(Value{kind: KindObject, doc: embedded}))
return doc
},
appendValueOnConflict: true,
want: Document{fields: []field{{"arr", ArrValue(Value{kind: KindObject, doc: Document{fields: []field{
{"a", ignoreValue},
{"a", IntValue(2)},
Expand All @@ -148,7 +154,8 @@ func TestObjectModel_Dedup(t *testing.T) {
doc.AddInt("namespace.a", 2)
return doc
},
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}},
},
"dedup removes primitive if value exists": {
build: func() (doc Document) {
Expand All @@ -157,14 +164,25 @@ func TestObjectModel_Dedup(t *testing.T) {
doc.AddInt("namespace.value", 3)
return doc
},
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}},
},
"dedup without append value on conflict": {
build: func() (doc Document) {
doc.AddInt("namespace", 1)
doc.AddInt("namespace.a", 2)
doc.AddInt("namespace.value", 3)
return doc
},
appendValueOnConflict: false,
want: Document{fields: []field{{"namespace", IntValue(1)}, {"namespace.a", IntValue(2)}, {"namespace.value", IntValue(3)}}},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
doc := test.build()
doc.Dedup()
doc.Dedup(test.appendValueOnConflict)
assert.Equal(t, test.want, doc)
})
}
Expand Down Expand Up @@ -282,7 +300,7 @@ func TestDocument_Serialize_Flat(t *testing.T) {
m := pcommon.NewMap()
assert.NoError(t, m.FromRaw(test.attrs))
doc := DocumentFromAttributes(m)
doc.Dedup()
doc.Dedup(true)
err := doc.Serialize(&buf, false, false)
require.NoError(t, err)

Expand Down Expand Up @@ -343,7 +361,7 @@ func TestDocument_Serialize_Dedot(t *testing.T) {
m := pcommon.NewMap()
assert.NoError(t, m.FromRaw(test.attrs))
doc := DocumentFromAttributes(m)
doc.Dedup()
doc.Dedup(true)
err := doc.Serialize(&buf, true, false)
require.NoError(t, err)

Expand Down
9 changes: 6 additions & 3 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str
default:
document = m.encodeLogDefaultMode(resource, record, scope)
}
document.Dedup()
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
document.Dedup(m.mode != MappingOTel)

var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
Expand Down Expand Up @@ -267,7 +268,8 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo
}

func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) {
document.Dedup()
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
document.Dedup(m.mode != MappingOTel)

var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
Expand Down Expand Up @@ -646,7 +648,8 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL st
default:
document = m.encodeSpanDefaultMode(resource, span, scope)
}
document.Dedup()
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
document.Dedup(m.mode != MappingOTel)
var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
return buf.Bytes(), err
Expand Down

0 comments on commit e9efb40

Please sign in to comment.