From f2b5f50198a17932d334c4286c2faff880b30fe2 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 10 Oct 2023 09:17:23 +0200 Subject: [PATCH 1/5] Add default host. --- .../composable/providers/kubernetes/hints.go | 110 +++++++-------- .../providers/kubernetes/hints_test.go | 131 ++++++++++++++++++ .../composable/providers/kubernetes/pod.go | 4 - 3 files changed, 179 insertions(+), 66 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetes/hints.go b/internal/pkg/composable/providers/kubernetes/hints.go index 0a439afd5da..934fcaf82e7 100644 --- a/internal/pkg/composable/providers/kubernetes/hints.go +++ b/internal/pkg/composable/providers/kubernetes/hints.go @@ -26,6 +26,9 @@ const ( username = "username" password = "password" stream = "stream" // this is the container stream: stdout/stderr + + hints = "hints" + processors = "processors" ) type hintsBuilder struct { @@ -250,83 +253,66 @@ func GenerateHintsMapping(hints mapstr.M, kubeMeta mapstr.M, logger *logp.Logger return hintsMapping } -// Generates the hints and processor mappings from provided pod annotation map +// GetHintsMapping Generates the hints and processor mappings from provided pod annotation map func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, prefix string, cID string) hintsData { hintData := hintsData{ composableMapping: mapstr.M{}, processors: []mapstr.M{}, } - var hints mapstr.M - var containerProcessors []mapstr.M - - if ann, ok := k8sMapping["annotations"]; ok { - annotations, _ := ann.(mapstr.M) - - if containerEntries, err := annotations.GetValue(prefix + ".hints"); err == nil { - entries, ok := containerEntries.(mapstr.M) - if ok && len(entries) > 0 { - for key := range entries { - parts := strings.Split(key, "/") - - if len(parts) > 1 { - if con, ok := k8sMapping["container"]; ok { - containers, ok := con.(mapstr.M) - if ok { - if cname, err := containers.GetValue("name"); err == nil { - if parts[0] == cname { - // If there are hints like co.elastic.hints./ then add the values after the / to the corresponding container - // Eg Annotation "co.elastic.hints.nginx/stream: stderr" will create a hints entry for container nginx - hints, containerProcessors = GenerateHintsForContainer(annotations, parts[0], prefix) - } - } - } - } - } - } - } - } else { - // If there are top level hints like co.elastic.hints/ then just add the values after the / - // Eg. Annotation "co.elastic.hints/stream: stderr" will will create a hints entries for all containers in the pod - hints = utils.GenerateHints(annotations, "", prefix) + + ann, ok := k8sMapping["annotations"] + if !ok { + return hintData + } + annotations, _ := ann.(mapstr.M) + + cName := "" + cHost := "" + // Get the name of the container from the metadata. We need it to extract the hints that affect it directly. + // E.g. co.elastic.hints./host: "..." + if con, ok := k8sMapping["container"]; ok { + containers, _ := con.(mapstr.M) + if name, err := containers.GetValue("name"); err == nil { + cName = name.(string) + } + if cPort, err := containers.GetValue("port"); err == nil { + // This is the default for the host value of a specific container. + cHost = "${kubernetes.pod.ip}:" + cPort.(string) } - logger.Debugf("Extracted hints are :%v", hints) + } - if len(hints) > 0 { - hintData = GenerateHintsResult(hints, k8sMapping, annotations, logger, prefix, cID) + hintsExtracted := utils.GenerateHints(annotations, cName, prefix) + if len(hintsExtracted) == 0 { + return hintData + } - // Only if there are processors defined in a specific container we append them to the processors of the top level - if len(containerProcessors) > 0 { - hintData.processors = append(hintData.processors, containerProcessors...) + // Check if host exists. Otherwise, add default entry for it. + if cHost != "" { + if hintsValues, ok := hintsExtracted[hints].(mapstr.M); ok { + if _, ok := hintsValues[host]; !ok { + hintsValues[host] = cHost + } + } else { + hintsExtracted[hints] = mapstr.M{ + host: cHost, } - logger.Debugf("Generated Processors mapping :%v", hintData.processors) } } - return hintData -} - -// Generates hints and processors list for specific containers -func GenerateHintsForContainer(annotations mapstr.M, parts, prefix string) (mapstr.M, []mapstr.M) { - hints := utils.GenerateHints(annotations, parts, prefix) - // Processors for specific container - // We need to make an extra check if we have processors added only to the specific containers - containerProcessors := utils.GetConfigs(annotations, prefix, "hints."+parts+"/processors") - - return hints, containerProcessors -} + logger.Debugf("Extracted hints are :%v", hintsExtracted) -// Generates the final hintData (hints and processors) struct that will be emitted in pods. -func GenerateHintsResult(hints mapstr.M, k8sMapping map[string]interface{}, annotations mapstr.M, logger *logp.Logger, prefix, cID string) hintsData { - hintData := hintsData{ - composableMapping: mapstr.M{}, - processors: []mapstr.M{}, - } - - hintData.composableMapping = GenerateHintsMapping(hints, k8sMapping, logger, cID) + hintData.composableMapping = GenerateHintsMapping(hintsExtracted, k8sMapping, logger, cID) logger.Debugf("Generated hints mappings :%v", hintData.composableMapping) - // Eg co.elastic.hints/processors.decode_json_fields.fields: "message" will add a processor in all containers of pod - hintData.processors = utils.GetConfigs(annotations, prefix, processorhints) + hintData.processors = utils.GetConfigs(annotations, prefix, hints+"/"+processors) + // We need to check the processors for the specific container, if they exist. + if cName != "" { + containerProcessors := utils.GetConfigs(annotations, prefix, hints+"."+cName+"/"+processors) + if len(containerProcessors) > 0 { + hintData.processors = append(hintData.processors, containerProcessors...) + } + } + logger.Debugf("Generated Processors mapping :%v", hintData.processors) return hintData } diff --git a/internal/pkg/composable/providers/kubernetes/hints_test.go b/internal/pkg/composable/providers/kubernetes/hints_test.go index 17e36b0d7c7..0cc709acaf7 100644 --- a/internal/pkg/composable/providers/kubernetes/hints_test.go +++ b/internal/pkg/composable/providers/kubernetes/hints_test.go @@ -591,3 +591,134 @@ func TestGenerateHintsMappingWithProcessorsForContainer(t *testing.T) { assert.Contains(t, expectedprocesors, hintData.processors[1]) } } + +func TestDefaultHost(t *testing.T) { + logger := getLogger() + cID := "abcd" + + mapping := map[string]interface{}{ + "namespace": "testns", + "pod": mapstr.M{ + "uid": string(types.UID(uid)), + "name": "testpod", + "ip": "127.0.0.5", + }, + "annotations": mapstr.M{ + "app": "production", + "co": mapstr.M{ + "elastic": mapstr.M{ + "hints/package": "redis", + "hints": mapstr.M{ + "redis-1/host": "${kubernetes.pod.ip}:6379", + "redis-1/stream": "stderr", + "redis-2/host": "${kubernetes.pod.ip}:6400", + "redis-4/stream": "stderr", + }, + }, + }, + }, + } + + addContainerMapping := func(mapping map[string]interface{}, container mapstr.M) map[string]interface{} { + clone := make(map[string]interface{}, len(mapping)) + for k, v := range mapping { + clone[k] = v + } + clone["container"] = container + return clone + } + + tests := []struct { + msg string + mapping map[string]interface{} + expected mapstr.M + }{ + { + msg: "Test container with two hints (redis-1), of which one is host.", + mapping: addContainerMapping(mapping, + mapstr.M{ + "name": "redis-1", + "port": "6379", + "id": cID, + }, + ), + expected: mapstr.M{ + "container_id": cID, + "redis": mapstr.M{ + "container_logs": mapstr.M{ + "enabled": true, + }, + "enabled": true, + "host": "127.0.0.5:6379", + "stream": "stderr", + }, + }, + }, + { + msg: "Test container with only one hint for host (redis-2).", + mapping: addContainerMapping(mapping, + mapstr.M{ + "name": "redis-2", + "port": "6400", + "id": cID, + }, + ), + expected: mapstr.M{ + "container_id": cID, + "redis": mapstr.M{ + "container_logs": mapstr.M{ + "enabled": true, + }, + "enabled": true, + "host": "127.0.0.5:6400", + }, + }, + }, + { + msg: "Test container without hints and check for the default host (redis-3).", + mapping: addContainerMapping(mapping, + mapstr.M{ + "name": "redis-3", + "port": "7000", + "id": cID, + }, + ), + expected: mapstr.M{ + "container_id": cID, + "redis": mapstr.M{ + "container_logs": mapstr.M{ + "enabled": true, + }, + "enabled": true, + "host": "127.0.0.5:7000", + }, + }, + }, + { + msg: "Test container with one hint for stream and without port defined (redis-4).", + mapping: addContainerMapping(mapping, + mapstr.M{ + "name": "redis-4", + "id": cID, + }, + ), + expected: mapstr.M{ + "container_id": cID, + "redis": mapstr.M{ + "container_logs": mapstr.M{ + "enabled": true, + }, + "enabled": true, + "stream": "stderr", + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.msg, func(t *testing.T) { + hintData := GetHintsMapping(test.mapping, logger, "co.elastic", cID) + assert.Equal(t, test.expected, hintData.composableMapping) + }) + } +} diff --git a/internal/pkg/composable/providers/kubernetes/pod.go b/internal/pkg/composable/providers/kubernetes/pod.go index 1769e793183..ffe817218e7 100644 --- a/internal/pkg/composable/providers/kubernetes/pod.go +++ b/internal/pkg/composable/providers/kubernetes/pod.go @@ -24,10 +24,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/composable" ) -const ( - processorhints = "hints/processors" -) - type pod struct { watcher kubernetes.Watcher nodeWatcher kubernetes.Watcher From 3510b890e4697d3e66a98b2da8d1eee69a590851 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 10 Oct 2023 13:05:06 +0200 Subject: [PATCH 2/5] Add changelog. --- .../1696935756-add-default-host.yaml | 32 +++++++++++++++++++ .../composable/providers/kubernetes/hints.go | 4 +-- 2 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 changelog/fragments/1696935756-add-default-host.yaml diff --git a/changelog/fragments/1696935756-add-default-host.yaml b/changelog/fragments/1696935756-add-default-host.yaml new file mode 100644 index 00000000000..a497170f3e7 --- /dev/null +++ b/changelog/fragments/1696935756-add-default-host.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Hints Autodiscovery for Elastic Agent - Add default host for each container in a pod + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/3575 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/1453 diff --git a/internal/pkg/composable/providers/kubernetes/hints.go b/internal/pkg/composable/providers/kubernetes/hints.go index 934fcaf82e7..eac304b1c8e 100644 --- a/internal/pkg/composable/providers/kubernetes/hints.go +++ b/internal/pkg/composable/providers/kubernetes/hints.go @@ -259,6 +259,8 @@ func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, pre composableMapping: mapstr.M{}, processors: []mapstr.M{}, } + cName := "" + cHost := "" ann, ok := k8sMapping["annotations"] if !ok { @@ -266,8 +268,6 @@ func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, pre } annotations, _ := ann.(mapstr.M) - cName := "" - cHost := "" // Get the name of the container from the metadata. We need it to extract the hints that affect it directly. // E.g. co.elastic.hints./host: "..." if con, ok := k8sMapping["container"]; ok { From 7575e460efad9972ba0eaa95fa4b5e149199f3f4 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 10 Oct 2023 13:56:10 +0200 Subject: [PATCH 3/5] Update const. --- internal/pkg/composable/providers/kubernetes/hints.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetes/hints.go b/internal/pkg/composable/providers/kubernetes/hints.go index eac304b1c8e..837a52a2a32 100644 --- a/internal/pkg/composable/providers/kubernetes/hints.go +++ b/internal/pkg/composable/providers/kubernetes/hints.go @@ -16,9 +16,10 @@ import ( ) const ( + hints = "hints" + integration = "package" datastreams = "data_streams" - host = "host" period = "period" timeout = "timeout" @@ -26,9 +27,7 @@ const ( username = "username" password = "password" stream = "stream" // this is the container stream: stdout/stderr - - hints = "hints" - processors = "processors" + processors = "processors" ) type hintsBuilder struct { From 89dc7d001b62c8e211eef41044badcaacf59af7c Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 10 Oct 2023 14:58:41 +0200 Subject: [PATCH 4/5] Add casting error --- .../composable/providers/kubernetes/hints.go | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetes/hints.go b/internal/pkg/composable/providers/kubernetes/hints.go index 837a52a2a32..716fce88c19 100644 --- a/internal/pkg/composable/providers/kubernetes/hints.go +++ b/internal/pkg/composable/providers/kubernetes/hints.go @@ -265,18 +265,22 @@ func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, pre if !ok { return hintData } - annotations, _ := ann.(mapstr.M) + annotations, ok := ann.(mapstr.M) + if !ok { + return hintData + } // Get the name of the container from the metadata. We need it to extract the hints that affect it directly. // E.g. co.elastic.hints./host: "..." if con, ok := k8sMapping["container"]; ok { - containers, _ := con.(mapstr.M) - if name, err := containers.GetValue("name"); err == nil { - cName = name.(string) - } - if cPort, err := containers.GetValue("port"); err == nil { - // This is the default for the host value of a specific container. - cHost = "${kubernetes.pod.ip}:" + cPort.(string) + if containers, ok := con.(mapstr.M); ok { + if name, err := containers.GetValue("name"); err == nil { + cName = name.(string) + } + if cPort, err := containers.GetValue("port"); err == nil { + // This is the default for the host value of a specific container. + cHost = "${kubernetes.pod.ip}:" + cPort.(string) + } } } @@ -287,9 +291,12 @@ func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, pre // Check if host exists. Otherwise, add default entry for it. if cHost != "" { - if hintsValues, ok := hintsExtracted[hints].(mapstr.M); ok { - if _, ok := hintsValues[host]; !ok { - hintsValues[host] = cHost + hintsValues, ok := hintsExtracted[hints] + if ok { + if hintsHostValues, ok := hintsValues.(mapstr.M); ok { + if _, ok := hintsHostValues[host]; !ok { + hintsHostValues[host] = cHost + } } } else { hintsExtracted[hints] = mapstr.M{ From 4772cd4524814683fe9d21b19d506b013b56acd7 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 10 Oct 2023 15:01:00 +0200 Subject: [PATCH 5/5] Add casting error --- internal/pkg/composable/providers/kubernetes/hints.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetes/hints.go b/internal/pkg/composable/providers/kubernetes/hints.go index 716fce88c19..8557007c87e 100644 --- a/internal/pkg/composable/providers/kubernetes/hints.go +++ b/internal/pkg/composable/providers/kubernetes/hints.go @@ -275,11 +275,15 @@ func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, pre if con, ok := k8sMapping["container"]; ok { if containers, ok := con.(mapstr.M); ok { if name, err := containers.GetValue("name"); err == nil { - cName = name.(string) + if nameString, ok := name.(string); ok { + cName = nameString + } } if cPort, err := containers.GetValue("port"); err == nil { // This is the default for the host value of a specific container. - cHost = "${kubernetes.pod.ip}:" + cPort.(string) + if portString, ok := cPort.(string); ok { + cHost = "${kubernetes.pod.ip}:" + portString + } } } }