From c734cf216af33686922df985c0c0044be7e006f9 Mon Sep 17 00:00:00 2001 From: ritalwar Date: Sat, 21 Oct 2023 23:01:56 +0530 Subject: [PATCH 01/12] Update eventmapping implementation for statsd module --- .../metricbeat/module/statsd/server/data.go | 19 ++++++----- .../module/statsd/server/data_test.go | 6 ++-- .../metricbeat/module/statsd/server/server.go | 34 +++++++++---------- 3 files changed, 29 insertions(+), 30 deletions(-) diff --git a/x-pack/metricbeat/module/statsd/server/data.go b/x-pack/metricbeat/module/statsd/server/data.go index bac70457fd2..38f952063f1 100644 --- a/x-pack/metricbeat/module/statsd/server/data.go +++ b/x-pack/metricbeat/module/statsd/server/data.go @@ -102,18 +102,19 @@ func parse(b []byte) ([]statsdMetric, error) { return metrics, nil } -func eventMapping(metricName string, metricValue interface{}, metricSetFields mapstr.M, mappings map[string]StatsdMapping) { +func eventMapping(metricName string, metricValue interface{}, mappings map[string]StatsdMapping) *mapstr.M { + m := mapstr.M{} if len(mappings) == 0 { - metricSetFields[common.DeDot(metricName)] = metricValue - return + m[common.DeDot(metricName)] = metricValue + return &m } for _, mapping := range mappings { // The metricname match the one with no labels in mappings // Let's insert it dedotted and continue if metricName == mapping.Metric { - metricSetFields[mapping.Value.Field] = metricValue - return + m[mapping.Value.Field] = metricValue + return &m } res := mapping.regex.FindStringSubmatch(metricName) @@ -121,7 +122,7 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma // Not all labels match // Skip and continue to next mapping if len(res) != (len(mapping.Labels) + 1) { - logger.Debug("not all labels match in statsd.mapping, skipped") + logger.Debug("not all labels match in statsd.mappings, skipped") continue } @@ -133,13 +134,15 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma continue } - metricSetFields[label.Field] = res[i] + m[label.Field] = res[i] } } // Let's add the metric with the value field - metricSetFields[mapping.Value.Field] = metricValue + m[mapping.Value.Field] = metricValue + break } + return &m } func newMetricProcessor(ttl time.Duration) *metricProcessor { diff --git a/x-pack/metricbeat/module/statsd/server/data_test.go b/x-pack/metricbeat/module/statsd/server/data_test.go index 2377a2fae5b..9b25499316b 100644 --- a/x-pack/metricbeat/module/statsd/server/data_test.go +++ b/x-pack/metricbeat/module/statsd/server/data_test.go @@ -737,11 +737,9 @@ func TestEventMapping(t *testing.T) { }, } { t.Run(test.metricName, func(t *testing.T) { - metricSetFields := mapstr.M{} builtMappings, _ := buildMappings(mappings) - eventMapping(test.metricName, test.metricValue, metricSetFields, builtMappings) - - assert.Equal(t, test.expected, metricSetFields) + ms := eventMapping(test.metricName, test.metricValue, builtMappings) + assert.Equal(t, test.expected, *ms) }) } } diff --git a/x-pack/metricbeat/module/statsd/server/server.go b/x-pack/metricbeat/module/statsd/server/server.go index 48aee89e460..35f303b2d25 100644 --- a/x-pack/metricbeat/module/statsd/server/server.go +++ b/x-pack/metricbeat/module/statsd/server/server.go @@ -95,7 +95,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { mappings, err := buildMappings(config.Mappings) if err != nil { - return nil, fmt.Errorf("invalid mapping configuration for `statsd.mapping`: %w", err) + return nil, fmt.Errorf("invalid mapping configuration for `statsd.mappings`: %w", err) } return &MetricSet{ BaseMetricSet: base, @@ -165,28 +165,26 @@ func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) { func (m *MetricSet) getEvents() []*mb.Event { groups := m.processor.GetAll() - events := make([]*mb.Event, len(groups)) - - for idx, tagGroup := range groups { - - mapstrTags := mapstr.M{} + if len(groups) == 0 { + return nil + } + events := make([]*mb.Event, 0, len(groups)) + for _, tagGroup := range groups { + mapstrTags := make(mapstr.M, len(tagGroup.tags)) for k, v := range tagGroup.tags { mapstrTags[k] = v } - sanitizedMetrics := mapstr.M{} for k, v := range tagGroup.metrics { - eventMapping(k, v, sanitizedMetrics, m.mappings) - } - - if len(sanitizedMetrics) == 0 { - continue - } - - events[idx] = &mb.Event{ - MetricSetFields: sanitizedMetrics, - RootFields: mapstr.M{"labels": mapstrTags}, - Namespace: m.Module().Name(), + ms := eventMapping(k, v, m.mappings) + if ms == nil || len(*ms) == 0 { + continue + } + events = append(events, &mb.Event{ + MetricSetFields: *ms, + RootFields: mapstr.M{"labels": mapstrTags}, + Namespace: m.Module().Name(), + }) } } return events From ca489d59e9b09ff129026db2472a3f4853d8557e Mon Sep 17 00:00:00 2001 From: ritalwar Date: Sat, 21 Oct 2023 23:27:02 +0530 Subject: [PATCH 02/12] Fix test cases --- .../metricbeat/module/statsd/server/data_test.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/x-pack/metricbeat/module/statsd/server/data_test.go b/x-pack/metricbeat/module/statsd/server/data_test.go index 9b25499316b..f0502e4cd99 100644 --- a/x-pack/metricbeat/module/statsd/server/data_test.go +++ b/x-pack/metricbeat/module/statsd/server/data_test.go @@ -1130,7 +1130,7 @@ func TestTagsGrouping(t *testing.T) { require.NoError(t, err) events := ms.getEvents() - assert.Len(t, events, 2) + assert.Len(t, events, 4) actualTags := []mapstr.M{} for _, e := range events { @@ -1144,6 +1144,18 @@ func TestTagsGrouping(t *testing.T) { "k2": "v2", }, }, + { + "labels": mapstr.M{ + "k1": "v1", + "k2": "v2", + }, + }, + { + "labels": mapstr.M{ + "k1": "v2", + "k2": "v3", + }, + }, { "labels": mapstr.M{ "k1": "v2", @@ -1222,7 +1234,7 @@ func TestData(t *testing.T) { require.NoError(t, err) events := ms.getEvents() - assert.Len(t, events, 1) + assert.Len(t, events, 10) mbevent := mbtest.StandardizeEvent(ms, *events[0]) mbtest.WriteEventToDataJSON(t, mbevent, "") From 69d22625bf2d98369184eac5a46e17a32c6dd1f1 Mon Sep 17 00:00:00 2001 From: ritalwar Date: Sat, 21 Oct 2023 23:42:39 +0530 Subject: [PATCH 03/12] Fixing Linting issue --- x-pack/metricbeat/module/statsd/server/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/metricbeat/module/statsd/server/server.go b/x-pack/metricbeat/module/statsd/server/server.go index 35f303b2d25..80c1b9621e4 100644 --- a/x-pack/metricbeat/module/statsd/server/server.go +++ b/x-pack/metricbeat/module/statsd/server/server.go @@ -107,8 +107,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Host returns the hostname or other module specific value that identifies a // specific host or service instance from which to collect metrics. -func (b *MetricSet) Host() string { - return b.server.(*udp.UdpServer).GetHost() +func (m *MetricSet) Host() string { + return m.server.(*udp.UdpServer).GetHost() } func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) { From f1e8e37612d45eea9342748c1bc9db911c2fda93 Mon Sep 17 00:00:00 2001 From: ritalwar Date: Sun, 22 Oct 2023 00:10:30 +0530 Subject: [PATCH 04/12] Update Changelog.asciidoc --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7ca5ca55525..8afcd42f6fe 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -140,6 +140,7 @@ is collected by it. - Add missing 'TransactionType' dimension for Azure Storage Account. {pull}36413[36413] - Add log error when statsd server fails to start {pull}36477[36477] - Fix CassandraConnectionClosures metric configuration {pull}34742[34742] +- Fix event mapping implementation for statsd module {pull}36925[36925] *Osquerybeat* From 5cd84406150c264228330d27eef78d1b290ba617 Mon Sep 17 00:00:00 2001 From: ritalwar Date: Mon, 23 Oct 2023 10:29:15 +0530 Subject: [PATCH 05/12] Update review comments --- x-pack/metricbeat/module/statsd/server/data.go | 8 ++++---- x-pack/metricbeat/module/statsd/server/data_test.go | 2 +- x-pack/metricbeat/module/statsd/server/server.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/x-pack/metricbeat/module/statsd/server/data.go b/x-pack/metricbeat/module/statsd/server/data.go index 38f952063f1..ada9aeed050 100644 --- a/x-pack/metricbeat/module/statsd/server/data.go +++ b/x-pack/metricbeat/module/statsd/server/data.go @@ -102,11 +102,11 @@ func parse(b []byte) ([]statsdMetric, error) { return metrics, nil } -func eventMapping(metricName string, metricValue interface{}, mappings map[string]StatsdMapping) *mapstr.M { +func eventMapping(metricName string, metricValue interface{}, mappings map[string]StatsdMapping) mapstr.M { m := mapstr.M{} if len(mappings) == 0 { m[common.DeDot(metricName)] = metricValue - return &m + return m } for _, mapping := range mappings { @@ -114,7 +114,7 @@ func eventMapping(metricName string, metricValue interface{}, mappings map[strin // Let's insert it dedotted and continue if metricName == mapping.Metric { m[mapping.Value.Field] = metricValue - return &m + return m } res := mapping.regex.FindStringSubmatch(metricName) @@ -142,7 +142,7 @@ func eventMapping(metricName string, metricValue interface{}, mappings map[strin m[mapping.Value.Field] = metricValue break } - return &m + return m } func newMetricProcessor(ttl time.Duration) *metricProcessor { diff --git a/x-pack/metricbeat/module/statsd/server/data_test.go b/x-pack/metricbeat/module/statsd/server/data_test.go index f0502e4cd99..2bdc97ab5c2 100644 --- a/x-pack/metricbeat/module/statsd/server/data_test.go +++ b/x-pack/metricbeat/module/statsd/server/data_test.go @@ -739,7 +739,7 @@ func TestEventMapping(t *testing.T) { t.Run(test.metricName, func(t *testing.T) { builtMappings, _ := buildMappings(mappings) ms := eventMapping(test.metricName, test.metricValue, builtMappings) - assert.Equal(t, test.expected, *ms) + assert.Equal(t, test.expected, ms) }) } } diff --git a/x-pack/metricbeat/module/statsd/server/server.go b/x-pack/metricbeat/module/statsd/server/server.go index 80c1b9621e4..3d6a1ca991c 100644 --- a/x-pack/metricbeat/module/statsd/server/server.go +++ b/x-pack/metricbeat/module/statsd/server/server.go @@ -177,11 +177,11 @@ func (m *MetricSet) getEvents() []*mb.Event { for k, v := range tagGroup.metrics { ms := eventMapping(k, v, m.mappings) - if ms == nil || len(*ms) == 0 { + if ms == nil || len(ms) == 0 { continue } events = append(events, &mb.Event{ - MetricSetFields: *ms, + MetricSetFields: ms, RootFields: mapstr.M{"labels": mapstrTags}, Namespace: m.Module().Name(), }) From 800fbf5117493761b5fb7b3ed129f415716143f4 Mon Sep 17 00:00:00 2001 From: ritalwar Date: Mon, 23 Oct 2023 10:40:37 +0530 Subject: [PATCH 06/12] Update server.go --- x-pack/metricbeat/module/statsd/server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/metricbeat/module/statsd/server/server.go b/x-pack/metricbeat/module/statsd/server/server.go index 3d6a1ca991c..13ed5193aab 100644 --- a/x-pack/metricbeat/module/statsd/server/server.go +++ b/x-pack/metricbeat/module/statsd/server/server.go @@ -177,7 +177,7 @@ func (m *MetricSet) getEvents() []*mb.Event { for k, v := range tagGroup.metrics { ms := eventMapping(k, v, m.mappings) - if ms == nil || len(ms) == 0 { + if len(ms) == 0 { continue } events = append(events, &mb.Event{ From ef5837b2946a5e8da6206f714f124408ac31915c Mon Sep 17 00:00:00 2001 From: ritalwar Date: Tue, 24 Oct 2023 21:15:53 +0530 Subject: [PATCH 07/12] Update data_test.go --- .../metricbeat/module/airflow/statsd/_meta/data.json | 8 ++++---- x-pack/metricbeat/module/airflow/statsd/data_test.go | 12 ++++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/x-pack/metricbeat/module/airflow/statsd/_meta/data.json b/x-pack/metricbeat/module/airflow/statsd/_meta/data.json index 4e5c41437cb..ff89b0ddde2 100644 --- a/x-pack/metricbeat/module/airflow/statsd/_meta/data.json +++ b/x-pack/metricbeat/module/airflow/statsd/_meta/data.json @@ -2,13 +2,13 @@ "@timestamp": "2017-10-12T08:05:34.853Z", "airflow": { "dag_duration": { - "15m_rate": 0.2, - "1m_rate": 0.2, - "5m_rate": 0.2, + "15m_rate": 0, + "1m_rate": 0, + "5m_rate": 0, "count": 1, "max": 200, "mean": 200, - "mean_rate": 0.2222490946071946, + "mean_rate": 38960.532980091164, "median": 200, "min": 200, "p75": 200, diff --git a/x-pack/metricbeat/module/airflow/statsd/data_test.go b/x-pack/metricbeat/module/airflow/statsd/data_test.go index c2c07d32f34..74094ff5d53 100644 --- a/x-pack/metricbeat/module/airflow/statsd/data_test.go +++ b/x-pack/metricbeat/module/airflow/statsd/data_test.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/auditbeat/core" @@ -42,14 +43,14 @@ func getConfig() map[string]interface{} { } } -func createEvent(t *testing.T) { +func createEvent(data string, t *testing.T) { udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", STATSD_HOST, STATSD_PORT)) require.NoError(t, err) conn, err := net.DialUDP("udp", nil, udpAddr) require.NoError(t, err) - _, err = fmt.Fprint(conn, "dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2") + _, err = fmt.Fprint(conn, data) require.NoError(t, err) } @@ -70,15 +71,18 @@ func TestData(t *testing.T) { wg.Done() go ms.Run(reporter) - events = reporter.(*mbtest.CapturingPushReporterV2).BlockingCapture(1) + events = reporter.(*mbtest.CapturingPushReporterV2).BlockingCapture(2) close(done) }(wg) wg.Wait() - createEvent(t) + createEvent("dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2", t) + createEvent("dagrun.duration.failed.b_dagid:500|ms|#k3:v3,k4:v4", t) <-done + assert.Len(t, events, 2) + if len(events) == 0 { t.Fatal("received no events") } From 4fc3100f11f5e164f1819b45c59f9da882d94f97 Mon Sep 17 00:00:00 2001 From: ritalwar Date: Tue, 24 Oct 2023 21:51:09 +0530 Subject: [PATCH 08/12] Update data_test.go --- x-pack/metricbeat/module/airflow/statsd/data_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/metricbeat/module/airflow/statsd/data_test.go b/x-pack/metricbeat/module/airflow/statsd/data_test.go index 74094ff5d53..1a176ce527c 100644 --- a/x-pack/metricbeat/module/airflow/statsd/data_test.go +++ b/x-pack/metricbeat/module/airflow/statsd/data_test.go @@ -20,7 +20,6 @@ import ( _ "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" - _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server" ) func init() { From e21fba9e18d9a9686cd921f7dd7b9019af0f44e2 Mon Sep 17 00:00:00 2001 From: ritalwar Date: Wed, 25 Oct 2023 16:10:40 +0530 Subject: [PATCH 09/12] Update comments for getEvents --- x-pack/metricbeat/module/statsd/server/server.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/x-pack/metricbeat/module/statsd/server/server.go b/x-pack/metricbeat/module/statsd/server/server.go index 13ed5193aab..0cb99010122 100644 --- a/x-pack/metricbeat/module/statsd/server/server.go +++ b/x-pack/metricbeat/module/statsd/server/server.go @@ -163,8 +163,13 @@ func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) { return mappings, nil } +// It processes metric groups, applies event mappings, and creates Metricbeat events. +// The generated events include metric fields, labels, and the namespace associated with the MetricSet. +// Returns a slice of Metricbeat events. func (m *MetricSet) getEvents() []*mb.Event { groups := m.processor.GetAll() + + // If there are no groups, return an empty slice. if len(groups) == 0 { return nil } @@ -176,7 +181,10 @@ func (m *MetricSet) getEvents() []*mb.Event { } for k, v := range tagGroup.metrics { + // Apply event mapping to the metric and get MetricSetFields. ms := eventMapping(k, v, m.mappings) + + // If no MetricSetFields were generated, continue to the next metric. if len(ms) == 0 { continue } From b2cb507e8cb67e2e2c67575bcfd74f5df5703b50 Mon Sep 17 00:00:00 2001 From: ritalwar Date: Mon, 30 Oct 2023 08:48:11 +0530 Subject: [PATCH 10/12] Address review comments --- x-pack/metricbeat/module/statsd/server/data.go | 1 - x-pack/metricbeat/module/statsd/server/server.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/metricbeat/module/statsd/server/data.go b/x-pack/metricbeat/module/statsd/server/data.go index ada9aeed050..27024e26284 100644 --- a/x-pack/metricbeat/module/statsd/server/data.go +++ b/x-pack/metricbeat/module/statsd/server/data.go @@ -111,7 +111,6 @@ func eventMapping(metricName string, metricValue interface{}, mappings map[strin for _, mapping := range mappings { // The metricname match the one with no labels in mappings - // Let's insert it dedotted and continue if metricName == mapping.Metric { m[mapping.Value.Field] = metricValue return m diff --git a/x-pack/metricbeat/module/statsd/server/server.go b/x-pack/metricbeat/module/statsd/server/server.go index 0cb99010122..c2366a71b6b 100644 --- a/x-pack/metricbeat/module/statsd/server/server.go +++ b/x-pack/metricbeat/module/statsd/server/server.go @@ -169,7 +169,7 @@ func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) { func (m *MetricSet) getEvents() []*mb.Event { groups := m.processor.GetAll() - // If there are no groups, return an empty slice. + // If there are no metric groups, return nil to indicate no events. if len(groups) == 0 { return nil } From 74eb3aefde6bc9ddcfea5a982daccf6d0ec51fdb Mon Sep 17 00:00:00 2001 From: ritalwar Date: Mon, 30 Oct 2023 11:40:10 +0530 Subject: [PATCH 11/12] Update test --- x-pack/metricbeat/module/airflow/statsd/data_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/metricbeat/module/airflow/statsd/data_test.go b/x-pack/metricbeat/module/airflow/statsd/data_test.go index 1a176ce527c..19cfde6d977 100644 --- a/x-pack/metricbeat/module/airflow/statsd/data_test.go +++ b/x-pack/metricbeat/module/airflow/statsd/data_test.go @@ -20,6 +20,7 @@ import ( _ "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server" ) func init() { @@ -77,11 +78,9 @@ func TestData(t *testing.T) { wg.Wait() createEvent("dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2", t) - createEvent("dagrun.duration.failed.b_dagid:500|ms|#k3:v3,k4:v4", t) + createEvent("dagrun.duration.failed.b_dagid:500|ms|#k1:v1,k2:v2", t) <-done - assert.Len(t, events, 2) - if len(events) == 0 { t.Fatal("received no events") } From b4a9cd28e86cee9fc40d8fd8fb470cc5303d3437 Mon Sep 17 00:00:00 2001 From: ritalwar Date: Mon, 30 Oct 2023 12:06:14 +0530 Subject: [PATCH 12/12] update data_test.go --- x-pack/metricbeat/module/airflow/statsd/data_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x-pack/metricbeat/module/airflow/statsd/data_test.go b/x-pack/metricbeat/module/airflow/statsd/data_test.go index 19cfde6d977..fcfd943bc68 100644 --- a/x-pack/metricbeat/module/airflow/statsd/data_test.go +++ b/x-pack/metricbeat/module/airflow/statsd/data_test.go @@ -11,8 +11,6 @@ import ( "sync" "testing" - "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -20,7 +18,7 @@ import ( _ "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" - _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server" ) func init() {