Skip to content

Commit

Permalink
[exporter/elasticsearch] Merge *.geo.location.{lat,lon} to *.geo.loca…
Browse files Browse the repository at this point in the history
…tion in OTel mode (#36594)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

In OTel mapping mode, merge *.geo.location.{lat,lon} to *.geo.location
such that they are stored as
[geo_point](https://www.elastic.co/guide/en/elasticsearch/reference/current/geo-point.html)
in Elasticsearch.

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes #36565

<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->

---------

Co-authored-by: Vishal Raj <[email protected]>
  • Loading branch information
2 people authored and shivanthzen committed Dec 5, 2024
1 parent ef479f7 commit 9bf98d5
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 1 deletion.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_merge-geo-location.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Map *.geo.location.{lat,lon} as geo_point field in OTel mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36565]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: In OTel mapping mode, merge *.geo.location.{lat,lon} to *.geo.location such that they are stored as geo_point in Elasticsearch.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
78 changes: 77 additions & 1 deletion exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"hash/fnv"
"math"
"slices"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -599,7 +600,7 @@ func (m *encodeModel) encodeResourceOTelMode(document *objmodel.Document, resour
}
return false
})

mergeGeolocation(resourceAttrMap)
document.Add("resource", objmodel.ValueFromAttribute(resourceMapVal))
}

Expand All @@ -625,6 +626,7 @@ func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pco
}
return false
})
mergeGeolocation(scopeAttrMap)
document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal))
}

Expand All @@ -644,6 +646,7 @@ func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attr
}
return false
})
mergeGeolocation(attrsCopy)
document.AddAttributes("attributes", attrsCopy)
}

Expand Down Expand Up @@ -998,3 +1001,76 @@ func sliceHash(h hash.Hash, s pcommon.Slice) {
valueHash(h, s.At(i))
}
}

// mergeGeolocation mutates attributes map to merge all `geo.location.{lon,lat}`,
// and namespaced `*.geo.location.{lon,lat}` to unnamespaced and namespaced `geo.location`.
// This is to match the geo_point type in Elasticsearch.
func mergeGeolocation(attributes pcommon.Map) {
const (
lonKey = "geo.location.lon"
latKey = "geo.location.lat"
mergedKey = "geo.location"
)
// Prefix is the attribute name without lonKey or latKey suffix
// e.g. prefix of "foo.bar.geo.location.lon" is "foo.bar.", prefix of "geo.location.lon" is "".
prefixToGeo := make(map[string]struct {
lon, lat float64
lonSet, latSet bool
})
setLon := func(prefix string, v float64) {
g := prefixToGeo[prefix]
g.lon = v
g.lonSet = true
prefixToGeo[prefix] = g
}
setLat := func(prefix string, v float64) {
g := prefixToGeo[prefix]
g.lat = v
g.latSet = true
prefixToGeo[prefix] = g
}
attributes.RemoveIf(func(key string, val pcommon.Value) bool {
if val.Type() != pcommon.ValueTypeDouble {
return false
}

if key == lonKey {
setLon("", val.Double())
return true
} else if key == latKey {
setLat("", val.Double())
return true
} else if namespace, found := strings.CutSuffix(key, "."+lonKey); found {
prefix := namespace + "."
setLon(prefix, val.Double())
return true
} else if namespace, found := strings.CutSuffix(key, "."+latKey); found {
prefix := namespace + "."
setLat(prefix, val.Double())
return true
}
return false
})

for prefix, geo := range prefixToGeo {
if geo.lonSet && geo.latSet {
key := prefix + mergedKey
// Geopoint expressed as an array with the format: [lon, lat]
s := attributes.PutEmptySlice(key)
s.EnsureCapacity(2)
s.AppendEmpty().SetDouble(geo.lon)
s.AppendEmpty().SetDouble(geo.lat)
continue
}

// Place the attributes back if lon and lat are not present together
if geo.lonSet {
key := prefix + lonKey
attributes.PutDouble(key, geo.lon)
}
if geo.latSet {
key := prefix + latKey
attributes.PutDouble(key, geo.lat)
}
}
}
33 changes: 33 additions & 0 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,3 +1278,36 @@ func TestEncodeLogBodyMapMode(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, ErrInvalidTypeForBodyMapMode)
}

func TestMergeGeolocation(t *testing.T) {
attributes := map[string]any{
"geo.location.lon": 1.1,
"geo.location.lat": 2.2,
"foo.bar.geo.location.lon": 3.3,
"foo.bar.geo.location.lat": 4.4,
"a.geo.location.lon": 5.5,
"b.geo.location.lat": 6.6,
"unrelatedgeo.location.lon": 7.7,
"unrelatedgeo.location.lat": 8.8,
"d": 9.9,
"e.geo.location.lon": "foo",
"e.geo.location.lat": "bar",
}
wantAttributes := map[string]any{
"geo.location": []any{1.1, 2.2},
"foo.bar.geo.location": []any{3.3, 4.4},
"a.geo.location.lon": 5.5,
"b.geo.location.lat": 6.6,
"unrelatedgeo.location.lon": 7.7,
"unrelatedgeo.location.lat": 8.8,
"d": 9.9,
"e.geo.location.lon": "foo",
"e.geo.location.lat": "bar",
}
input := pcommon.NewMap()
err := input.FromRaw(attributes)
require.NoError(t, err)
mergeGeolocation(input)
after := input.AsRaw()
assert.Equal(t, wantAttributes, after)
}

0 comments on commit 9bf98d5

Please sign in to comment.