Skip to content

Commit

Permalink
[exporter/loadbalancing] Add return_hostnames option to k8s resolver
Browse files Browse the repository at this point in the history
Resolves 18412
  • Loading branch information
snuggie12 committed Dec 10, 2024
1 parent c808052 commit ca265dc
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 33 deletions.
27 changes: 27 additions & 0 deletions .chloggen/lbexporter-return-hostnames-k8s-resolver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds a an optional configuration to the k8s resolver which returns hostnames instead of IPs for headless services pointing at statefulsets

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [18412]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 3 additions & 0 deletions exporter/loadbalancingexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.
* `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `1s` will be used.
* `return_hostnames` will return hostnames instead of IPs. This is useful in certain situations like using istio in sidecar mode. To use this feature, the `service` must be a headless `Service`, pointing at a `StatefulSet`, and the `service` must be what is specified under `.spec.serviceName` in the `StatefulSet`.
* The `aws_cloud_map` node accepts the following properties:
* `namespace` The CloudMap namespace where the service is register, e.g. `cloudmap`. If no `namespace` is specified, this will fail to start the Load Balancer exporter.
* `service_name` The name of the service that you specified when you registered the instance, e.g. `otelcollectors`. If no `service_name` is specified, this will fail to start the Load Balancer exporter.
Expand Down Expand Up @@ -231,6 +232,8 @@ service:
```
Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))
> [!IMPORTANT]
> The k8s resolver requires proper permissions. See [the full example](./example/k8s-resolver/README.md) for more information.
```yaml
receivers:
Expand Down
7 changes: 4 additions & 3 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ type DNSResolver struct {

// K8sSvcResolver defines the configuration for the DNS resolver
type K8sSvcResolver struct {
Service string `mapstructure:"service"`
Ports []int32 `mapstructure:"ports"`
Timeout time.Duration `mapstructure:"timeout"`
Service string `mapstructure:"service"`
Ports []int32 `mapstructure:"ports"`
Timeout time.Duration `mapstructure:"timeout"`
ReturnHostnames bool `mapstructure:"return_hostnames"`
}

type AWSCloudMapResolver struct {
Expand Down
1 change: 1 addition & 0 deletions exporter/loadbalancingexporter/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func newLoadBalancer(logger *zap.Logger, cfg component.Config, factory component
oCfg.Resolver.K8sSvc.Service,
oCfg.Resolver.K8sSvc.Ports,
oCfg.Resolver.K8sSvc.Timeout,
oCfg.Resolver.K8sSvc.ReturnHostnames,
telemetry,
)
if err != nil {
Expand Down
24 changes: 17 additions & 7 deletions exporter/loadbalancingexporter/resolver_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type k8sResolver struct {

endpoints []string
onChangeCallbacks []func([]string)
returnNames bool

stopCh chan struct{}
updateLock sync.RWMutex
Expand All @@ -75,6 +76,7 @@ func newK8sResolver(clt kubernetes.Interface,
service string,
ports []int32,
timeout time.Duration,
returnNames bool,
tb *metadata.TelemetryBuilder,
) (*k8sResolver, error) {
if len(service) == 0 {
Expand Down Expand Up @@ -115,9 +117,10 @@ func newK8sResolver(clt kubernetes.Interface,

epsStore := &sync.Map{}
h := &handler{
endpoints: epsStore,
logger: logger,
telemetry: tb,
endpoints: epsStore,
logger: logger,
telemetry: tb,
returnNames: returnNames,
}
r := &k8sResolver{
logger: logger,
Expand All @@ -131,6 +134,7 @@ func newK8sResolver(clt kubernetes.Interface,
stopCh: make(chan struct{}),
lwTimeout: timeout,
telemetry: tb,
returnNames: returnNames,
}
h.callback = r.resolve

Expand Down Expand Up @@ -187,13 +191,19 @@ func (r *k8sResolver) resolve(ctx context.Context) ([]string, error) {
defer r.shutdownWg.Done()

var backends []string
r.endpointsStore.Range(func(address, _ any) bool {
addr := address.(string)
var ep string
r.endpointsStore.Range(func(host, _ any) bool {
switch r.returnNames {
case true:
ep = fmt.Sprintf("%s.%s.%s", host, r.svcName, r.svcNs)
default:
ep = host.(string)
}
if len(r.port) == 0 {
backends = append(backends, addr)
backends = append(backends, ep)
} else {
for _, port := range r.port {
backends = append(backends, net.JoinHostPort(addr, strconv.FormatInt(int64(port), 10)))
backends = append(backends, net.JoinHostPort(ep, strconv.FormatInt(int64(port), 10)))
}
}
return true
Expand Down
57 changes: 45 additions & 12 deletions exporter/loadbalancingexporter/resolver_k8s_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,31 @@ import (

var _ cache.ResourceEventHandler = (*handler)(nil)

const (
epMissingHostnamesMsg = "Endpoints object missing hostnames"
)

type handler struct {
endpoints *sync.Map
callback func(ctx context.Context) ([]string, error)
logger *zap.Logger
telemetry *metadata.TelemetryBuilder
endpoints *sync.Map
callback func(ctx context.Context) ([]string, error)
logger *zap.Logger
telemetry *metadata.TelemetryBuilder
returnNames bool
}

func (h handler) OnAdd(obj any, _ bool) {
var endpoints map[string]bool
var ok bool

switch object := obj.(type) {
case *corev1.Endpoints:
endpoints = convertToEndpoints(object)
ok, endpoints = convertToEndpoints(h.returnNames, object)
if !ok {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}

default: // unsupported
h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
Expand All @@ -56,8 +68,14 @@ func (h handler) OnUpdate(oldObj, newObj any) {
return
}

oldEndpoints := convertToEndpoints(oldEps)
newEndpoints := convertToEndpoints(newEps)
_, oldEndpoints := convertToEndpoints(h.returnNames, oldEps)
hostnameOk, newEndpoints := convertToEndpoints(h.returnNames, newEps)
if !hostnameOk {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", newEps))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}

changed := false

// Iterate through old endpoints and remove those that are not in the new list.
Expand All @@ -80,6 +98,7 @@ func (h handler) OnUpdate(oldObj, newObj any) {
} else {
h.logger.Debug("No changes detected in the endpoints for the service", zap.Any("old", oldEps), zap.Any("new", newEps))
}

default: // unsupported
h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
Expand All @@ -89,13 +108,20 @@ func (h handler) OnUpdate(oldObj, newObj any) {

func (h handler) OnDelete(obj any) {
var endpoints map[string]bool
var ok bool

switch object := obj.(type) {
case *cache.DeletedFinalStateUnknown:
h.OnDelete(object.Obj)
return
case *corev1.Endpoints:
if object != nil {
endpoints = convertToEndpoints(object)
ok, endpoints = convertToEndpoints(h.returnNames, object)
if !ok {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}
}
default: // unsupported
h.logger.Warn("Got an unexpected Kubernetes data type during the removal of the pods for a service", zap.Any("obj", obj))
Expand All @@ -110,14 +136,21 @@ func (h handler) OnDelete(obj any) {
}
}

func convertToEndpoints(eps ...*corev1.Endpoints) map[string]bool {
ipAddress := map[string]bool{}
func convertToEndpoints(retNames bool, eps ...*corev1.Endpoints) (bool, map[string]bool) {
res := map[string]bool{}
for _, ep := range eps {
for _, subsets := range ep.Subsets {
for _, addr := range subsets.Addresses {
ipAddress[addr.IP] = true
if retNames {
if addr.Hostname == "" {
return false, nil
}
res[addr.Hostname] = true
} else {
res[addr.IP] = true
}
}
}
}
return ipAddress
return true, res
}
105 changes: 105 additions & 0 deletions exporter/loadbalancingexporter/resolver_k8s_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package loadbalancingexporter

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestConvertToEndpoints(tst *testing.T) {
// Create dummy Endpoints objects
endpoints1 := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-endpoints-1",
Namespace: "test-namespace",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
Hostname: "pod-1",
IP: "192.168.10.101",
},
},
},
},
}
endpoints2 := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-endpoints-2",
Namespace: "test-namespace",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
Hostname: "pod-2",
IP: "192.168.10.102",
},
},
},
},
}
endpoints3 := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-endpoints-3",
Namespace: "test-namespace",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.10.103",
},
},
},
},
}

tests := []struct {
name string
returnNames bool
includedEndpoints []*corev1.Endpoints
expectedEndpoints map[string]bool
wantNil bool
}{
{
name: "return hostnames",
returnNames: true,
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2},
expectedEndpoints: map[string]bool{"pod-1": true, "pod-2": true},
wantNil: false,
},
{
name: "return IPs",
returnNames: false,
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2, endpoints3},
expectedEndpoints: map[string]bool{"192.168.10.101": true, "192.168.10.102": true, "192.168.10.103": true},
wantNil: false,
},
{
name: "missing hostname",
returnNames: true,
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints3},
expectedEndpoints: nil,
wantNil: true,
},
}

for _, tt := range tests {
tst.Run(tt.name, func(tst *testing.T) {
ok, res := convertToEndpoints(tt.returnNames, tt.includedEndpoints...)
if tt.wantNil {
assert.Nil(tst, res)
} else {
assert.Equal(tst, tt.expectedEndpoints, res)
}
assert.Equal(tst, !tt.wantNil, ok)
})
}
}
Loading

0 comments on commit ca265dc

Please sign in to comment.