From cef78ed7c12723d670b022d658357feb43af5623 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 20 Sep 2024 12:35:38 -0600 Subject: [PATCH] [AWS CloudWatch Metrics] Record previous endTime to use for next collection period (#40870) This PR is to record the previous endTime from the previous collection period to use for calculating the next collection start time and end time. This PR also changes log.logger from cloudwatch to aws.cloudwatch to match the other logger. --- CHANGELOG-developer.next.asciidoc | 1 + .../metricbeat/module/aws/billing/billing.go | 4 +- .../module/aws/cloudwatch/cloudwatch.go | 6 +- .../module/aws/cloudwatch/cloudwatch_test.go | 18 +++-- x-pack/metricbeat/module/aws/utils.go | 13 +++- x-pack/metricbeat/module/aws/utils_test.go | 77 +++++++++++-------- 6 files changed, 77 insertions(+), 42 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 0bcca8282b62..abe0164fe70d 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -104,6 +104,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Fix bug in minimum length for request trace logging. {pull}39834[39834] - Close connections properly in Filbeat's HTTPJSON input. {pull}39790[39790] - Add the Offset property to libbeat/reader.Message to store the total number of bytes read and discarded before generating the message. This enables inputs to accurately determine how much data has been read up to the message, using Message.Bytes + Message.Offset. {pull}39873[39873] {issue}39653[39653] +- AWS CloudWatch Metrics record previous endTime to use for next collection period and change log.logger from cloudwatch to aws.cloudwatch. {pull}40870[40870] ==== Added diff --git a/x-pack/metricbeat/module/aws/billing/billing.go b/x-pack/metricbeat/module/aws/billing/billing.go index f3111da60783..378d6564389f 100644 --- a/x-pack/metricbeat/module/aws/billing/billing.go +++ b/x-pack/metricbeat/module/aws/billing/billing.go @@ -62,6 +62,7 @@ type MetricSet struct { *aws.MetricSet logger *logp.Logger CostExplorerConfig CostExplorerConfig `config:"cost_explorer_config"` + PreviousEndTime time.Time } // CostExplorerConfig holds a configuration specific for billing metricset. @@ -121,7 +122,8 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { startDate, endDate := getStartDateEndDate(m.Period) // Get startTime and endTime - startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency) + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency, m.PreviousEndTime) + m.PreviousEndTime = endTime // get cost metrics from cost explorer awsBeatsConfig := m.MetricSet.AwsConfig.Copy() diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go index 241aa5216634..dde2463ea852 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go @@ -48,6 +48,7 @@ type MetricSet struct { *aws.MetricSet logger *logp.Logger CloudwatchConfigs []Config `config:"metrics" validate:"nonzero,required"` + PreviousEndTime time.Time } // Dimension holds name and value for cloudwatch metricset dimension config. @@ -87,7 +88,7 @@ type namespaceDetail struct { // New creates a new instance of the MetricSet. New is responsible for unpacking // any MetricSet specific configuration options if there are any. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - logger := logp.NewLogger(metricsetName) + logger := logp.NewLogger(aws.ModuleName + "." + metricsetName) metricSet, err := aws.NewMetricSet(base) if err != nil { return nil, fmt.Errorf("error creating aws metricset: %w", err) @@ -119,7 +120,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(report mb.ReporterV2) error { // Get startTime and endTime - startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency) + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency, m.PreviousEndTime) + m.PreviousEndTime = endTime m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime) // Check statistic method in config diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go index 3c355d246129..08f878f9bb39 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go @@ -1269,7 +1269,8 @@ func TestCreateEventsWithIdentifier(t *testing.T) { }} resourceTypeTagFilters := map[string][]aws.Tag{} resourceTypeTagFilters["ec2:instance"] = nameTestEC2Tag - startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) + var previousEndTime time.Time + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime) events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) assert.NoError(t, err) @@ -1314,7 +1315,8 @@ func TestCreateEventsWithoutIdentifier(t *testing.T) { } resourceTypeTagFilters := map[string][]aws.Tag{} - startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) + var previousEndTime time.Time + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime) events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) assert.NoError(t, err) @@ -1349,7 +1351,8 @@ func TestCreateEventsWithDataGranularity(t *testing.T) { } resourceTypeTagFilters := map[string][]aws.Tag{} - startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) + var previousEndTime time.Time + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime) events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) assert.NoError(t, err) @@ -1393,7 +1396,8 @@ func TestCreateEventsWithTagsFilter(t *testing.T) { resourceTypeTagFilters := map[string][]aws.Tag{} resourceTypeTagFilters["ec2:instance"] = nameTestEC2Tag - startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) + var previousEndTime time.Time + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime) events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) assert.NoError(t, err) assert.Equal(t, 1, len(events)) @@ -1556,7 +1560,8 @@ func TestCreateEventsTimestamp(t *testing.T) { } resourceTypeTagFilters := map[string][]aws.Tag{} - startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) + var previousEndTime time.Time + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime) cloudwatchMock := &MockCloudWatchClientWithoutDim{} resGroupTaggingClientMock := &MockResourceGroupsTaggingClient{} @@ -1570,6 +1575,7 @@ func TestGetStartTimeEndTime(t *testing.T) { m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}} m.MetricSet = &aws.MetricSet{Period: 5 * time.Minute} m.logger = logp.NewLogger("test") - startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) + var previousEndTime time.Time + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime) assert.Equal(t, 5*time.Minute, endTime.Sub(startTime)) } diff --git a/x-pack/metricbeat/module/aws/utils.go b/x-pack/metricbeat/module/aws/utils.go index 6e1b3da19515..b5c1a9249137 100644 --- a/x-pack/metricbeat/module/aws/utils.go +++ b/x-pack/metricbeat/module/aws/utils.go @@ -31,10 +31,17 @@ const DefaultApiTimeout = 5 * time.Second // If durations are configured in non-whole minute periods, they are rounded up to the next minute e.g. 90s becomes 120s. // // If `latency` is configured, the period is shifted back in time by specified duration (before period alignment). -func GetStartTimeEndTime(now time.Time, period time.Duration, latency time.Duration) (time.Time, time.Time) { +// If endTime of the previous collection period is recorded, then use this endTime as the new startTime. This will guarantee no gap between collection timestamps. +func GetStartTimeEndTime(now time.Time, period time.Duration, latency time.Duration, previousEndTime time.Time) (time.Time, time.Time) { periodInMinutes := (period + time.Second*29).Round(time.Second * 60) - endTime := now.Add(latency * -1).Truncate(periodInMinutes) - startTime := endTime.Add(periodInMinutes * -1) + var startTime, endTime time.Time + if !previousEndTime.IsZero() { + startTime = previousEndTime + endTime = startTime.Add(periodInMinutes) + } else { + endTime = now.Add(latency * -1).Truncate(periodInMinutes) + startTime = endTime.Add(periodInMinutes * -1) + } return startTime, endTime } diff --git a/x-pack/metricbeat/module/aws/utils_test.go b/x-pack/metricbeat/module/aws/utils_test.go index ed36994eab49..fef95f74c521 100644 --- a/x-pack/metricbeat/module/aws/utils_test.go +++ b/x-pack/metricbeat/module/aws/utils_test.go @@ -322,7 +322,8 @@ func TestGetListMetricsOutputWithWildcard(t *testing.T) { } func TestGetMetricDataPerRegion(t *testing.T) { - startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0) + var previousEndTime time.Time + startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0, previousEndTime) mockSvc := &MockCloudWatchClient{} var metricDataQueries []cloudwatchtypes.MetricDataQuery @@ -356,7 +357,8 @@ func TestGetMetricDataPerRegion(t *testing.T) { } func TestGetMetricDataResults(t *testing.T) { - startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0) + var previousEndTime time.Time + startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0, previousEndTime) mockSvc := &MockCloudWatchClient{} metricInfo := cloudwatchtypes.Metric{ @@ -393,7 +395,8 @@ func TestGetMetricDataResults(t *testing.T) { } func TestGetMetricDataResultsCrossAccounts(t *testing.T) { - startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0) + var previousEndTime time.Time + startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0, previousEndTime) mockSvc := &MockCloudwatchClientCrossAccounts{} metricInfo := cloudwatchtypes.Metric{ @@ -552,6 +555,11 @@ func TestGetResourcesTags(t *testing.T) { } func parseTime(t *testing.T, in string) time.Time { + var zeroTime time.Time + if in == "" { + return zeroTime + } + time, err := time.Parse(time.RFC3339, in) if err != nil { t.Errorf("test setup failed - could not parse time with time.RFC3339: %s", in) @@ -561,39 +569,46 @@ func parseTime(t *testing.T, in string) time.Time { func TestGetStartTimeEndTime(t *testing.T) { var cases = []struct { - title string - start string - period time.Duration - latency time.Duration - expectedStart string - expectedEnd string + title string + start string + period time.Duration + latency time.Duration + previousEndTime string + expectedStart string + expectedEnd string }{ // window should align with period e.g. requesting a 5 minute period at 10:27 gives 10:20->10:25 - {"1 minute", "2022-08-15T13:38:45Z", time.Second * 60, 0, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"}, - {"2 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 2, 0, "2022-08-15T13:36:00Z", "2022-08-15T13:38:00Z"}, - {"3 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 3, 0, "2022-08-15T13:33:00Z", "2022-08-15T13:36:00Z"}, - {"5 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 5, 0, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"}, - {"30 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 30, 0, "2022-08-15T13:00:00Z", "2022-08-15T13:30:00Z"}, + {"1 minute", "2022-08-15T13:38:45Z", time.Second * 60, 0, "", "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"}, + {"2 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 2, 0, "", "2022-08-15T13:36:00Z", "2022-08-15T13:38:00Z"}, + {"3 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 3, 0, "", "2022-08-15T13:33:00Z", "2022-08-15T13:36:00Z"}, + {"5 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 5, 0, "", "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"}, + {"30 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 30, 0, "", "2022-08-15T13:00:00Z", "2022-08-15T13:30:00Z"}, // latency should shift the time *before* period alignment // e.g. requesting a 5 minute period at 10:27 with 1 minutes latency still gives 10:20->10:25, // but with 3 minutes latency gives 10:15->10:20 - {"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "2022-08-15T13:27:00Z", "2022-08-15T13:28:00Z"}, - {"2 minutes, 1 minute latency", "2022-08-15T13:38:45Z", time.Second * 60 * 2, time.Second * 60, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"}, - {"5 minutes, 4 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 4, "2022-08-15T13:25:00Z", "2022-08-15T13:30:00Z"}, - {"30 minutes, 30 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 30, time.Second * 60 * 30, "2022-08-15T12:30:00Z", "2022-08-15T13:00:00Z"}, + {"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "", "2022-08-15T13:27:00Z", "2022-08-15T13:28:00Z"}, + {"2 minutes, 1 minute latency", "2022-08-15T13:38:45Z", time.Second * 60 * 2, time.Second * 60, "", "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"}, + {"5 minutes, 4 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 4, "", "2022-08-15T13:25:00Z", "2022-08-15T13:30:00Z"}, + {"30 minutes, 30 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 30, time.Second * 60 * 30, "", "2022-08-15T12:30:00Z", "2022-08-15T13:00:00Z"}, // non-whole-minute periods should be rounded up to the nearest minute; latency is applied as-is before period adjustment - {"20 seconds, 45 second latency", "2022-08-15T13:38:45Z", time.Second * 20, time.Second * 45, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"}, - {"1.5 minutes, 60 second latency", "2022-08-15T13:38:45Z", time.Second * 90, time.Second * 60, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"}, - {"just less than 5 minutes, 3 minute latency", "2022-08-15T13:38:45Z", time.Second * 59 * 5, time.Second * 90, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"}, + {"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "2022-08-15T13:45:00Z", "2022-08-15T13:45:00Z", "2022-08-15T13:46:00Z"}, + {"5 minute, 3 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 3, "2022-08-15T13:45:00Z", "2022-08-15T13:45:00Z", "2022-08-15T13:50:00Z"}, + + // latency should shift the time *before* period alignment + // previousEndTime should be the same as the next startTime + {"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "", "2022-08-15T13:27:00Z", "2022-08-15T13:28:00Z"}, + {"2 minutes, 1 minute latency", "2022-08-15T13:38:45Z", time.Second * 60 * 2, time.Second * 60, "", "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"}, + {"5 minutes, 4 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 4, "", "2022-08-15T13:25:00Z", "2022-08-15T13:30:00Z"}, + {"30 minutes, 30 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 30, time.Second * 60 * 30, "", "2022-08-15T12:30:00Z", "2022-08-15T13:00:00Z"}, } for _, tt := range cases { t.Run(tt.title, func(t *testing.T) { - startTime, expectedStartTime, expectedEndTime := parseTime(t, tt.start), parseTime(t, tt.expectedStart), parseTime(t, tt.expectedEnd) + startTime, previousEndTime, expectedStartTime, expectedEndTime := parseTime(t, tt.start), parseTime(t, tt.previousEndTime), parseTime(t, tt.expectedStart), parseTime(t, tt.expectedEnd) - start, end := GetStartTimeEndTime(startTime, tt.period, tt.latency) + start, end := GetStartTimeEndTime(startTime, tt.period, tt.latency, previousEndTime) if expectedStartTime != start || expectedEndTime != end { t.Errorf("got (%s, %s), want (%s, %s)", start, end, tt.expectedStart, tt.expectedEnd) @@ -607,6 +622,7 @@ func TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals(t *testing.T) { start, end string } + var previousEndTime time.Time startTime := parseTime(t, "2022-08-24T11:01:00Z") numCalls := 5 @@ -614,38 +630,39 @@ func TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals(t *testing.T) { title string period time.Duration latency time.Duration + previousEndTime time.Time expectedIntervals []interval }{ // with no latency - {"1 minute", time.Second * 60, 0, []interval{ + {"1 minute", time.Second * 60, 0, previousEndTime, []interval{ {"2022-08-24T11:00:00Z", "2022-08-24T11:01:00Z"}, {"2022-08-24T11:01:00Z", "2022-08-24T11:02:00Z"}, {"2022-08-24T11:02:00Z", "2022-08-24T11:03:00Z"}, {"2022-08-24T11:03:00Z", "2022-08-24T11:04:00Z"}, {"2022-08-24T11:04:00Z", "2022-08-24T11:05:00Z"}, }}, - {"2 minutes", time.Second * 60 * 2, 0, []interval{ + {"2 minutes", time.Second * 60 * 2, 0, previousEndTime, []interval{ {"2022-08-24T10:58:00Z", "2022-08-24T11:00:00Z"}, {"2022-08-24T11:00:00Z", "2022-08-24T11:02:00Z"}, {"2022-08-24T11:02:00Z", "2022-08-24T11:04:00Z"}, {"2022-08-24T11:04:00Z", "2022-08-24T11:06:00Z"}, {"2022-08-24T11:06:00Z", "2022-08-24T11:08:00Z"}, }}, - {"3 minutes", time.Second * 60 * 3, 0, []interval{ + {"3 minutes", time.Second * 60 * 3, 0, previousEndTime, []interval{ {"2022-08-24T10:57:00Z", "2022-08-24T11:00:00Z"}, {"2022-08-24T11:00:00Z", "2022-08-24T11:03:00Z"}, {"2022-08-24T11:03:00Z", "2022-08-24T11:06:00Z"}, {"2022-08-24T11:06:00Z", "2022-08-24T11:09:00Z"}, {"2022-08-24T11:09:00Z", "2022-08-24T11:12:00Z"}, }}, - {"5 minutes", time.Second * 60 * 5, 0, []interval{ + {"5 minutes", time.Second * 60 * 5, 0, previousEndTime, []interval{ {"2022-08-24T10:55:00Z", "2022-08-24T11:00:00Z"}, {"2022-08-24T11:00:00Z", "2022-08-24T11:05:00Z"}, {"2022-08-24T11:05:00Z", "2022-08-24T11:10:00Z"}, {"2022-08-24T11:10:00Z", "2022-08-24T11:15:00Z"}, {"2022-08-24T11:15:00Z", "2022-08-24T11:20:00Z"}, }}, - {"30 minutes", time.Second * 60 * 30, 0, []interval{ + {"30 minutes", time.Second * 60 * 30, 0, previousEndTime, []interval{ {"2022-08-24T10:30:00Z", "2022-08-24T11:00:00Z"}, {"2022-08-24T11:00:00Z", "2022-08-24T11:30:00Z"}, {"2022-08-24T11:30:00Z", "2022-08-24T12:00:00Z"}, @@ -654,7 +671,7 @@ func TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals(t *testing.T) { }}, // with 90s latency (sanity check) - {"1 minute with 2 minute latency", time.Second * 60, time.Second * 90, []interval{ + {"1 minute with 2 minute latency", time.Second * 60, time.Second * 90, previousEndTime, []interval{ {"2022-08-24T10:58:00Z", "2022-08-24T10:59:00Z"}, {"2022-08-24T10:59:00Z", "2022-08-24T11:00:00Z"}, {"2022-08-24T11:00:00Z", "2022-08-24T11:01:00Z"}, @@ -669,7 +686,7 @@ func TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals(t *testing.T) { intervals := make([]interval, numCalls) for i := range intervals { adjustedStartTime := startTime.Add(tt.period * time.Duration(i)) - start, end := GetStartTimeEndTime(adjustedStartTime, tt.period, tt.latency) + start, end := GetStartTimeEndTime(adjustedStartTime, tt.period, tt.latency, tt.previousEndTime) intervals[i] = interval{start.Format(time.RFC3339), end.Format(time.RFC3339)} }