diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 30132874cb31..b0f3817b18dc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -169,6 +169,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Metricbeat* +- Fix first HTTP 401 error when fetching metrics from the Kubelet API caused by a token update {pull}40636[40636] - Fix Azure Monitor 429 error by causing metricbeat to retry the request again. {pull}38294[38294] - Fix fields not being parsed correctly in postgresql/database {issue}25301[25301] {pull}37720[37720] - rabbitmq/queue - Change the mapping type of `rabbitmq.queue.consumers.utilisation.pct` to `scaled_float` from `long` because the values fall within the range of `[0.0, 1.0]`. Previously, conversion to integer resulted in reporting either `0` or `1`. diff --git a/metricbeat/helper/http.go b/metricbeat/helper/http.go index 9b8cf792879c..d90213d4f1f8 100644 --- a/metricbeat/helper/http.go +++ b/metricbeat/helper/http.go @@ -23,14 +23,15 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" + "os" + + "github.com/elastic/elastic-agent-libs/transport/httpcommon" + "github.com/elastic/elastic-agent-libs/useragent" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/beats/v7/metricbeat/helper/dialer" "github.com/elastic/beats/v7/metricbeat/mb" - "github.com/elastic/elastic-agent-libs/transport/httpcommon" - "github.com/elastic/elastic-agent-libs/useragent" ) var userAgent = useragent.UserAgent("Metricbeat", version.GetDefaultVersion(), version.Commit(), version.BuildTime().String()) @@ -38,13 +39,14 @@ var userAgent = useragent.UserAgent("Metricbeat", version.GetDefaultVersion(), v // HTTP is a custom HTTP Client that handle the complexity of connection and retrieving information // from HTTP endpoint. type HTTP struct { - hostData mb.HostData - client *http.Client // HTTP client that is reused across requests. - headers http.Header - name string - uri string - method string - body []byte + hostData mb.HostData + bearerFile string + client *http.Client // HTTP client that is reused across requests. + headers http.Header + name string + uri string + method string + body []byte } // NewHTTP creates new http helper @@ -57,7 +59,7 @@ func NewHTTP(base mb.BaseMetricSet) (*HTTP, error) { return NewHTTPFromConfig(config, base.HostData()) } -// newHTTPWithConfig creates a new http helper from some configuration +// NewHTTPFromConfig newHTTPWithConfig creates a new http helper from some configuration func NewHTTPFromConfig(config Config, hostData mb.HostData) (*HTTP, error) { headers := http.Header{} if config.Headers == nil { @@ -96,12 +98,13 @@ func NewHTTPFromConfig(config Config, hostData mb.HostData) (*HTTP, error) { } return &HTTP{ - hostData: hostData, - client: client, - headers: headers, - method: "GET", - uri: hostData.SanitizedURI, - body: nil, + hostData: hostData, + bearerFile: config.BearerTokenFile, + client: client, + headers: headers, + method: "GET", + uri: hostData.SanitizedURI, + body: nil, }, nil } @@ -126,7 +129,7 @@ func (h *HTTP) FetchResponse() (*http.Response, error) { resp, err := h.client.Do(req) if err != nil { - return nil, fmt.Errorf("error making http request: %v", err) + return nil, fmt.Errorf("error making http request: %w", err) } return resp, nil @@ -179,7 +182,7 @@ func (h *HTTP) FetchContent() ([]byte, error) { return nil, fmt.Errorf("HTTP error %d in %s: %s", resp.StatusCode, h.name, resp.Status) } - return ioutil.ReadAll(resp.Body) + return io.ReadAll(resp.Body) } // FetchScanner returns a Scanner for the content. @@ -210,11 +213,23 @@ func (h *HTTP) FetchJSON() (map[string]interface{}, error) { return data, nil } -// getAuthHeaderFromToken reads a bearer authorizaiton token from the given file +func (h *HTTP) RefreshAuthorizationHeader() (bool, error) { + if h.bearerFile != "" { + header, err := getAuthHeaderFromToken(h.bearerFile) + if err != nil { + return false, err + } + h.headers.Set("Authorization", header) + return true, nil + } + return false, nil +} + +// getAuthHeaderFromToken reads a bearer authorization token from the given file func getAuthHeaderFromToken(path string) (string, error) { var token string - b, err := ioutil.ReadFile(path) + b, err := os.ReadFile(path) if err != nil { return "", fmt.Errorf("reading bearer token file: %w", err) } diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index 3fcb25578bae..3666dc564aeb 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -24,6 +24,7 @@ import ( "net/http" "net/http/httptest" "os" + "path/filepath" "runtime" "testing" "time" @@ -295,6 +296,42 @@ func TestUserAgentCheck(t *testing.T) { assert.Contains(t, ua, "Metricbeat") } +func TestRefreshAuthorizationHeader(t *testing.T) { + path := t.TempDir() + bearerFileName := "token" + bearerFilePath := filepath.Join(path, bearerFileName) + + getAuth := func(helper *HTTP) string { + for k, v := range helper.headers { + if k == "Authorization" { + return v[0] + } + } + return "" + } + + firstToken := "token-1" + err := os.WriteFile(bearerFilePath, []byte(firstToken), 0644) + assert.NoError(t, err) + + helper := &HTTP{bearerFile: bearerFilePath, headers: make(http.Header)} + updated, err := helper.RefreshAuthorizationHeader() + assert.NoError(t, err) + assert.True(t, updated) + expected := fmt.Sprintf("Bearer %s", firstToken) + assert.Equal(t, expected, getAuth(helper)) + + secondToken := "token-2" + err = os.WriteFile(bearerFilePath, []byte(secondToken), 0644) + assert.NoError(t, err) + + updated, err = helper.RefreshAuthorizationHeader() + assert.NoError(t, err) + assert.True(t, updated) + expected = fmt.Sprintf("Bearer %s", secondToken) + assert.Equal(t, expected, getAuth(helper)) +} + func checkTimeout(t *testing.T, h *HTTP) { t.Helper() diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index d1071f613de8..e48926215483 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -20,12 +20,13 @@ package container import ( "fmt" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" - "github.com/elastic/elastic-agent-libs/mapstr" ) const ( diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index 23611e0b63c5..238b8ec21d46 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -19,6 +19,8 @@ package kubernetes import ( "fmt" + httpnet "net/http" + "strings" "sync" "time" @@ -123,7 +125,7 @@ func (m *module) GetStateMetricsFamilies(prometheus p.Prometheus) ([]*p.MetricFa defer m.kubeStateMetricsCache.lock.Unlock() now := time.Now() - // NOTE: These entries will be never removed, this can be a leak if + // NOTE: These entries will never be removed, this can be a leak if // metricbeat is used to monitor clusters dynamically created. // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) familiesCache := m.kubeStateMetricsCache.getCacheMapEntry(m.cacheHash) @@ -142,13 +144,32 @@ func (m *module) GetKubeletStats(http *helper.HTTP) ([]byte, error) { now := time.Now() - // NOTE: These entries will be never removed, this can be a leak if + // NOTE: These entries will never be removed, this can be a leak if // metricbeat is used to monitor clusters dynamically created. // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) statsCache := m.kubeletStatsCache.getCacheMapEntry(m.cacheHash) + // Check if the last time we tried to make a request to the Kubelet API ended in a 401 Unauthorized error. + // If this is the case, we should not keep making requests. + errorUnauthorisedMsg := fmt.Sprintf("HTTP error %d", httpnet.StatusUnauthorized) + if statsCache.lastFetchErr != nil && strings.Contains(statsCache.lastFetchErr.Error(), errorUnauthorisedMsg) { + return statsCache.sharedStats, statsCache.lastFetchErr + } + + // If this is the first request, or it has passed more time than config.period, we should + // make a request to the Kubelet API again to get the last metrics' values. if statsCache.lastFetchTimestamp.IsZero() || now.Sub(statsCache.lastFetchTimestamp) > m.Config().Period { statsCache.sharedStats, statsCache.lastFetchErr = http.FetchContent() + + // If we got an unauthorized error from our HTTP request, it is possible the token has expired. + // We should update the Authorization header in that case. We only try this for the first time + // we get HTTP 401 to avoid getting in a loop in case the cause of the error is something different. + if statsCache.lastFetchErr != nil && strings.Contains(statsCache.lastFetchErr.Error(), errorUnauthorisedMsg) { + if _, err := http.RefreshAuthorizationHeader(); err == nil { + statsCache.sharedStats, statsCache.lastFetchErr = http.FetchContent() + } + } + statsCache.lastFetchTimestamp = now }