Skip to content

Commit

Permalink
[exporter/loadbalancing] Add return_hostnames option to k8s resolver
Browse files Browse the repository at this point in the history
Resolves 18412
  • Loading branch information
snuggie12 committed Oct 21, 2024
1 parent 79926f8 commit 21cf2b2
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 21 deletions.
27 changes: 27 additions & 0 deletions .chloggen/lbexporter-return-hostnames-k8s-resolver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds a an optional configuration to the k8s resolver which returns hostnames instead of IPs for headless services pointing at statefulsets

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: ["18412"]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
7 changes: 5 additions & 2 deletions exporter/loadbalancingexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.
* `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `1s` will be used.
* `return_hostnames` will return hostnames instead of IPs. This is useful in certain situations like using istio in sidecar mode. To use this feature, the `service` must be a headless `Service`, pointing at a `StatefulSet`, and the `service` must be what is specified under `.spec.serviceName` in the `StatefulSet`.
* The `aws_cloud_map` node accepts the following properties:
* `namespace` The CloudMap namespace where the service is register, e.g. `cloudmap`. If no `namespace` is specified, this will fail to start the Load Balancer exporter.
* `service_name` The name of the service that you specified when you registered the instance, e.g. `otelcollectors`. If no `service_name` is specified, this will fail to start the Load Balancer exporter.
Expand Down Expand Up @@ -116,9 +117,9 @@ exporters:
- backend-2:4317
- backend-3:4317
- backend-4:4317
# Notice to config a headless service DNS in Kubernetes
# Notice to config a headless service DNS in Kubernetes
# dns:
# hostname: otelcol-headless.observability.svc.cluster.local
# hostname: otelcol-headless.observability.svc.cluster.local

service:
pipelines:
Expand All @@ -137,6 +138,8 @@ service:
```
Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))
> [!IMPORTANT]
> The k8s resolver requires proper permissions. See [the full example](./example/k8s-resolver/README.md) for more information.
```yaml
receivers:
Expand Down
1 change: 1 addition & 0 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type K8sSvcResolver struct {
Service string `mapstructure:"service"`
Ports []int32 `mapstructure:"ports"`
Timeout time.Duration `mapstructure:"timeout"`
ReturnHostnames bool `mapstructure:"return_hostnames"`
}

type AWSCloudMapResolver struct {
Expand Down
1 change: 1 addition & 0 deletions exporter/loadbalancingexporter/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func newLoadBalancer(logger *zap.Logger, cfg component.Config, factory component
oCfg.Resolver.K8sSvc.Service,
oCfg.Resolver.K8sSvc.Ports,
oCfg.Resolver.K8sSvc.Timeout,
oCfg.Resolver.K8sSvc.ReturnHostnames,
telemetry,
)
if err != nil {
Expand Down
18 changes: 14 additions & 4 deletions exporter/loadbalancingexporter/resolver_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type k8sResolver struct {

endpoints []string
onChangeCallbacks []func([]string)
returnNames bool

stopCh chan struct{}
updateLock sync.RWMutex
Expand All @@ -75,6 +76,7 @@ func newK8sResolver(clt kubernetes.Interface,
service string,
ports []int32,
timeout time.Duration,
returnNames bool,
tb *metadata.TelemetryBuilder,
) (*k8sResolver, error) {

Expand Down Expand Up @@ -119,6 +121,7 @@ func newK8sResolver(clt kubernetes.Interface,
endpoints: epsStore,
logger: logger,
telemetry: tb,
returnNames: returnNames,
}
r := &k8sResolver{
logger: logger,
Expand All @@ -132,6 +135,7 @@ func newK8sResolver(clt kubernetes.Interface,
stopCh: make(chan struct{}),
lwTimeout: timeout,
telemetry: tb,
returnNames: returnNames,
}
h.callback = r.resolve

Expand Down Expand Up @@ -187,13 +191,19 @@ func (r *k8sResolver) resolve(ctx context.Context) ([]string, error) {
defer r.shutdownWg.Done()

var backends []string
r.endpointsStore.Range(func(address, _ any) bool {
addr := address.(string)
var ep string
r.endpointsStore.Range(func(host, _ any) bool {
switch r.returnNames {
case true:
ep = fmt.Sprintf("%s.%s.%s", host, r.svcName, r.svcNs)
default:
ep = host.(string)
}
if len(r.port) == 0 {
backends = append(backends, addr)
backends = append(backends, ep)
} else {
for _, port := range r.port {
backends = append(backends, net.JoinHostPort(addr, strconv.FormatInt(int64(port), 10)))
backends = append(backends, net.JoinHostPort(ep, strconv.FormatInt(int64(port), 10)))
}
}
return true
Expand Down
56 changes: 48 additions & 8 deletions exporter/loadbalancingexporter/resolver_k8s_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,31 @@ import (

var _ cache.ResourceEventHandler = (*handler)(nil)

const (
epMissingHostnamesMsg = "Endpoints object missing hostnames"
)

type handler struct {
endpoints *sync.Map
callback func(ctx context.Context) ([]string, error)
logger *zap.Logger
telemetry *metadata.TelemetryBuilder
returnNames bool
}

func (h handler) OnAdd(obj any, _ bool) {
var endpoints []string
var ok bool

switch object := obj.(type) {
case *corev1.Endpoints:
endpoints = convertToEndpoints(object)
ok, endpoints = convertToEndpoints(h.returnNames, object)
if !ok {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}

default: // unsupported
h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
Expand All @@ -49,7 +61,13 @@ func (h handler) OnAdd(obj any, _ bool) {
func (h handler) OnUpdate(oldObj, newObj any) {
switch oldEps := oldObj.(type) {
case *corev1.Endpoints:
epRemove := convertToEndpoints(oldEps)
hostnameOk, epRemove := convertToEndpoints(h.returnNames, oldEps)
if !hostnameOk {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", oldObj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}

for _, ep := range epRemove {
h.endpoints.Delete(ep)
}
Expand All @@ -63,8 +81,16 @@ func (h handler) OnUpdate(oldObj, newObj any) {
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}

hostnameOk, epUpdate := convertToEndpoints(h.returnNames, newEps)
if !hostnameOk {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", newEps))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}

changed := false
for _, ep := range convertToEndpoints(newEps) {
for _, ep := range epUpdate {
if _, loaded := h.endpoints.LoadOrStore(ep, true); !loaded {
changed = true
}
Expand All @@ -81,13 +107,20 @@ func (h handler) OnUpdate(oldObj, newObj any) {

func (h handler) OnDelete(obj any) {
var endpoints []string
var ok bool

switch object := obj.(type) {
case *cache.DeletedFinalStateUnknown:
h.OnDelete(object.Obj)
return
case *corev1.Endpoints:
if object != nil {
endpoints = convertToEndpoints(object)
ok, endpoints = convertToEndpoints(h.returnNames, object)
if !ok {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}
}
default: // unsupported
h.logger.Warn("Got an unexpected Kubernetes data type during the removal of the pods for a service", zap.Any("obj", obj))
Expand All @@ -102,14 +135,21 @@ func (h handler) OnDelete(obj any) {
}
}

func convertToEndpoints(eps ...*corev1.Endpoints) []string {
var ipAddress []string
func convertToEndpoints(retNames bool, eps ...*corev1.Endpoints) (bool, []string) {
var res []string
for _, ep := range eps {
for _, subsets := range ep.Subsets {
for _, addr := range subsets.Addresses {
ipAddress = append(ipAddress, addr.IP)
if retNames {
if addr.Hostname == "" {
return false, nil
}
res = append(res, addr.Hostname)
} else {
res = append(res, addr.IP)
}
}
}
}
return ipAddress
return true, res
}
106 changes: 106 additions & 0 deletions exporter/loadbalancingexporter/resolver_k8s_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// FILEPATH: /Users/matt.hoey/ml/tp-opentelemetry-collector-contrib/exporter/loadbalancingexporter/resolver_k8s_handler_test.go

package loadbalancingexporter

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)


func TestConvertToEndpoints(tst *testing.T) {
// Create dummy Endpoints objects
endpoints1 := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-endpoints-1",
Namespace: "test-namespace",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
Hostname: "pod-1",
IP: "192.168.10.101",
},
},
},
},
}
endpoints2 := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-endpoints-2",
Namespace: "test-namespace",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
Hostname: "pod-2",
IP: "192.168.10.102",
},
},
},
},
}
endpoints3 := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-endpoints-3",
Namespace: "test-namespace",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.10.103",
},
},
},
},
}

tests := []struct {
name string
returnNames bool
includedEndpoints []*corev1.Endpoints
expectedEndpoints []string
wantNil bool
}{
{
name: "return hostnames",
returnNames: true,
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2},
expectedEndpoints: []string{"pod-1", "pod-2"},
wantNil: false,
},
{
name: "return IPs",
returnNames: false,
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2, endpoints3},
expectedEndpoints: []string{"192.168.10.101", "192.168.10.102", "192.168.10.103"},
wantNil: false,
},
{
name: "missing hostname",
returnNames: true,
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints3},
expectedEndpoints: nil,
wantNil: true,
},
}

for _, tt := range tests {
tst.Run(tt.name, func(tst *testing.T) {
ok, res := convertToEndpoints(tt.returnNames, tt.includedEndpoints...)
if tt.wantNil {
assert.Nil(tst, res)
} else {
assert.Equal(tst, tt.expectedEndpoints, res)
}
assert.Equal(tst, !tt.wantNil, ok)
})
}

}
Loading

0 comments on commit 21cf2b2

Please sign in to comment.