diff --git a/.chloggen/elasticsearchexporter_otel-mode-passthrough-field-prefix-conflict.yaml b/.chloggen/elasticsearchexporter_otel-mode-passthrough-field-prefix-conflict.yaml new file mode 100644 index 000000000000..afde47be348b --- /dev/null +++ b/.chloggen/elasticsearchexporter_otel-mode-passthrough-field-prefix-conflict.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Preserve attribute names and metric names on prefix conflict in OTel mapping mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35651] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: e.g. if there are attributes "a" and "a.b", they should be sent to Elasticsearch as is, instead of "a.value" and "a.b", in OTel mapping mode + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index e2871666b138..f1b455e41e1e 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -714,6 +714,35 @@ func TestExporterLogs(t *testing.T) { assert.Equal(t, `{"some.scope.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `scope.attributes`).Raw) assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw) }) + + t.Run("otel mode attribute key prefix conflict", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + mustSendLogs(t, exporter, newLogsWithAttributes(map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + })) + + rec.WaitItems(1) + doc := rec.Items()[0].Document + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw) + }) } func TestExporterMetrics(t *testing.T) { @@ -1300,6 +1329,75 @@ func TestExporterMetrics(t *testing.T) { assertItemsEqual(t, expected, rec.Items(), false) }) + t.Run("otel mode metric name conflict", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + metrics := pmetric.NewMetrics() + resourceMetric := metrics.ResourceMetrics().AppendEmpty() + scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() + + fooBarMetric := scopeMetric.Metrics().AppendEmpty() + fooBarMetric.SetName("foo.bar") + fooBarMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0) + + fooMetric := scopeMetric.Metrics().AppendEmpty() + fooMetric.SetName("foo") + fooMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0) + + fooBarBazMetric := scopeMetric.Metrics().AppendEmpty() + fooBarBazMetric.SetName("foo.bar.baz") + fooBarBazMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0) + + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(1) + expected := []itemRequest{ + { + Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.foo.bar":"gauge_long","metrics.foo":"gauge_long","metrics.foo.bar.baz":"gauge_long"}}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + }, + } + + assertItemsEqual(t, expected, rec.Items(), false) + }) + + t.Run("otel mode attribute key prefix conflict", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + mustSendMetrics(t, exporter, newMetricsWithAttributes(map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + })) + + rec.WaitItems(1) + doc := rec.Items()[0].Document + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw) + }) + t.Run("publish summary", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { @@ -1600,6 +1698,35 @@ func TestExporterTraces(t *testing.T) { assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw) } }) + + t.Run("otel mode attribute key prefix conflict", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + mustSendTraces(t, exporter, newTracesWithAttributes(map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + })) + + rec.WaitItems(1) + doc := rec.Items()[0].Document + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw) + }) } // TestExporterAuth verifies that the Elasticsearch exporter supports diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go index f20f9b1d213b..33a63abc794d 100644 --- a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go @@ -209,12 +209,12 @@ func (doc *Document) sort() { // The filtering only keeps the last value for a key. // // Dedup ensure that keys are sorted. -func (doc *Document) Dedup() { +func (doc *Document) Dedup(appendValueOnConflict bool) { // 1. Always ensure the fields are sorted, Dedup support requires // Fields to be sorted. doc.sort() - // 2. rename fields if a primitive value is overwritten by an object. + // 2. rename fields if a primitive value is overwritten by an object if appendValueOnConflict. // For example the pair (path.x=1, path.x.a="test") becomes: // (path.x.value=1, path.x.a="test"). // @@ -227,16 +227,18 @@ func (doc *Document) Dedup() { // field in favor of the `value` field in the document. // // This step removes potential conflicts when dedotting and serializing fields. - var renamed bool - for i := 0; i < len(doc.fields)-1; i++ { - key, nextKey := doc.fields[i].key, doc.fields[i+1].key - if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' { - renamed = true - doc.fields[i].key = key + ".value" + if appendValueOnConflict { + var renamed bool + for i := 0; i < len(doc.fields)-1; i++ { + key, nextKey := doc.fields[i].key, doc.fields[i+1].key + if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' { + renamed = true + doc.fields[i].key = key + ".value" + } + } + if renamed { + doc.sort() } - } - if renamed { - doc.sort() } // 3. mark duplicates as 'ignore' @@ -251,7 +253,7 @@ func (doc *Document) Dedup() { // 4. fix objects that might be stored in arrays for i := range doc.fields { - doc.fields[i].value.Dedup() + doc.fields[i].value.Dedup(appendValueOnConflict) } } @@ -487,13 +489,13 @@ func (v *Value) sort() { // Dedup recursively dedups keys in stored documents. // // NOTE: The value MUST be sorted. -func (v *Value) Dedup() { +func (v *Value) Dedup(appendValueOnConflict bool) { switch v.kind { case KindObject: - v.doc.Dedup() + v.doc.Dedup(appendValueOnConflict) case KindArr: for i := range v.arr { - v.arr[i].Dedup() + v.arr[i].Dedup(appendValueOnConflict) } } } diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go index 1961f716db05..3d0a07b820d0 100644 --- a/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go @@ -86,8 +86,9 @@ func TestObjectModel_CreateMap(t *testing.T) { func TestObjectModel_Dedup(t *testing.T) { tests := map[string]struct { - build func() Document - want Document + build func() Document + appendValueOnConflict bool + want Document }{ "no duplicates": { build: func() (doc Document) { @@ -95,7 +96,8 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("c", 3) return doc }, - want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}}, }, "duplicate keys": { build: func() (doc Document) { @@ -104,7 +106,8 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("a", 2) return doc }, - want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, }, "duplicate after flattening from map: namespace object at end": { build: func() Document { @@ -114,7 +117,8 @@ func TestObjectModel_Dedup(t *testing.T) { am.PutEmptyMap("namespace").PutInt("a", 23) return DocumentFromAttributes(am) }, - want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}}, }, "duplicate after flattening from map: namespace object at beginning": { build: func() Document { @@ -124,7 +128,8 @@ func TestObjectModel_Dedup(t *testing.T) { am.PutStr("toplevel", "test") return DocumentFromAttributes(am) }, - want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}}, }, "dedup in arrays": { build: func() (doc Document) { @@ -136,6 +141,7 @@ func TestObjectModel_Dedup(t *testing.T) { doc.Add("arr", ArrValue(Value{kind: KindObject, doc: embedded})) return doc }, + appendValueOnConflict: true, want: Document{fields: []field{{"arr", ArrValue(Value{kind: KindObject, doc: Document{fields: []field{ {"a", ignoreValue}, {"a", IntValue(2)}, @@ -148,7 +154,8 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("namespace.a", 2) return doc }, - want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}}, }, "dedup removes primitive if value exists": { build: func() (doc Document) { @@ -157,14 +164,25 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("namespace.value", 3) return doc }, - want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}}, + }, + "dedup without append value on conflict": { + build: func() (doc Document) { + doc.AddInt("namespace", 1) + doc.AddInt("namespace.a", 2) + doc.AddInt("namespace.value", 3) + return doc + }, + appendValueOnConflict: false, + want: Document{fields: []field{{"namespace", IntValue(1)}, {"namespace.a", IntValue(2)}, {"namespace.value", IntValue(3)}}}, }, } for name, test := range tests { t.Run(name, func(t *testing.T) { doc := test.build() - doc.Dedup() + doc.Dedup(test.appendValueOnConflict) assert.Equal(t, test.want, doc) }) } @@ -282,7 +300,7 @@ func TestDocument_Serialize_Flat(t *testing.T) { m := pcommon.NewMap() assert.NoError(t, m.FromRaw(test.attrs)) doc := DocumentFromAttributes(m) - doc.Dedup() + doc.Dedup(true) err := doc.Serialize(&buf, false, false) require.NoError(t, err) @@ -343,7 +361,7 @@ func TestDocument_Serialize_Dedot(t *testing.T) { m := pcommon.NewMap() assert.NoError(t, m.FromRaw(test.attrs)) doc := DocumentFromAttributes(m) - doc.Dedup() + doc.Dedup(true) err := doc.Serialize(&buf, true, false) require.NoError(t, err) diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 434bb1090a93..8c71df950752 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -115,7 +115,8 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str default: document = m.encodeLogDefaultMode(resource, record, scope) } - document.Dedup() + // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false + document.Dedup(m.mode != MappingOTel) var buf bytes.Buffer err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) @@ -267,7 +268,8 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo } func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) { - document.Dedup() + // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false + document.Dedup(m.mode != MappingOTel) var buf bytes.Buffer err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) @@ -646,7 +648,8 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL st default: document = m.encodeSpanDefaultMode(resource, span, scope) } - document.Dedup() + // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false + document.Dedup(m.mode != MappingOTel) var buf bytes.Buffer err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) return buf.Bytes(), err