diff --git a/.chloggen/k8sobjects-data-structure.yaml b/.chloggen/k8sobjects-data-structure.yaml new file mode 100644 index 000000000000..0f4d777e8d2b --- /dev/null +++ b/.chloggen/k8sobjects-data-structure.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sobjectsreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: ensure the `k8s.namespace.name` attribute is set for objects retrieved using the `watch` mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36352] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/k8sobjectsreceiver/e2e_test.go b/receiver/k8sobjectsreceiver/e2e_test.go index 03b491cfa555..b7c9c4c54b08 100644 --- a/receiver/k8sobjectsreceiver/e2e_test.go +++ b/receiver/k8sobjectsreceiver/e2e_test.go @@ -145,6 +145,8 @@ func TestE2E(t *testing.T) { }, time.Duration(tc.timeoutMinutes)*time.Minute, 1*time.Second, "Timeout: failed to receive logs in %d minutes", tc.timeoutMinutes) + // golden.WriteLogs(t, expectedFile, logsConsumer.AllLogs()[0]) + require.NoErrorf(t, plogtest.CompareLogs(expected, logsConsumer.AllLogs()[0], plogtest.IgnoreObservedTimestamp(), plogtest.IgnoreResourceLogsOrder(), diff --git a/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events.yaml b/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events.yaml index 642bda8b3482..b4b822f5f682 100644 --- a/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events.yaml +++ b/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events.yaml @@ -1,5 +1,9 @@ resourceLogs: - - resource: {} + - resource: + attributes: + - key: k8s.namespace.name + value: + stringValue: default scopeLogs: - scope: {} logRecords: diff --git a/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events_core.yaml b/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events_core.yaml index a8d432c0d829..a1dca2f0c224 100644 --- a/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events_core.yaml +++ b/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events_core.yaml @@ -1,5 +1,9 @@ resourceLogs: - - resource: {} + - resource: + attributes: + - key: k8s.namespace.name + value: + stringValue: default scopeLogs: - scope: {} logRecords: diff --git a/receiver/k8sobjectsreceiver/unstructured_to_logdata.go b/receiver/k8sobjectsreceiver/unstructured_to_logdata.go index 15dabf6f78b0..38a02d79b531 100644 --- a/receiver/k8sobjectsreceiver/unstructured_to_logdata.go +++ b/receiver/k8sobjectsreceiver/unstructured_to_logdata.go @@ -51,16 +51,16 @@ func unstructuredListToLogData(event *unstructured.UnstructuredList, observedAt namespaceResourceMap := make(map[string]plog.LogRecordSlice) for _, e := range event.Items { - logSlice, ok := namespaceResourceMap[e.GetNamespace()] + logSlice, ok := namespaceResourceMap[getNamespace(e)] if !ok { rl := resourceLogs.AppendEmpty() resourceAttrs := rl.Resource().Attributes() - if namespace := e.GetNamespace(); namespace != "" { + if namespace := getNamespace(e); namespace != "" { resourceAttrs.PutStr(semconv.AttributeK8SNamespaceName, namespace) } sl := rl.ScopeLogs().AppendEmpty() logSlice = sl.LogRecords() - namespaceResourceMap[e.GetNamespace()] = logSlice + namespaceResourceMap[getNamespace(e)] = logSlice } record := logSlice.AppendEmpty() record.SetObservedTimestamp(pcommon.NewTimestampFromTime(observedAt)) @@ -79,3 +79,15 @@ func unstructuredListToLogData(event *unstructured.UnstructuredList, observedAt } return out } + +func getNamespace(e unstructured.Unstructured) string { + // first, try to use the GetNamespace() method, which checks for the metadata.namespace property + if namespace := e.GetNamespace(); namespace != "" { + return namespace + } + // try to look up namespace in object.metadata.namespace (for objects reported via watch mode) + if namespace, ok, _ := unstructured.NestedString(e.Object, "object", "metadata", "namespace"); ok { + return namespace + } + return "" +} diff --git a/receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go b/receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go index d6ed1c1d1e8a..bca2ba2554fb 100644 --- a/receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go +++ b/receiver/k8sobjectsreceiver/unstructured_to_logdata_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" semconv "go.opentelemetry.io/collector/semconv/v1.27.0" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -91,7 +90,7 @@ func TestUnstructuredListToLogData(t *testing.T) { assert.Equal(t, 3, logRecords.Len()) }) - t.Run("Test event.name in watch events", func(t *testing.T) { + t.Run("Test event observed timestamp is present", func(t *testing.T) { config := &K8sObjectsConfig{ gvr: &schema.GroupVersionResource{ Group: "", @@ -112,7 +111,8 @@ func TestUnstructuredListToLogData(t *testing.T) { }, } - logs, err := watchObjectsToLogData(event, time.Now(), config) + observedAt := time.Now() + logs, err := watchObjectsToLogData(event, observedAt, config) assert.NoError(t, err) assert.Equal(t, 1, logs.LogRecordCount()) @@ -123,14 +123,12 @@ func TestUnstructuredListToLogData(t *testing.T) { logRecords := rl.ScopeLogs().At(0).LogRecords() assert.Equal(t, 1, rl.ScopeLogs().Len()) assert.Equal(t, 1, logRecords.Len()) - - attrs := logRecords.At(0).Attributes() - eventName, ok := attrs.Get("event.name") - require.True(t, ok) - assert.EqualValues(t, "generic-name", eventName.AsRaw()) + assert.Positive(t, logRecords.At(0).ObservedTimestamp().AsTime().Unix()) + assert.Equal(t, logRecords.At(0).ObservedTimestamp().AsTime().Unix(), observedAt.Unix()) }) - t.Run("Test event observed timestamp is present", func(t *testing.T) { + t.Run("Test pull and watch objects both contain k8s.namespace.name", func(t *testing.T) { + observedTimestamp := time.Now() config := &K8sObjectsConfig{ gvr: &schema.GroupVersionResource{ Group: "", @@ -138,32 +136,92 @@ func TestUnstructuredListToLogData(t *testing.T) { Resource: "events", }, } - event := &watch.Event{ + watchedEvent := &watch.Event{ Type: watch.Added, Object: &unstructured.Unstructured{ Object: map[string]any{ "kind": "Event", "apiVersion": "v1", "metadata": map[string]any{ - "name": "generic-name", + "name": "generic-name", + "namespace": "my-namespace", }, }, }, } - observedAt := time.Now() - logs, err := watchObjectsToLogData(event, observedAt, config) - assert.NoError(t, err) - - assert.Equal(t, 1, logs.LogRecordCount()) + pulledEvent := &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{{ + Object: map[string]any{ + "kind": "Event", + "apiVersion": "v1", + "metadata": map[string]any{ + "name": "generic-name", + "namespace": "my-namespace", + }, + }, + }}, + } - resourceLogs := logs.ResourceLogs() - assert.Equal(t, 1, resourceLogs.Len()) - rl := resourceLogs.At(0) - logRecords := rl.ScopeLogs().At(0).LogRecords() - assert.Equal(t, 1, rl.ScopeLogs().Len()) - assert.Equal(t, 1, logRecords.Len()) - assert.Positive(t, logRecords.At(0).ObservedTimestamp().AsTime().Unix()) - assert.Equal(t, logRecords.At(0).ObservedTimestamp().AsTime().Unix(), observedAt.Unix()) + logEntryFromWatchEvent, err := watchObjectsToLogData(watchedEvent, observedTimestamp, config) + assert.NoError(t, err) + assert.NotNil(t, logEntryFromWatchEvent) + + // verify the event.type, event.domain and k8s.resource.name attributes have been added + + watchEventResourceAttrs := logEntryFromWatchEvent.ResourceLogs().At(0).Resource().Attributes() + k8sNamespace, ok := watchEventResourceAttrs.Get(semconv.AttributeK8SNamespaceName) + assert.True(t, ok) + assert.Equal(t, + "my-namespace", + k8sNamespace.Str(), + ) + + watchEvenLogRecordtAttrs := logEntryFromWatchEvent.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes() + eventType, ok := watchEvenLogRecordtAttrs.Get("event.name") + assert.True(t, ok) + assert.Equal( + t, + "generic-name", + eventType.AsString(), + ) + + eventDomain, ok := watchEvenLogRecordtAttrs.Get("event.domain") + assert.True(t, ok) + assert.Equal( + t, + "k8s", + eventDomain.AsString(), + ) + + k8sResourceName, ok := watchEvenLogRecordtAttrs.Get("k8s.resource.name") + assert.True(t, ok) + assert.Equal( + t, + "events", + k8sResourceName.AsString(), + ) + + logEntryFromPulledEvent := unstructuredListToLogData(pulledEvent, observedTimestamp, config) + assert.NotNil(t, logEntryFromPulledEvent) + + pullEventResourceAttrs := logEntryFromPulledEvent.ResourceLogs().At(0).Resource().Attributes() + k8sNamespace, ok = pullEventResourceAttrs.Get(semconv.AttributeK8SNamespaceName) + assert.True(t, ok) + assert.Equal( + t, + "my-namespace", + k8sNamespace.Str(), + ) + + pullEventLogRecordAttrs := logEntryFromPulledEvent.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes() + + k8sResourceName, ok = pullEventLogRecordAttrs.Get("k8s.resource.name") + assert.True(t, ok) + assert.Equal( + t, + "events", + k8sResourceName.AsString(), + ) }) }