Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hints Support] Add default host #3575

Merged
merged 5 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions changelog/fragments/1696935756-add-default-host.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Hints Autodiscovery for Elastic Agent - Add default host for each container in a pod

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/3575

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/1453
110 changes: 48 additions & 62 deletions internal/pkg/composable/providers/kubernetes/hints.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
username = "username"
password = "password"
stream = "stream" // this is the container stream: stdout/stderr

hints = "hints"
constanca-m marked this conversation as resolved.
Show resolved Hide resolved
processors = "processors"
)

type hintsBuilder struct {
Expand Down Expand Up @@ -250,83 +253,66 @@ func GenerateHintsMapping(hints mapstr.M, kubeMeta mapstr.M, logger *logp.Logger
return hintsMapping
}

// Generates the hints and processor mappings from provided pod annotation map
// GetHintsMapping Generates the hints and processor mappings from provided pod annotation map
func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, prefix string, cID string) hintsData {
hintData := hintsData{
composableMapping: mapstr.M{},
processors: []mapstr.M{},
}
var hints mapstr.M
var containerProcessors []mapstr.M

if ann, ok := k8sMapping["annotations"]; ok {
annotations, _ := ann.(mapstr.M)

if containerEntries, err := annotations.GetValue(prefix + ".hints"); err == nil {
entries, ok := containerEntries.(mapstr.M)
if ok && len(entries) > 0 {
for key := range entries {
parts := strings.Split(key, "/")

if len(parts) > 1 {
if con, ok := k8sMapping["container"]; ok {
containers, ok := con.(mapstr.M)
if ok {
if cname, err := containers.GetValue("name"); err == nil {
if parts[0] == cname {
// If there are hints like co.elastic.hints.<container_name>/ then add the values after the / to the corresponding container
// Eg Annotation "co.elastic.hints.nginx/stream: stderr" will create a hints entry for container nginx
hints, containerProcessors = GenerateHintsForContainer(annotations, parts[0], prefix)
}
}
}
}
}
}
}
} else {
// If there are top level hints like co.elastic.hints/ then just add the values after the /
// Eg. Annotation "co.elastic.hints/stream: stderr" will will create a hints entries for all containers in the pod
hints = utils.GenerateHints(annotations, "", prefix)
cName := ""
cHost := ""

ann, ok := k8sMapping["annotations"]
if !ok {
return hintData
}
annotations, _ := ann.(mapstr.M)
constanca-m marked this conversation as resolved.
Show resolved Hide resolved

// Get the name of the container from the metadata. We need it to extract the hints that affect it directly.
// E.g. co.elastic.hints.<container-name>/host: "..."
if con, ok := k8sMapping["container"]; ok {
containers, _ := con.(mapstr.M)
if name, err := containers.GetValue("name"); err == nil {
cName = name.(string)
}
if cPort, err := containers.GetValue("port"); err == nil {
// This is the default for the host value of a specific container.
cHost = "${kubernetes.pod.ip}:" + cPort.(string)
constanca-m marked this conversation as resolved.
Show resolved Hide resolved
}
logger.Debugf("Extracted hints are :%v", hints)
}

if len(hints) > 0 {
hintData = GenerateHintsResult(hints, k8sMapping, annotations, logger, prefix, cID)
hintsExtracted := utils.GenerateHints(annotations, cName, prefix)
if len(hintsExtracted) == 0 {
return hintData
}

// Only if there are processors defined in a specific container we append them to the processors of the top level
if len(containerProcessors) > 0 {
hintData.processors = append(hintData.processors, containerProcessors...)
// Check if host exists. Otherwise, add default entry for it.
if cHost != "" {
if hintsValues, ok := hintsExtracted[hints].(mapstr.M); ok {
constanca-m marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := hintsValues[host]; !ok {
hintsValues[host] = cHost
}
} else {
hintsExtracted[hints] = mapstr.M{
host: cHost,
}
logger.Debugf("Generated Processors mapping :%v", hintData.processors)
}
}

return hintData
}

// Generates hints and processors list for specific containers
func GenerateHintsForContainer(annotations mapstr.M, parts, prefix string) (mapstr.M, []mapstr.M) {
constanca-m marked this conversation as resolved.
Show resolved Hide resolved
hints := utils.GenerateHints(annotations, parts, prefix)
// Processors for specific container
// We need to make an extra check if we have processors added only to the specific containers
containerProcessors := utils.GetConfigs(annotations, prefix, "hints."+parts+"/processors")

return hints, containerProcessors
}
logger.Debugf("Extracted hints are :%v", hintsExtracted)

// Generates the final hintData (hints and processors) struct that will be emitted in pods.
func GenerateHintsResult(hints mapstr.M, k8sMapping map[string]interface{}, annotations mapstr.M, logger *logp.Logger, prefix, cID string) hintsData {
hintData := hintsData{
composableMapping: mapstr.M{},
processors: []mapstr.M{},
}

hintData.composableMapping = GenerateHintsMapping(hints, k8sMapping, logger, cID)
hintData.composableMapping = GenerateHintsMapping(hintsExtracted, k8sMapping, logger, cID)
logger.Debugf("Generated hints mappings :%v", hintData.composableMapping)

// Eg co.elastic.hints/processors.decode_json_fields.fields: "message" will add a processor in all containers of pod
hintData.processors = utils.GetConfigs(annotations, prefix, processorhints)
hintData.processors = utils.GetConfigs(annotations, prefix, hints+"/"+processors)
// We need to check the processors for the specific container, if they exist.
if cName != "" {
containerProcessors := utils.GetConfigs(annotations, prefix, hints+"."+cName+"/"+processors)
if len(containerProcessors) > 0 {
hintData.processors = append(hintData.processors, containerProcessors...)
}
}
logger.Debugf("Generated Processors mapping :%v", hintData.processors)

return hintData
}
131 changes: 131 additions & 0 deletions internal/pkg/composable/providers/kubernetes/hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,134 @@ func TestGenerateHintsMappingWithProcessorsForContainer(t *testing.T) {
assert.Contains(t, expectedprocesors, hintData.processors[1])
}
}

func TestDefaultHost(t *testing.T) {
logger := getLogger()
cID := "abcd"

mapping := map[string]interface{}{
"namespace": "testns",
"pod": mapstr.M{
"uid": string(types.UID(uid)),
"name": "testpod",
"ip": "127.0.0.5",
},
"annotations": mapstr.M{
"app": "production",
"co": mapstr.M{
"elastic": mapstr.M{
"hints/package": "redis",
"hints": mapstr.M{
"redis-1/host": "${kubernetes.pod.ip}:6379",
"redis-1/stream": "stderr",
"redis-2/host": "${kubernetes.pod.ip}:6400",
"redis-4/stream": "stderr",
},
},
},
},
}

addContainerMapping := func(mapping map[string]interface{}, container mapstr.M) map[string]interface{} {
clone := make(map[string]interface{}, len(mapping))
for k, v := range mapping {
clone[k] = v
}
clone["container"] = container
return clone
}

tests := []struct {
msg string
mapping map[string]interface{}
expected mapstr.M
}{
{
msg: "Test container with two hints (redis-1), of which one is host.",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-1",
"port": "6379",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"host": "127.0.0.5:6379",
"stream": "stderr",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the stream here? If not shall we remove it from all tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need it, I just wanted to make sure that adding the default host was not overwriting any of the hints.

},
},
},
{
msg: "Test container with only one hint for host (redis-2).",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-2",
"port": "6400",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"host": "127.0.0.5:6400",
},
},
},
{
msg: "Test container without hints and check for the default host (redis-3).",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-3",
"port": "7000",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"host": "127.0.0.5:7000",
},
},
},
{
msg: "Test container with one hint for stream and without port defined (redis-4).",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-4",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"stream": "stderr",
},
},
},
}

for _, test := range tests {
t.Run(test.msg, func(t *testing.T) {
hintData := GetHintsMapping(test.mapping, logger, "co.elastic", cID)
assert.Equal(t, test.expected, hintData.composableMapping)
})
}
}
4 changes: 0 additions & 4 deletions internal/pkg/composable/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/composable"
)

const (
processorhints = "hints/processors"
)

type pod struct {
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
Expand Down
Loading