diff --git a/.chloggen/lbexporter-return-hostnames-k8s-resolver.yaml b/.chloggen/lbexporter-return-hostnames-k8s-resolver.yaml new file mode 100644 index 000000000000..c8bd302960f2 --- /dev/null +++ b/.chloggen/lbexporter-return-hostnames-k8s-resolver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: loadbalancingexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds a an optional configuration to the k8s resolver which returns hostnames instead of IPs for headless services pointing at statefulsets + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [18412] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/loadbalancingexporter/README.md b/exporter/loadbalancingexporter/README.md index 5df7812fb204..a6d5e8606d8e 100644 --- a/exporter/loadbalancingexporter/README.md +++ b/exporter/loadbalancingexporter/README.md @@ -96,6 +96,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th * `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace. * `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods. * `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `1s` will be used. + * `return_hostnames` will return hostnames instead of IPs. This is useful in certain situations like using istio in sidecar mode. To use this feature, the `service` must be a headless `Service`, pointing at a `StatefulSet`, and the `service` must be what is specified under `.spec.serviceName` in the `StatefulSet`. * The `aws_cloud_map` node accepts the following properties: * `namespace` The CloudMap namespace where the service is register, e.g. `cloudmap`. If no `namespace` is specified, this will fail to start the Load Balancer exporter. * `service_name` The name of the service that you specified when you registered the instance, e.g. `otelcollectors`. If no `service_name` is specified, this will fail to start the Load Balancer exporter. @@ -231,6 +232,8 @@ service: ``` Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md)) +> [!IMPORTANT] +> The k8s resolver requires proper permissions. See [the full example](./example/k8s-resolver/README.md) for more information. ```yaml receivers: diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index b9682df16892..cee37b4d14fe 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -69,9 +69,10 @@ type DNSResolver struct { // K8sSvcResolver defines the configuration for the DNS resolver type K8sSvcResolver struct { - Service string `mapstructure:"service"` - Ports []int32 `mapstructure:"ports"` - Timeout time.Duration `mapstructure:"timeout"` + Service string `mapstructure:"service"` + Ports []int32 `mapstructure:"ports"` + Timeout time.Duration `mapstructure:"timeout"` + ReturnHostnames bool `mapstructure:"return_hostnames"` } type AWSCloudMapResolver struct { diff --git a/exporter/loadbalancingexporter/loadbalancer.go b/exporter/loadbalancingexporter/loadbalancer.go index 12ef0b5025fa..c14ad3d91183 100644 --- a/exporter/loadbalancingexporter/loadbalancer.go +++ b/exporter/loadbalancingexporter/loadbalancer.go @@ -102,6 +102,7 @@ func newLoadBalancer(logger *zap.Logger, cfg component.Config, factory component oCfg.Resolver.K8sSvc.Service, oCfg.Resolver.K8sSvc.Ports, oCfg.Resolver.K8sSvc.Timeout, + oCfg.Resolver.K8sSvc.ReturnHostnames, telemetry, ) if err != nil { diff --git a/exporter/loadbalancingexporter/resolver_k8s.go b/exporter/loadbalancingexporter/resolver_k8s.go index 87940260a880..040eb8120814 100644 --- a/exporter/loadbalancingexporter/resolver_k8s.go +++ b/exporter/loadbalancingexporter/resolver_k8s.go @@ -61,6 +61,7 @@ type k8sResolver struct { endpoints []string onChangeCallbacks []func([]string) + returnNames bool stopCh chan struct{} updateLock sync.RWMutex @@ -75,6 +76,7 @@ func newK8sResolver(clt kubernetes.Interface, service string, ports []int32, timeout time.Duration, + returnNames bool, tb *metadata.TelemetryBuilder, ) (*k8sResolver, error) { if len(service) == 0 { @@ -115,9 +117,10 @@ func newK8sResolver(clt kubernetes.Interface, epsStore := &sync.Map{} h := &handler{ - endpoints: epsStore, - logger: logger, - telemetry: tb, + endpoints: epsStore, + logger: logger, + telemetry: tb, + returnNames: returnNames, } r := &k8sResolver{ logger: logger, @@ -131,6 +134,7 @@ func newK8sResolver(clt kubernetes.Interface, stopCh: make(chan struct{}), lwTimeout: timeout, telemetry: tb, + returnNames: returnNames, } h.callback = r.resolve @@ -187,13 +191,19 @@ func (r *k8sResolver) resolve(ctx context.Context) ([]string, error) { defer r.shutdownWg.Done() var backends []string - r.endpointsStore.Range(func(address, _ any) bool { - addr := address.(string) + var ep string + r.endpointsStore.Range(func(host, _ any) bool { + switch r.returnNames { + case true: + ep = fmt.Sprintf("%s.%s.%s", host, r.svcName, r.svcNs) + default: + ep = host.(string) + } if len(r.port) == 0 { - backends = append(backends, addr) + backends = append(backends, ep) } else { for _, port := range r.port { - backends = append(backends, net.JoinHostPort(addr, strconv.FormatInt(int64(port), 10))) + backends = append(backends, net.JoinHostPort(ep, strconv.FormatInt(int64(port), 10))) } } return true diff --git a/exporter/loadbalancingexporter/resolver_k8s_handler.go b/exporter/loadbalancingexporter/resolver_k8s_handler.go index 186111eba4d5..895c7fbfca2c 100644 --- a/exporter/loadbalancingexporter/resolver_k8s_handler.go +++ b/exporter/loadbalancingexporter/resolver_k8s_handler.go @@ -17,19 +17,31 @@ import ( var _ cache.ResourceEventHandler = (*handler)(nil) +const ( + epMissingHostnamesMsg = "Endpoints object missing hostnames" +) + type handler struct { - endpoints *sync.Map - callback func(ctx context.Context) ([]string, error) - logger *zap.Logger - telemetry *metadata.TelemetryBuilder + endpoints *sync.Map + callback func(ctx context.Context) ([]string, error) + logger *zap.Logger + telemetry *metadata.TelemetryBuilder + returnNames bool } func (h handler) OnAdd(obj any, _ bool) { var endpoints map[string]bool + var ok bool switch object := obj.(type) { case *corev1.Endpoints: - endpoints = convertToEndpoints(object) + ok, endpoints = convertToEndpoints(h.returnNames, object) + if !ok { + h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj)) + h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet)) + return + } + default: // unsupported h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj)) h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet)) @@ -56,8 +68,14 @@ func (h handler) OnUpdate(oldObj, newObj any) { return } - oldEndpoints := convertToEndpoints(oldEps) - newEndpoints := convertToEndpoints(newEps) + _, oldEndpoints := convertToEndpoints(h.returnNames, oldEps) + hostnameOk, newEndpoints := convertToEndpoints(h.returnNames, newEps) + if !hostnameOk { + h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", newEps)) + h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet)) + return + } + changed := false // Iterate through old endpoints and remove those that are not in the new list. @@ -80,6 +98,7 @@ func (h handler) OnUpdate(oldObj, newObj any) { } else { h.logger.Debug("No changes detected in the endpoints for the service", zap.Any("old", oldEps), zap.Any("new", newEps)) } + default: // unsupported h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj)) h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet)) @@ -89,13 +108,20 @@ func (h handler) OnUpdate(oldObj, newObj any) { func (h handler) OnDelete(obj any) { var endpoints map[string]bool + var ok bool + switch object := obj.(type) { case *cache.DeletedFinalStateUnknown: h.OnDelete(object.Obj) return case *corev1.Endpoints: if object != nil { - endpoints = convertToEndpoints(object) + ok, endpoints = convertToEndpoints(h.returnNames, object) + if !ok { + h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj)) + h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet)) + return + } } default: // unsupported h.logger.Warn("Got an unexpected Kubernetes data type during the removal of the pods for a service", zap.Any("obj", obj)) @@ -110,14 +136,21 @@ func (h handler) OnDelete(obj any) { } } -func convertToEndpoints(eps ...*corev1.Endpoints) map[string]bool { - ipAddress := map[string]bool{} +func convertToEndpoints(retNames bool, eps ...*corev1.Endpoints) (bool, map[string]bool) { + res := map[string]bool{} for _, ep := range eps { for _, subsets := range ep.Subsets { for _, addr := range subsets.Addresses { - ipAddress[addr.IP] = true + if retNames { + if addr.Hostname == "" { + return false, nil + } + res[addr.Hostname] = true + } else { + res[addr.IP] = true + } } } } - return ipAddress + return true, res } diff --git a/exporter/loadbalancingexporter/resolver_k8s_handler_test.go b/exporter/loadbalancingexporter/resolver_k8s_handler_test.go new file mode 100644 index 000000000000..9a7a6011c30d --- /dev/null +++ b/exporter/loadbalancingexporter/resolver_k8s_handler_test.go @@ -0,0 +1,106 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package loadbalancingexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestConvertToEndpoints(tst *testing.T) { + // Create dummy Endpoints objects + endpoints1 := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-endpoints-1", + Namespace: "test-namespace", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + Hostname: "pod-1", + IP: "192.168.10.101", + }, + }, + }, + }, + } + endpoints2 := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-endpoints-2", + Namespace: "test-namespace", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + Hostname: "pod-2", + IP: "192.168.10.102", + }, + }, + }, + }, + } + endpoints3 := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-endpoints-3", + Namespace: "test-namespace", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "192.168.10.103", + }, + }, + }, + }, + } + + tests := []struct { + name string + returnNames bool + includedEndpoints []*corev1.Endpoints + expectedEndpoints map[string]bool + wantNil bool + }{ + { + name: "return hostnames", + returnNames: true, + includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2}, + expectedEndpoints: map[string]bool{"pod-1": true, "pod-2": true}, + wantNil: false, + }, + { + name: "return IPs", + returnNames: false, + includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2, endpoints3}, + expectedEndpoints: map[string]bool{"192.168.10.101": true, "192.168.10.102": true, "192.168.10.103": true}, + wantNil: false, + }, + { + name: "missing hostname", + returnNames: true, + includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints3}, + expectedEndpoints: nil, + wantNil: true, + }, + } + + for _, tt := range tests { + tst.Run(tt.name, func(tst *testing.T) { + ok, res := convertToEndpoints(tt.returnNames, tt.includedEndpoints...) + if tt.wantNil { + assert.Nil(tst, res) + } else { + assert.Equal(tst, tt.expectedEndpoints, res) + } + assert.Equal(tst, !tt.wantNil, ok) + }) + } + +} diff --git a/exporter/loadbalancingexporter/resolver_k8s_test.go b/exporter/loadbalancingexporter/resolver_k8s_test.go index 5a4e77dd593b..671c7447bf1e 100644 --- a/exporter/loadbalancingexporter/resolver_k8s_test.go +++ b/exporter/loadbalancingexporter/resolver_k8s_test.go @@ -21,10 +21,11 @@ import ( func TestK8sResolve(t *testing.T) { type args struct { - logger *zap.Logger - service string - ports []int32 - namespace string + logger *zap.Logger + service string + ports []int32 + namespace string + returnHostnames bool } type suiteContext struct { endpoint *corev1.Endpoints @@ -32,7 +33,7 @@ func TestK8sResolve(t *testing.T) { resolver *k8sResolver } setupSuite := func(t *testing.T, args args) (*suiteContext, func(*testing.T)) { - service, defaultNs, ports := args.service, args.namespace, args.ports + service, defaultNs, ports, returnHostnames := args.service, args.namespace, args.ports, args.returnHostnames endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: service, @@ -41,7 +42,10 @@ func TestK8sResolve(t *testing.T) { Subsets: []corev1.EndpointSubset{ { Addresses: []corev1.EndpointAddress{ - {IP: "192.168.10.100"}, + { + Hostname: "pod-0", + IP: "192.168.10.100", + }, }, }, }, @@ -50,14 +54,18 @@ func TestK8sResolve(t *testing.T) { for _, subset := range endpoint.Subsets { for _, address := range subset.Addresses { for _, port := range args.ports { - expectInit = append(expectInit, fmt.Sprintf("%s:%d", address.IP, port)) + if returnHostnames { + expectInit = append(expectInit, fmt.Sprintf("%s.%s.%s:%d", address.Hostname, service, defaultNs, port)) + } else { + expectInit = append(expectInit, fmt.Sprintf("%s:%d", address.IP, port)) + } } } } cl := fake.NewSimpleClientset(endpoint) _, tb := getTelemetryAssets(t) - res, err := newK8sResolver(cl, zap.NewNop(), service, ports, defaultListWatchTimeout, tb) + res, err := newK8sResolver(cl, zap.NewNop(), service, ports, defaultListWatchTimeout, returnHostnames, tb) require.NoError(t, err) require.NoError(t, res.start(context.Background())) @@ -81,7 +89,7 @@ func TestK8sResolve(t *testing.T) { verifyFn func(*suiteContext, args) error }{ { - name: "simulate append the backend ip address", + name: "add new IP to existing backends", args: args{ logger: zap.NewNop(), service: "lb", @@ -153,7 +161,46 @@ func TestK8sResolve(t *testing.T) { }, }, { - name: "simulate change the backend ip address", + name: "add new hostname to existing backends", + args: args{ + logger: zap.NewNop(), + service: "lb", + namespace: "default", + ports: []int32{8080, 9090}, + returnHostnames: true, + }, + simulateFn: func(suiteCtx *suiteContext, args args) error { + endpoint, exist := suiteCtx.endpoint.DeepCopy(), suiteCtx.endpoint.DeepCopy() + endpoint.Subsets = append(endpoint.Subsets, corev1.EndpointSubset{ + Addresses: []corev1.EndpointAddress{{IP: "10.10.0.11", Hostname: "pod-1"}}, + }) + patch := client.MergeFrom(exist) + data, err := patch.Data(endpoint) + if err != nil { + return err + } + _, err = suiteCtx.clientset.CoreV1().Endpoints(args.namespace). + Patch(context.TODO(), args.service, types.MergePatchType, data, metav1.PatchOptions{}) + return err + + }, + verifyFn: func(ctx *suiteContext, _ args) error { + if _, err := ctx.resolver.resolve(context.Background()); err != nil { + return err + } + + assert.Equal(t, []string{ + "pod-0.lb.default:8080", + "pod-0.lb.default:9090", + "pod-1.lb.default:8080", + "pod-1.lb.default:9090", + }, ctx.resolver.Endpoints(), "resolver failed, endpoints not equal") + + return nil + }, + }, + { + name: "change existing backend ip address", args: args{ logger: zap.NewNop(), service: "lb", @@ -281,7 +328,7 @@ func Test_newK8sResolver(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, tb := getTelemetryAssets(t) - got, err := newK8sResolver(fake.NewSimpleClientset(), tt.args.logger, tt.args.service, tt.args.ports, defaultListWatchTimeout, tb) + got, err := newK8sResolver(fake.NewSimpleClientset(), tt.args.logger, tt.args.service, tt.args.ports, defaultListWatchTimeout, false, tb) if tt.wantErr != nil { require.ErrorIs(t, err, tt.wantErr) } else {