Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Work in progress] Metricbeat - fix Azure duplicates #36778

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
69 changes: 62 additions & 7 deletions x-pack/metricbeat/module/azure/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,24 @@ func (client *Client) InitResources(fn mapResourceMetrics) error {
// GetMetricValues returns the specified metric data points for the specified resource ID/namespace.
func (client *Client) GetMetricValues(metrics []Metric, report mb.ReporterV2) []Metric {
var resultedMetrics []Metric

// Same end time for all metrics in the same batch
endTime := time.Now().UTC()

// loop over the set of metrics
for _, metric := range metrics {
// select period to collect metrics, will double the interval value in order to retrieve any missing values
//if timegrain is larger than intervalx2 then interval will be assigned the timegrain value
interval := client.Config.Period
if t := convertTimegrainToDuration(metric.TimeGrain); t > interval*2 {
interval = t
}
endTime := time.Now().UTC()
startTime := endTime.Add(interval * (-2))
duration := convertTimegrainToDuration(metric.TimeGrain)

// Adjust end time based on timegrain
endTime = endTime.Add(interval * (-1))

// Fetch in the range [{-2xINTERVAL},{-INTERVAL}) with a delay of {INTERVAL}
// It results in one data point {-2xINTERVAL} per call
// Adjust start time based on timegrain for the current metric
startTime := endTime.Add(interval * (-1))
timespan := fmt.Sprintf("%s/%s", startTime.Format(time.RFC3339), endTime.Format(time.RFC3339))

// build the 'filter' parameter which will contain any dimensions configured
Expand All @@ -124,6 +132,52 @@ func (client *Client) GetMetricValues(metrics []Metric, report mb.ReporterV2) []
}
filter = strings.Join(filterList, " AND ")
}

// Interval math for timegrain > period
if duration > client.Config.Period {

inTimespan := false

var diffSec = int64(endTime.Second() - startTime.Second())

var diffMin = int64(endTime.Minute() - startTime.Minute())
var diffMinDuration = time.Duration(diffMin) * time.Minute

var diffHour = int64(endTime.Hour() - startTime.Hour())
var diffHourDuration = time.Duration(diffHour) * time.Hour

// If timegrain is unit 1 day, 1 hour or 1 min
if duration == 24*time.Hour {
startOfDay := endTime.Truncate(24 * time.Hour)
if (startOfDay.Equal(startTime) || startOfDay.After(startTime)) && startOfDay.Before(endTime) {
inTimespan = true
}

} else if duration >= time.Hour {
if diffMin < 0 && diffHourDuration > 0 && diffHourDuration%duration == 0 {
inTimespan = true
}
} else {
if diffSec < 0 && diffMinDuration%duration == 0 {
inTimespan = true
}
}

// if the timegrain mark is not within the sampling timespan, remove that metric from the list in this batch and skip to the next one
if !inTimespan {
// Remove metric from list
ind := 0
for i, currentMetric := range client.ResourceConfigurations.Metrics {
if matchMetrics(currentMetric, metric) {
ind = i
break
}
}
client.ResourceConfigurations.Metrics = append(client.ResourceConfigurations.Metrics[:ind], client.ResourceConfigurations.Metrics[ind+1:]...)
continue
}
}

resp, timegrain, err := client.AzureMonitorService.GetMetricValues(metric.ResourceSubId, metric.Namespace, metric.TimeGrain, timespan, metric.Names,
metric.Aggregations, filter)
if err != nil {
Expand All @@ -133,7 +187,7 @@ func (client *Client) GetMetricValues(metrics []Metric, report mb.ReporterV2) []
} else {
for i, currentMetric := range client.ResourceConfigurations.Metrics {
if matchMetrics(currentMetric, metric) {
current := mapMetricValues(resp, currentMetric.Values, endTime.Truncate(time.Minute).Add(interval*(-1)), endTime.Truncate(time.Minute))
current := mapMetricValues(resp, currentMetric.Values)
client.ResourceConfigurations.Metrics[i].Values = current
if client.ResourceConfigurations.Metrics[i].TimeGrain == "" {
client.ResourceConfigurations.Metrics[i].TimeGrain = timegrain
Expand Down Expand Up @@ -170,14 +224,15 @@ func (client *Client) CreateMetric(resourceId string, subResourceId string, name

// MapMetricByPrimaryAggregation will map the primary aggregation of the metric definition to the client metric
func (client *Client) MapMetricByPrimaryAggregation(metrics []armmonitor.MetricDefinition, resourceId string, subResourceId string, namespace string, dim []Dimension, timegrain string) []Metric {
var clientMetrics []Metric

metricGroups := make(map[string][]armmonitor.MetricDefinition)

for _, met := range metrics {
metricGroups[string(*met.PrimaryAggregationType)] = append(metricGroups[string(*met.PrimaryAggregationType)], met)
}

clientMetrics := make([]Metric, 0, len(metricGroups))

for key, metricGroup := range metricGroups {
var metricNames []string
for _, metricName := range metricGroup {
Expand Down
9 changes: 5 additions & 4 deletions x-pack/metricbeat/module/azure/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const DefaultTimeGrain = "PT5M"
var instanceIdRegex = regexp.MustCompile(`.*?(\d+)$`)

// mapMetricValues should map the metric values
func mapMetricValues(metrics []armmonitor.Metric, previousMetrics []MetricValue, startTime time.Time, endTime time.Time) []MetricValue {
func mapMetricValues(metrics []armmonitor.Metric, previousMetrics []MetricValue) []MetricValue {
var currentMetrics []MetricValue
// compare with the previously returned values and filter out any double records
for _, v := range metrics {
Expand All @@ -29,9 +29,10 @@ func mapMetricValues(metrics []armmonitor.Metric, previousMetrics []MetricValue,
continue
}
// remove metric values that are not part of the timeline selected
if mv.TimeStamp.After(startTime) && mv.TimeStamp.Before(endTime) {
continue
}
// We don't need this filtering anymore, since we modify the timespan directly
//if mv.TimeStamp.After(startTime) && mv.TimeStamp.Before(endTime) {
// continue
//}
// define the new metric value and match aggregations values
var val MetricValue
val.name = *v.Name.Value
Expand Down
Loading