Skip to content

Commit

Permalink
Support Hints per container input (#3416)
Browse files Browse the repository at this point in the history
* Adding updates for code to introduces hints per container

* Splitting functions for hints and processors

* Adding test for specific container hints
  • Loading branch information
gizas authored Oct 6, 2023
1 parent 6cc587a commit 7a667da
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 13 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1694692246-hintspercontainer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Hints Autodiscovery for Elastic Agent - Define configuration through annotations for specific containers inside a pod

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
70 changes: 64 additions & 6 deletions internal/pkg/composable/providers/kubernetes/hints.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,19 +256,77 @@ func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, pre
composableMapping: mapstr.M{},
processors: []mapstr.M{},
}
var hints mapstr.M
var containerProcessors []mapstr.M

if ann, ok := k8sMapping["annotations"]; ok {
annotations, _ := ann.(mapstr.M)
hints := utils.GenerateHints(annotations, "", prefix)

if containerEntries, err := annotations.GetValue(prefix + ".hints"); err == nil {
entries, ok := containerEntries.(mapstr.M)
if ok && len(entries) > 0 {
for key := range entries {
parts := strings.Split(key, "/")

if len(parts) > 1 {
if con, ok := k8sMapping["container"]; ok {
containers, ok := con.(mapstr.M)
if ok {
if cname, err := containers.GetValue("name"); err == nil {
if parts[0] == cname {
// If there are hints like co.elastic.hints.<container_name>/ then add the values after the / to the corresponding container
// Eg Annotation "co.elastic.hints.nginx/stream: stderr" will create a hints entry for container nginx
hints, containerProcessors = GenerateHintsForContainer(annotations, parts[0], prefix)
}
}
}
}
}
}
}
} else {
// If there are top level hints like co.elastic.hints/ then just add the values after the /
// Eg. Annotation "co.elastic.hints/stream: stderr" will will create a hints entries for all containers in the pod
hints = utils.GenerateHints(annotations, "", prefix)
}
logger.Debugf("Extracted hints are :%v", hints)

if len(hints) > 0 {
logger.Debugf("Extracted hints are :%v", hints)
hintData.composableMapping = GenerateHintsMapping(hints, k8sMapping, logger, cID)
logger.Debugf("Generated hints mappings are :%v", hintData.composableMapping)
hintData = GenerateHintsResult(hints, k8sMapping, annotations, logger, prefix, cID)

hintData.processors = utils.GetConfigs(annotations, prefix, processorhints)
logger.Debugf("Generated Processors are :%v", hintData.processors)
// Only if there are processors defined in a specific container we append them to the processors of the top level
if len(containerProcessors) > 0 {
hintData.processors = append(hintData.processors, containerProcessors...)
}
logger.Debugf("Generated Processors mapping :%v", hintData.processors)
}
}

return hintData
}

// Generates hints and processors list for specific containers
func GenerateHintsForContainer(annotations mapstr.M, parts, prefix string) (mapstr.M, []mapstr.M) {
hints := utils.GenerateHints(annotations, parts, prefix)
// Processors for specific container
// We need to make an extra check if we have processors added only to the specific containers
containerProcessors := utils.GetConfigs(annotations, prefix, "hints."+parts+"/processors")

return hints, containerProcessors
}

// Generates the final hintData (hints and processors) struct that will be emitted in pods.
func GenerateHintsResult(hints mapstr.M, k8sMapping map[string]interface{}, annotations mapstr.M, logger *logp.Logger, prefix, cID string) hintsData {
hintData := hintsData{
composableMapping: mapstr.M{},
processors: []mapstr.M{},
}

hintData.composableMapping = GenerateHintsMapping(hints, k8sMapping, logger, cID)
logger.Debugf("Generated hints mappings :%v", hintData.composableMapping)

// Eg co.elastic.hints/processors.decode_json_fields.fields: "message" will add a processor in all containers of pod
hintData.processors = utils.GetConfigs(annotations, prefix, processorhints)

return hintData
}
132 changes: 125 additions & 7 deletions internal/pkg/composable/providers/kubernetes/hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,10 @@ func TestGenerateHintsMappingWithProcessors(t *testing.T) {
},
},
},
},
},
}},
}

expected_hints := mapstr.M{
expectedhints := mapstr.M{
"container_id": "asdfghjkl",
"apache": mapstr.M{
"container_logs": mapstr.M{
Expand All @@ -447,7 +446,7 @@ func TestGenerateHintsMappingWithProcessors(t *testing.T) {
},
}

expected_procesors := []mapstr.M{
expectedprocesors := []mapstr.M{
0: {
"rename": mapstr.M{
"fail_on_error": "false",
Expand All @@ -467,9 +466,128 @@ func TestGenerateHintsMappingWithProcessors(t *testing.T) {

hintData := GetHintsMapping(mapping, logger, "co.elastic", "asdfghjkl")

assert.Equal(t, expected_hints, hintData.composableMapping)
assert.Equal(t, expectedhints, hintData.composableMapping)
//assert.Equal(t, expected_procesors, hintData.processors). We replace this assertion with assert.Contains in order to avoid flakiness in tests because map keys are not sorted
assert.Contains(t, expected_procesors, hintData.processors[0])
assert.Contains(t, expected_procesors, hintData.processors[1])
if len(hintData.processors) > 0 {
assert.Contains(t, expectedprocesors, hintData.processors[0])
assert.Contains(t, expectedprocesors, hintData.processors[1])
}
}

// This test evaluates the hints Generation when you define specific container nginx
// Following will need to include all annotations after top level "co.elastic.hints/" plus those that defined for nginx with prefix "co.elastic.hints.nginx"
// mappings.container.name = nginx defines the container we want to emmit the new configuration. Annotations for other containers like co.elastic.hints.webapp should be excluded
func TestGenerateHintsMappingWithProcessorsForContainer(t *testing.T) {
logger := getLogger()
// pod := &kubernetes.Pod{
// ObjectMeta: metav1.ObjectMeta{
// Name: "testpod",
// UID: types.UID(uid),
// Namespace: "testns",
// Labels: map[string]string{
// "foo": "bar",
// "with-dash": "dash-value",
// "with/slash": "some/path",
// },
// Annotations: map[string]string{
// "app": "production",
// "co.elastic.hints/package": "apache",
// "co.elastic.hints/processors.decode_json_fields.fields": "message",
// "co.elastic.hints/processors.decode_json_fields.add_error_key": "true",
// "co.elastic.hints/processors.decode_json_fields.overwrite_keys": "true",
// "co.elastic.hints/processors.decode_json_fields.target": "team",
// "co.elastic.hints.nginx/stream": "stderr",
// "co.elastic.hints.nginx/processors.add_fields.fields.name": "myproject",
// "co.elastic.hints.webapp/processors.add_fields.fields.name": "myproject2",
// },
// },
// TypeMeta: metav1.TypeMeta{
// Kind: "Pod",
// APIVersion: "v1",
// },
// Spec: kubernetes.PodSpec{
// NodeName: "testnode",
// },
// Status: kubernetes.PodStatus{PodIP: "127.0.0.5"},
// }

mapping := map[string]interface{}{
"namespace": "testns",
"pod": mapstr.M{
"uid": string(types.UID(uid)),
"name": "testpod",
"ip": "127.0.0.5",
},
"namespace_annotations": mapstr.M{
"nsa": "nsb",
},
"labels": mapstr.M{
"foo": "bar",
"with-dash": "dash-value",
"with/slash": "some/path",
},
"container": mapstr.M{
"name": "nginx",
"id": "8863418215f5d6b1919db9b3b710615878f88b0773e2b098e714c8d696c3261f",
},
"annotations": mapstr.M{
"app": "production",
"co": mapstr.M{
"elastic": mapstr.M{
"hints/package": "apache",
"hints/processors": mapstr.M{
"decode_json_fields": mapstr.M{
"fields": "message",
"add_error_key": "true",
"overwrite_keys": "true",
"target": "team",
}},
"hints": mapstr.M{
"nginx/processors": mapstr.M{
"add_fields": mapstr.M{
"name": "myproject",
},
},
"nginx/stream": "stderr",
},
},
},
},
}

expectedhints := mapstr.M{
"container_id": "asdfghjkl",
"apache": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"stream": "stderr",
"enabled": true,
},
}

expectedprocesors := []mapstr.M{
0: {
"decode_json_fields": mapstr.M{
"fields": "message",
"add_error_key": "true",
"overwrite_keys": "true",
"target": "team",
},
},
1: {
"add_fields": mapstr.M{
"name": "myproject",
},
},
}

hintData := GetHintsMapping(mapping, logger, "co.elastic", "asdfghjkl")

assert.Equal(t, expectedhints, hintData.composableMapping)
//assert.Equal(t, expected_procesors, hintData.processors). We replace this assertion with assert.Contains in order to avoid flakiness in tests because map keys are not sorted
if len(hintData.processors) > 0 {
assert.Contains(t, expectedprocesors, hintData.processors[0])
assert.Contains(t, expectedprocesors, hintData.processors[1])
}
}

0 comments on commit 7a667da

Please sign in to comment.