Skip to content

Commit

Permalink
Merge branch 'main' of github.com:elastic/elastic-agent into feat/rel…
Browse files Browse the repository at this point in the history
…ax-server
  • Loading branch information
michalpristas committed Oct 19, 2023
2 parents 7de6ef7 + 9196320 commit 9b65627
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 65 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1696935756-add-default-host.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Hints Autodiscovery for Elastic Agent - Add default host for each container in a pod

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/3575

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/1453
118 changes: 57 additions & 61 deletions internal/pkg/composable/providers/kubernetes/hints.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ import (
)

const (
hints = "hints"

integration = "package"
datastreams = "data_streams"

host = "host"
period = "period"
timeout = "timeout"
metricspath = "metrics_path"
username = "username"
password = "password"
stream = "stream" // this is the container stream: stdout/stderr
processors = "processors"
)

type hintsBuilder struct {
Expand Down Expand Up @@ -250,83 +252,77 @@ func GenerateHintsMapping(hints mapstr.M, kubeMeta mapstr.M, logger *logp.Logger
return hintsMapping
}

// Generates the hints and processor mappings from provided pod annotation map
// GetHintsMapping Generates the hints and processor mappings from provided pod annotation map
func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, prefix string, cID string) hintsData {
hintData := hintsData{
composableMapping: mapstr.M{},
processors: []mapstr.M{},
}
var hints mapstr.M
var containerProcessors []mapstr.M

if ann, ok := k8sMapping["annotations"]; ok {
annotations, _ := ann.(mapstr.M)

if containerEntries, err := annotations.GetValue(prefix + ".hints"); err == nil {
entries, ok := containerEntries.(mapstr.M)
if ok && len(entries) > 0 {
for key := range entries {
parts := strings.Split(key, "/")

if len(parts) > 1 {
if con, ok := k8sMapping["container"]; ok {
containers, ok := con.(mapstr.M)
if ok {
if cname, err := containers.GetValue("name"); err == nil {
if parts[0] == cname {
// If there are hints like co.elastic.hints.<container_name>/ then add the values after the / to the corresponding container
// Eg Annotation "co.elastic.hints.nginx/stream: stderr" will create a hints entry for container nginx
hints, containerProcessors = GenerateHintsForContainer(annotations, parts[0], prefix)
}
}
}
}
}
cName := ""
cHost := ""

ann, ok := k8sMapping["annotations"]
if !ok {
return hintData
}
annotations, ok := ann.(mapstr.M)
if !ok {
return hintData
}

// Get the name of the container from the metadata. We need it to extract the hints that affect it directly.
// E.g. co.elastic.hints.<container-name>/host: "..."
if con, ok := k8sMapping["container"]; ok {
if containers, ok := con.(mapstr.M); ok {
if name, err := containers.GetValue("name"); err == nil {
if nameString, ok := name.(string); ok {
cName = nameString
}
}
if cPort, err := containers.GetValue("port"); err == nil {
// This is the default for the host value of a specific container.
if portString, ok := cPort.(string); ok {
cHost = "${kubernetes.pod.ip}:" + portString
}
}
} else {
// If there are top level hints like co.elastic.hints/ then just add the values after the /
// Eg. Annotation "co.elastic.hints/stream: stderr" will will create a hints entries for all containers in the pod
hints = utils.GenerateHints(annotations, "", prefix)
}
logger.Debugf("Extracted hints are :%v", hints)
}

if len(hints) > 0 {
hintData = GenerateHintsResult(hints, k8sMapping, annotations, logger, prefix, cID)
hintsExtracted := utils.GenerateHints(annotations, cName, prefix)
if len(hintsExtracted) == 0 {
return hintData
}

// Only if there are processors defined in a specific container we append them to the processors of the top level
if len(containerProcessors) > 0 {
hintData.processors = append(hintData.processors, containerProcessors...)
// Check if host exists. Otherwise, add default entry for it.
if cHost != "" {
hintsValues, ok := hintsExtracted[hints]
if ok {
if hintsHostValues, ok := hintsValues.(mapstr.M); ok {
if _, ok := hintsHostValues[host]; !ok {
hintsHostValues[host] = cHost
}
}
} else {
hintsExtracted[hints] = mapstr.M{
host: cHost,
}
logger.Debugf("Generated Processors mapping :%v", hintData.processors)
}
}

return hintData
}

// Generates hints and processors list for specific containers
func GenerateHintsForContainer(annotations mapstr.M, parts, prefix string) (mapstr.M, []mapstr.M) {
hints := utils.GenerateHints(annotations, parts, prefix)
// Processors for specific container
// We need to make an extra check if we have processors added only to the specific containers
containerProcessors := utils.GetConfigs(annotations, prefix, "hints."+parts+"/processors")
logger.Debugf("Extracted hints are :%v", hintsExtracted)

return hints, containerProcessors
}

// Generates the final hintData (hints and processors) struct that will be emitted in pods.
func GenerateHintsResult(hints mapstr.M, k8sMapping map[string]interface{}, annotations mapstr.M, logger *logp.Logger, prefix, cID string) hintsData {
hintData := hintsData{
composableMapping: mapstr.M{},
processors: []mapstr.M{},
}

hintData.composableMapping = GenerateHintsMapping(hints, k8sMapping, logger, cID)
hintData.composableMapping = GenerateHintsMapping(hintsExtracted, k8sMapping, logger, cID)
logger.Debugf("Generated hints mappings :%v", hintData.composableMapping)

// Eg co.elastic.hints/processors.decode_json_fields.fields: "message" will add a processor in all containers of pod
hintData.processors = utils.GetConfigs(annotations, prefix, processorhints)
hintData.processors = utils.GetConfigs(annotations, prefix, hints+"/"+processors)
// We need to check the processors for the specific container, if they exist.
if cName != "" {
containerProcessors := utils.GetConfigs(annotations, prefix, hints+"."+cName+"/"+processors)
if len(containerProcessors) > 0 {
hintData.processors = append(hintData.processors, containerProcessors...)
}
}
logger.Debugf("Generated Processors mapping :%v", hintData.processors)

return hintData
}
131 changes: 131 additions & 0 deletions internal/pkg/composable/providers/kubernetes/hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,134 @@ func TestGenerateHintsMappingWithProcessorsForContainer(t *testing.T) {
assert.Contains(t, expectedprocesors, hintData.processors[1])
}
}

func TestDefaultHost(t *testing.T) {
logger := getLogger()
cID := "abcd"

mapping := map[string]interface{}{
"namespace": "testns",
"pod": mapstr.M{
"uid": string(types.UID(uid)),
"name": "testpod",
"ip": "127.0.0.5",
},
"annotations": mapstr.M{
"app": "production",
"co": mapstr.M{
"elastic": mapstr.M{
"hints/package": "redis",
"hints": mapstr.M{
"redis-1/host": "${kubernetes.pod.ip}:6379",
"redis-1/stream": "stderr",
"redis-2/host": "${kubernetes.pod.ip}:6400",
"redis-4/stream": "stderr",
},
},
},
},
}

addContainerMapping := func(mapping map[string]interface{}, container mapstr.M) map[string]interface{} {
clone := make(map[string]interface{}, len(mapping))
for k, v := range mapping {
clone[k] = v
}
clone["container"] = container
return clone
}

tests := []struct {
msg string
mapping map[string]interface{}
expected mapstr.M
}{
{
msg: "Test container with two hints (redis-1), of which one is host.",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-1",
"port": "6379",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"host": "127.0.0.5:6379",
"stream": "stderr",
},
},
},
{
msg: "Test container with only one hint for host (redis-2).",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-2",
"port": "6400",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"host": "127.0.0.5:6400",
},
},
},
{
msg: "Test container without hints and check for the default host (redis-3).",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-3",
"port": "7000",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"host": "127.0.0.5:7000",
},
},
},
{
msg: "Test container with one hint for stream and without port defined (redis-4).",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-4",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"stream": "stderr",
},
},
},
}

for _, test := range tests {
t.Run(test.msg, func(t *testing.T) {
hintData := GetHintsMapping(test.mapping, logger, "co.elastic", cID)
assert.Equal(t, test.expected, hintData.composableMapping)
})
}
}
4 changes: 0 additions & 4 deletions internal/pkg/composable/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/composable"
)

const (
processorhints = "hints/processors"
)

type pod struct {
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
Expand Down

0 comments on commit 9b65627

Please sign in to comment.