Skip to content

Commit

Permalink
Additional stats fields for Elasticsearch (#41944)
Browse files Browse the repository at this point in the history
* Perform an additional _settings API call for Elasticsearch module
* Added filter_path for cluster state & index settings fetch
* Added index creation version

(cherry picked from commit 16c753c)
  • Loading branch information
3kt authored and mergify[bot] committed Dec 12, 2024
1 parent 63df576 commit ee739f9
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support for region/zone for Vertex AI service in GCP module {pull}41551[41551]
- Add support for location label as an optional configuration parameter in GCP metrics metricset. {issue}41550[41550] {pull}41626[41626]
- Add support for podman metrics in docker module. {pull}41889[41889]
- Added `tier_preference`, `creation_date` and `version` fields to the `elasticsearch.index` metricset. {pull}41944[41944]

*Metricbeat*
- Add benchmark module {pull}41801[41801]
Expand Down
21 changes: 21 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32161,6 +32161,27 @@ type: keyword

--

*`elasticsearch.index.tier_preference`*::
+
--
type: keyword

--

*`elasticsearch.index.creation_date`*::
+
--
type: date

--

*`elasticsearch.index.version`*::
+
--
type: keyword

--

*`elasticsearch.index.name`*::
+
--
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/cluster_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func eventMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.I
}

clusterStateMetrics := []string{"version", "master_node", "nodes", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics)
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics, []string{})
if err != nil {
return fmt.Errorf("failed to get cluster state from Elasticsearch: %w", err)
}
Expand Down
34 changes: 32 additions & 2 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,21 @@ func GetLicense(http *helper.HTTP, resetURI string) (*License, error) {
}

// GetClusterState returns cluster state information.
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (mapstr.M, error) {
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string, filterPaths []string) (mapstr.M, error) {
queryParams := []string{"local=true"}
clusterStateURI := "_cluster/state"
if len(metrics) > 0 {
clusterStateURI += "/" + strings.Join(metrics, ",")
}

content, err := fetchPath(http, resetURI, clusterStateURI, "local=true")
if len(filterPaths) > 0 {
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
queryParams = append(queryParams, filterPathQueryParam)
}

queryString := strings.Join(queryParams, "&")

content, err := fetchPath(http, resetURI, clusterStateURI, queryString)
if err != nil {
return nil, err
}
Expand All @@ -304,6 +312,28 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (maps
return clusterState, err
}

func GetIndexSettings(http *helper.HTTP, resetURI string, indexPattern string, filterPaths []string) (mapstr.M, error) {

queryParams := []string{"local=true", "expand_wildcards=hidden,all"}
indicesSettingsURI := indexPattern + "/_settings"

if len(filterPaths) > 0 {
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
queryParams = append(queryParams, filterPathQueryParam)
}

queryString := strings.Join(queryParams, "&")

content, err := fetchPath(http, resetURI, indicesSettingsURI, queryString)
if err != nil {
return nil, err
}

var indicesSettings map[string]interface{}
err = json.Unmarshal(content, &indicesSettings)
return indicesSettings, err
}

// GetClusterSettingsWithDefaults returns cluster settings.
func GetClusterSettingsWithDefaults(http *helper.HTTP, resetURI string, filterPaths []string) (mapstr.M, error) {
return GetClusterSettings(http, resetURI, true, filterPaths)
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/fields.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions metricbeat/module/elasticsearch/index/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@
}
},
"status": "green",
"tier_preference": "data_content",
"creation_date": 1731657995821,
"version": "8505000",
"hidden": true,
"shards": {
"total": 1,
Expand Down
6 changes: 6 additions & 0 deletions metricbeat/module/elasticsearch/index/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
type: keyword
- name: status
type: keyword
- name: tier_preference
type: keyword
- name: creation_date
type: date
- name: version
type: keyword
- name: name
type: keyword
description: >
Expand Down
105 changes: 97 additions & 8 deletions metricbeat/module/elasticsearch/index/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package index
import (
"encoding/json"
"fmt"
"strconv"

"github.com/joeshaw/multierror"

Expand All @@ -40,9 +41,12 @@ type Index struct {
Primaries primaries `json:"primaries"`
Total total `json:"total"`

Index string `json:"index"`
Status string `json:"status"`
Shards shardStats `json:"shards"`
Index string `json:"index"`
Status string `json:"status"`
TierPreference string `json:"tier_preference"`
CreationDate int `json:"creation_date"`
Version string `json:"version"`
Shards shardStats `json:"shards"`
}

type primaries struct {
Expand Down Expand Up @@ -180,11 +184,19 @@ type bulkStats struct {

func eventsMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.Info, content []byte, isXpack bool) error {
clusterStateMetrics := []string{"routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics)
clusterStateFilterPaths := []string{"routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics, clusterStateFilterPaths)
if err != nil {
return fmt.Errorf("failure retrieving cluster state from Elasticsearch: %w", err)
}

indicesSettingsPattern := "*,.*"
indicesSettingsFilterPaths := []string{"*.settings.index.creation_date", "*.settings.index.**._tier_preference", "*.settings.index.version.created"}
indicesSettings, err := elasticsearch.GetIndexSettings(httpClient, httpClient.GetURI(), indicesSettingsPattern, indicesSettingsFilterPaths)
if err != nil {
return fmt.Errorf("failure retrieving index settings from Elasticsearch: %w", err)
}

var indicesStats stats
if err := parseAPIResponse(content, &indicesStats); err != nil {
return fmt.Errorf("failure parsing Indices Stats Elasticsearch API response: %w", err)
Expand All @@ -204,6 +216,12 @@ func eventsMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.
continue
}

err = addIndexSettings(&idx, indicesSettings)
if err != nil {
errs = append(errs, fmt.Errorf("failure adding index settings: %w", err))
continue
}

event.ModuleFields.Put("cluster.id", info.ClusterID)
event.ModuleFields.Put("cluster.name", info.ClusterName)

Expand Down Expand Up @@ -271,6 +289,63 @@ func addClusterStateFields(idx *Index, clusterState mapstr.M) error {
return nil
}

func addIndexSettings(idx *Index, indicesSettings mapstr.M) error {

// Recover the index settings for our specific index
indexSettingsValue, err := indicesSettings.GetValue(idx.Index)
if err != nil {
return fmt.Errorf("failed to get index settings for index %s: %w", idx.Index, err)
}

indexSettings, ok := indexSettingsValue.(map[string]interface{})
if !ok {
return fmt.Errorf("index settings is not a map for index: %s", idx.Index)
}

indexCreationDate, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.creation_date")
if err != nil {
return fmt.Errorf("failed to get index creation date: %w", err)
}

idx.CreationDate, err = strconv.Atoi(indexCreationDate)
if err != nil {
return fmt.Errorf("failed to convert index creation date to int: %w", err)
}

indexTierPreference, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.routing.allocation.require._tier_preference")
if err != nil {
indexTierPreference, err = getIndexSettingForIndex(indexSettings, idx.Index, "index.routing.allocation.include._tier_preference")
if err != nil {
return fmt.Errorf("failed to get index tier preference: %w", err)
}
}

idx.TierPreference = indexTierPreference

indexVersion, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.version.created")
if err != nil {
return fmt.Errorf("failed to get index version: %w", err)
}

idx.Version = indexVersion

return nil
}

func getIndexSettingForIndex(indexSettings mapstr.M, index, settingKey string) (string, error) {
fieldKey := "settings." + settingKey
value, err := indexSettings.GetValue(fieldKey)
if err != nil {
return "", fmt.Errorf("'"+fieldKey+"': %w", err)
}

setting, ok := value.(string)
if !ok {
return "", elastic.MakeErrorForMissingField(fieldKey, elastic.Elasticsearch)
}
return setting, nil
}

func getClusterStateMetricForIndex(clusterState mapstr.M, index, metricKey string) (mapstr.M, error) {
fieldKey := metricKey + ".indices." + index
value, err := clusterState.GetValue(fieldKey)
Expand Down Expand Up @@ -308,8 +383,15 @@ func getIndexStatus(shards map[string]interface{}) (string, error) {

shard := mapstr.M(s)

isPrimary := shard["primary"].(bool)
state := shard["state"].(string)
isPrimary, ok := shard["primary"].(bool)
if !ok {
return "", fmt.Errorf("%v.shards[%v].primary is not a boolean", indexName, shardIdx)
}

state, ok := shard["state"].(string)
if !ok {
return "", fmt.Errorf("%v.shards[%v].state is not a string", indexName, shardIdx)
}

if isPrimary {
areAllPrimariesStarted = areAllPrimariesStarted && (state == "STARTED")
Expand Down Expand Up @@ -357,8 +439,15 @@ func getIndexShardStats(shards mapstr.M) (*shardStats, error) {

shard := mapstr.M(s)

isPrimary := shard["primary"].(bool)
state := shard["state"].(string)
isPrimary, ok := shard["primary"].(bool)
if !ok {
return nil, fmt.Errorf("%v.shards[%v].primary is not a boolean", indexName, shardIdx)
}

state, ok := shard["state"].(string)
if !ok {
return nil, fmt.Errorf("%v.shards[%v].state is not a string", indexName, shardIdx)
}

if isPrimary {
primaries++
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_index_management(self):
assert len(es.cat.templates(name='metricbeat-*', h='name')) > 0

@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
@pytest.mark.timeout(8*60, func_only=True)
@pytest.mark.timeout(8 * 60, func_only=True)
def test_dashboards(self):
"""
Test that the dashboards can be loaded with `setup --dashboards`
Expand Down

0 comments on commit ee739f9

Please sign in to comment.