diff --git a/.chloggen/geoipprocessor_add_field.yaml b/.chloggen/geoipprocessor_add_field.yaml new file mode 100644 index 000000000000..95bc31fd8650 --- /dev/null +++ b/.chloggen/geoipprocessor_add_field.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: geoipprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add initial processing based on source.address resource attribute + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32663] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/geoipprocessor/README.md b/processor/geoipprocessor/README.md index 696918367598..ccf54378872f 100644 --- a/processor/geoipprocessor/README.md +++ b/processor/geoipprocessor/README.md @@ -11,6 +11,8 @@ [development]: https://github.com/open-telemetry/opentelemetry-collector#development -## Overview +**This processor is currently under development and is presently a NOP (No Operation) processor. Further features and functionalities will be added in upcoming versions.** -This processor is currently under development and is presently a **NOP (No Operation) processor**. Further features and functionalities will be added in upcoming versions. +## Description + +The geoIP processor `geoipprocessor` enhances resource attributes by appending information about the geographical location of an IP address. To add geographical information, the IP address must be included in the resource attributes using the [`source.address` semantic conventions key attribute](https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/general/attributes.md#source). diff --git a/processor/geoipprocessor/config_test.go b/processor/geoipprocessor/config_test.go index 6f41589941fe..7714a7587d6e 100644 --- a/processor/geoipprocessor/config_test.go +++ b/processor/geoipprocessor/config_test.go @@ -19,8 +19,9 @@ func TestLoadConfig(t *testing.T) { t.Parallel() tests := []struct { - id component.ID - expected component.Config + id component.ID + expected component.Config + errorMessage string }{ { id: component.NewID(metadata.Type), @@ -40,11 +41,11 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NoError(t, component.UnmarshalConfig(sub, cfg)) - if tt.expected == nil { - err = component.ValidateConfig(cfg) - assert.Error(t, err) + if tt.errorMessage != "" { + assert.EqualError(t, component.ValidateConfig(cfg), tt.errorMessage) return } + assert.NoError(t, component.ValidateConfig(cfg)) assert.Equal(t, tt.expected, cfg) }) diff --git a/processor/geoipprocessor/factory.go b/processor/geoipprocessor/factory.go index ade49bf4accd..aa46cfe67dc4 100644 --- a/processor/geoipprocessor/factory.go +++ b/processor/geoipprocessor/factory.go @@ -10,11 +10,20 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/metadata" ) -var processorCapabilities = consumer.Capabilities{MutatesData: true} +var ( + processorCapabilities = consumer.Capabilities{MutatesData: true} + // defaultResourceAttributes holds a list of default resource attribute keys. + // These keys are used to identify an IP address attribute associated with the resource. + defaultResourceAttributes = []attribute.Key{ + semconv.SourceAddressKey, // This key represents the standard source address attribute as defined in the OpenTelemetry semantic conventions. + } +) // NewFactory creates a new processor factory with default configuration, // and registers the processors for metrics, traces, and logs. @@ -28,13 +37,13 @@ func createDefaultConfig() component.Config { } func createMetricsProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (processor.Metrics, error) { - return processorhelper.NewMetricsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor().processMetrics, processorhelper.WithCapabilities(processorCapabilities)) + return processorhelper.NewMetricsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(defaultResourceAttributes).processMetrics, processorhelper.WithCapabilities(processorCapabilities)) } func createTracesProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) { - return processorhelper.NewTracesProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor().processTraces, processorhelper.WithCapabilities(processorCapabilities)) + return processorhelper.NewTracesProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(defaultResourceAttributes).processTraces, processorhelper.WithCapabilities(processorCapabilities)) } func createLogsProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Logs) (processor.Logs, error) { - return processorhelper.NewLogsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor().processLogs, processorhelper.WithCapabilities(processorCapabilities)) + return processorhelper.NewLogsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(defaultResourceAttributes).processLogs, processorhelper.WithCapabilities(processorCapabilities)) } diff --git a/processor/geoipprocessor/geoip_processor.go b/processor/geoipprocessor/geoip_processor.go index 207e2e1644a9..098ca0e515a9 100644 --- a/processor/geoipprocessor/geoip_processor.go +++ b/processor/geoipprocessor/geoip_processor.go @@ -5,26 +5,116 @@ package geoipprocessor // import "github.com/open-telemetry/opentelemetry-collec import ( "context" + "errors" + "net" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider" ) -type geoIPProcessor struct{} +var errIPNotFound = errors.New("no IP address found in the resource attributes") + +// newGeoIPProcessor creates a new instance of geoIPProcessor with the specified fields. +type geoIPProcessor struct { + providers []provider.GeoIPProvider + resourceAttributes []attribute.Key +} + +func newGeoIPProcessor(resourceAttributes []attribute.Key) *geoIPProcessor { + return &geoIPProcessor{ + resourceAttributes: resourceAttributes, + } +} + +// ipFromResourceAttributes extracts an IP address from the given resource's attributes based on the specified fields. +// It returns the first IP address if found, or an error if no valid IP address is found. +func ipFromResourceAttributes(attributes []attribute.Key, resource pcommon.Resource) (net.IP, error) { + for _, attr := range attributes { + if ipField, found := resource.Attributes().Get(string(attr)); found { + ipAttribute := net.ParseIP(ipField.AsString()) + // The attribute might contain a domain name. Skip any net.ParseIP error until we have a fine-grained error propagation strategy. + // TODO: propagate an error once error_mode configuration option is available (e.g. transformprocessor) + if ipAttribute != nil { + return ipAttribute, nil + } + } + } + + return nil, errIPNotFound +} + +// geoLocation fetches geolocation information for the given IP address using the configured providers. +// It returns a set of attributes containing the geolocation data, or an error if the location could not be determined. +func (g *geoIPProcessor) geoLocation(ctx context.Context, ip net.IP) (attribute.Set, error) { + allAttributes := attribute.EmptySet() + for _, provider := range g.providers { + geoAttributes, err := provider.Location(ctx, ip) + if err != nil { + return attribute.Set{}, err + } + *allAttributes = attribute.NewSet(append(allAttributes.ToSlice(), geoAttributes.ToSlice()...)...) + } + + return *allAttributes, nil +} + +// processResource processes a single resource by adding geolocation attributes based on the found IP address. +func (g *geoIPProcessor) processResource(ctx context.Context, resource pcommon.Resource) error { + ipAddr, err := ipFromResourceAttributes(g.resourceAttributes, resource) + if err != nil { + // TODO: log IP error not found + if errors.Is(err, errIPNotFound) { + return nil + } + return err + } + + attributes, err := g.geoLocation(ctx, ipAddr) + if err != nil { + return err + } + + for _, geoAttr := range attributes.ToSlice() { + resource.Attributes().PutStr(string(geoAttr.Key), geoAttr.Value.AsString()) + } -func newGeoIPProcessor() *geoIPProcessor { - return &geoIPProcessor{} + return nil } -func (g *geoIPProcessor) processMetrics(_ context.Context, ms pmetric.Metrics) (pmetric.Metrics, error) { +func (g *geoIPProcessor) processMetrics(ctx context.Context, ms pmetric.Metrics) (pmetric.Metrics, error) { + rm := ms.ResourceMetrics() + for i := 0; i < rm.Len(); i++ { + err := g.processResource(ctx, rm.At(i).Resource()) + if err != nil { + return ms, err + } + } return ms, nil } -func (g *geoIPProcessor) processTraces(_ context.Context, ts ptrace.Traces) (ptrace.Traces, error) { +func (g *geoIPProcessor) processTraces(ctx context.Context, ts ptrace.Traces) (ptrace.Traces, error) { + rt := ts.ResourceSpans() + for i := 0; i < rt.Len(); i++ { + err := g.processResource(ctx, rt.At(i).Resource()) + if err != nil { + return ts, err + } + } return ts, nil } -func (g *geoIPProcessor) processLogs(_ context.Context, ls plog.Logs) (plog.Logs, error) { +func (g *geoIPProcessor) processLogs(ctx context.Context, ls plog.Logs) (plog.Logs, error) { + rl := ls.ResourceLogs() + for i := 0; i < rl.Len(); i++ { + err := g.processResource(ctx, rl.At(i).Resource()) + if err != nil { + return ls, err + } + } return ls, nil } diff --git a/processor/geoipprocessor/geoip_processor_test.go b/processor/geoipprocessor/geoip_processor_test.go new file mode 100644 index 000000000000..5418228d851c --- /dev/null +++ b/processor/geoipprocessor/geoip_processor_test.go @@ -0,0 +1,216 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package geoipprocessor + +import ( + "context" + "net" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider" +) + +type ProviderMock struct { + LocationF func(context.Context, net.IP) (attribute.Set, error) +} + +var _ provider.GeoIPProvider = (*ProviderMock)(nil) + +var baseProviderMock = ProviderMock{ + LocationF: func(context.Context, net.IP) (attribute.Set, error) { + return attribute.Set{}, nil + }, +} + +func (pm *ProviderMock) Location(ctx context.Context, ip net.IP) (attribute.Set, error) { + return pm.LocationF(ctx, ip) +} + +type generateResourceFunc func(res pcommon.Resource) + +func generateTraces(resourceFunc ...generateResourceFunc) ptrace.Traces { + t := ptrace.NewTraces() + rs := t.ResourceSpans().AppendEmpty() + for _, resFun := range resourceFunc { + res := rs.Resource() + resFun(res) + } + span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("foobar") + return t +} + +func generateMetrics(resourceFunc ...generateResourceFunc) pmetric.Metrics { + m := pmetric.NewMetrics() + ms := m.ResourceMetrics().AppendEmpty() + for _, resFun := range resourceFunc { + res := ms.Resource() + resFun(res) + } + metric := ms.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("foobar") + return m +} + +func generateLogs(resourceFunc ...generateResourceFunc) plog.Logs { + l := plog.NewLogs() + ls := l.ResourceLogs().AppendEmpty() + for _, resFun := range resourceFunc { + res := ls.Resource() + resFun(res) + } + ls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + return l +} + +func withAttributes(attributes []attribute.KeyValue) generateResourceFunc { + return func(res pcommon.Resource) { + for _, attribute := range attributes { + res.Attributes().PutStr(string(attribute.Key), attribute.Value.AsString()) + } + } +} + +// TestProcessPdata asserts that the processor adds the corresponding geo location data into the resource attributes if an ip is found +func TestProcessPdata(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + resourceAttributes []attribute.Key + initResourceAttributes []generateResourceFunc + geoLocationMock func(context.Context, net.IP) (attribute.Set, error) + expectedResourceAttributes []generateResourceFunc + }{ + { + name: "default source.ip attribute, not found", + resourceAttributes: defaultResourceAttributes, + initResourceAttributes: []generateResourceFunc{ + withAttributes([]attribute.KeyValue{ + attribute.String("ip", "1.2.3.4"), + }), + }, + geoLocationMock: func(context.Context, net.IP) (attribute.Set, error) { + return attribute.NewSet([]attribute.KeyValue{attribute.String("geo.city_name", "barcelona")}...), nil + }, + expectedResourceAttributes: []generateResourceFunc{ + withAttributes([]attribute.KeyValue{ + attribute.String("ip", "1.2.3.4"), + }), + }, + }, + { + name: "default source.ip attribute", + resourceAttributes: defaultResourceAttributes, + initResourceAttributes: []generateResourceFunc{ + withAttributes([]attribute.KeyValue{ + attribute.String("ip", "1.2.3.4"), + attribute.String(string(semconv.SourceAddressKey), "1.2.3.4"), + }), + }, + geoLocationMock: func(context.Context, net.IP) (attribute.Set, error) { + return attribute.NewSet([]attribute.KeyValue{attribute.String("geo.city_name", "barcelona")}...), nil + }, + expectedResourceAttributes: []generateResourceFunc{ + withAttributes([]attribute.KeyValue{ + attribute.String("ip", "1.2.3.4"), + attribute.String("geo.city_name", "barcelona"), + attribute.String(string(semconv.SourceAddressKey), "1.2.3.4"), + }), + }, + }, + { + name: "custom resource attribute", + resourceAttributes: []attribute.Key{"ip"}, + initResourceAttributes: []generateResourceFunc{ + withAttributes([]attribute.KeyValue{ + attribute.String("ip", "1.2.3.4"), + }), + }, + geoLocationMock: func(context.Context, net.IP) (attribute.Set, error) { + // only one attribute should be added as we are using a set + return attribute.NewSet([]attribute.KeyValue{attribute.String("geo.city_name", "barcelona"), attribute.String("geo.city_name", "barcelona")}...), nil + }, + expectedResourceAttributes: []generateResourceFunc{ + withAttributes([]attribute.KeyValue{ + attribute.String("ip", "1.2.3.4"), + attribute.String("geo.city_name", "barcelona"), + }), + }, + }, + { + name: "custom resource attributes, match second one", + resourceAttributes: []attribute.Key{"ip", "host.ip"}, + initResourceAttributes: []generateResourceFunc{ + withAttributes([]attribute.KeyValue{ + attribute.String("host.ip", "1.2.3.4"), + }), + }, + geoLocationMock: func(_ context.Context, ip net.IP) (attribute.Set, error) { + if ip.Equal(net.IP{1, 2, 3, 4}) { + return attribute.NewSet(attribute.String("geo.city_name", "barcelona")), nil + } + return attribute.Set{}, nil + }, + expectedResourceAttributes: []generateResourceFunc{ + withAttributes([]attribute.KeyValue{ + attribute.String("host.ip", "1.2.3.4"), + attribute.String("geo.city_name", "barcelona"), + }), + }, + }, + { + name: "do not add resource attributes with an invalid ip", + resourceAttributes: defaultResourceAttributes, + initResourceAttributes: []generateResourceFunc{ + withAttributes([]attribute.KeyValue{ + attribute.String(string(semconv.SourceAddressKey), "%"), + }), + }, + geoLocationMock: func(context.Context, net.IP) (attribute.Set, error) { + return attribute.NewSet([]attribute.KeyValue{attribute.String("geo.city_name", "barcelona")}...), nil + }, + expectedResourceAttributes: []generateResourceFunc{ + withAttributes([]attribute.KeyValue{ + attribute.String(string(semconv.SourceAddressKey), "%"), + }), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // prepare processor + baseProviderMock.LocationF = tt.geoLocationMock + processor := newGeoIPProcessor(tt.resourceAttributes) + processor.providers = []provider.GeoIPProvider{&baseProviderMock} + + // assert metrics + actualMetrics, err := processor.processMetrics(context.Background(), generateMetrics(tt.initResourceAttributes...)) + require.NoError(t, err) + require.NoError(t, pmetrictest.CompareMetrics(generateMetrics(tt.expectedResourceAttributes...), actualMetrics)) + + // assert traces + actualTraces, err := processor.processTraces(context.Background(), generateTraces(tt.initResourceAttributes...)) + require.NoError(t, err) + require.NoError(t, ptracetest.CompareTraces(generateTraces(tt.expectedResourceAttributes...), actualTraces)) + + // assert logs + actualLogs, err := processor.processLogs(context.Background(), generateLogs(tt.initResourceAttributes...)) + require.NoError(t, err) + require.NoError(t, plogtest.CompareLogs(generateLogs(tt.expectedResourceAttributes...), actualLogs)) + }) + } +} diff --git a/processor/geoipprocessor/go.mod b/processor/geoipprocessor/go.mod index abd47d534811..3d04d0383936 100644 --- a/processor/geoipprocessor/go.mod +++ b/processor/geoipprocessor/go.mod @@ -3,12 +3,14 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoip go 1.21.0 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.102.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.102.0 go.opentelemetry.io/collector/confmap v0.102.0 go.opentelemetry.io/collector/consumer v0.102.0 go.opentelemetry.io/collector/pdata v1.9.0 go.opentelemetry.io/collector/processor v0.102.0 + go.opentelemetry.io/otel v1.27.0 go.uber.org/goleak v1.3.0 ) @@ -29,6 +31,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.102.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect @@ -37,7 +40,6 @@ require ( go.opentelemetry.io/collector v0.102.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.102.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.102.0 // indirect - go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect @@ -53,3 +55,9 @@ require ( google.golang.org/protobuf v1.34.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/processor/geoipprocessor/internal/provider/geoipprovider.go b/processor/geoipprocessor/internal/provider/geoipprovider.go new file mode 100644 index 000000000000..e75c4d447922 --- /dev/null +++ b/processor/geoipprocessor/internal/provider/geoipprovider.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package provider // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider" + +import ( + "context" + "net" + + "go.opentelemetry.io/otel/attribute" +) + +// GeoIPProvider defines methods for obtaining the geographical location based on the provided IP address. +type GeoIPProvider interface { + // Location returns a set of attributes representing the geographical location for the given IP address. It requires a context for managing request lifetime. + Location(context.Context, net.IP) (attribute.Set, error) +}