Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

report output health #3127

Merged
merged 13 commits into from
Dec 6, 2023
32 changes: 32 additions & 0 deletions changelog/fragments/1701340465-report-output-health.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Report output health state to logs-fleet_server.output_health-default data stream

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/owner/repo/3127

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/owner/repo/3116
1 change: 1 addition & 0 deletions internal/pkg/dl/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
FleetPolicies = ".fleet-policies"
FleetPoliciesLeader = ".fleet-policies-leader"
FleetServers = ".fleet-servers"
FleetOutputHealth = "logs-fleet_server.output_health-default"
)

// Query fields
Expand Down
41 changes: 41 additions & 0 deletions internal/pkg/dl/output_health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package dl

import (
"context"
"encoding/json"
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/gofrs/uuid"
)

func CreateOutputHealth(ctx context.Context, bulker bulk.Bulk, doc model.OutputHealth) error {
return createOutputHealth(ctx, bulker, FleetOutputHealth, doc)
}

func createOutputHealth(ctx context.Context, bulker bulk.Bulk, index string, doc model.OutputHealth) error {
if doc.Timestamp == "" {
doc.Timestamp = time.Now().UTC().Format(time.RFC3339)
}
doc.DataStream = &model.DataStream{
Dataset: "fleet_server.output_health",
Type: "logs",
Namespace: "default",
Comment on lines +26 to +28
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be constants? Can Namespace ever be something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be always default.

}
body, err := json.Marshal(doc)
if err != nil {
return err
}

id, err := uuid.NewV4()
if err != nil {
return err
}
_, err = bulker.Create(ctx, index, id.String(), body, bulk.WithRefresh())
return err
}
25 changes: 25 additions & 0 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 23 additions & 3 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rs/zerolog"
"go.elastic.co/apm/v2"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
Expand Down Expand Up @@ -260,17 +261,36 @@ func (p *Output) prepareElasticsearch(
ctx := zlog.WithContext(ctx)
outputAPIKey, err :=
generateOutputAPIKey(ctx, outputBulker, agent.Id, p.Name, p.Role.Raw)
// reporting output error status to self monitor and not returning the error to keep fleet-server running

// reporting output health and not returning the error to keep fleet-server running
if outputAPIKey == nil && p.Type == OutputTypeRemoteElasticsearch {
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("Could not create API key in remote ES")
doc := model.OutputHealth{
Output: p.Name,
State: client.UnitStateDegraded.String(),
Message: fmt.Sprintf("remote ES could not create API key due to error: %v", err),
}
zerolog.Ctx(ctx).Warn().Err(err).Str("outputName", p.Name).Msg(doc.Message)

if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
zlog.Error().Err(err).Str("outputName", p.Name).Msg("error writing output health")
}
}

// replace type remote_elasticsearch with elasticsearch as agent doesn't recognize remote_elasticsearch
outputMap[p.Name][FieldOutputType] = OutputTypeElasticsearch
// remove the service token from the agent policy sent to the agent
delete(outputMap[p.Name], FieldOutputServiceToken)
return nil
} else if p.Type == OutputTypeRemoteElasticsearch {
doc := model.OutputHealth{
Output: p.Name,
State: client.UnitStateHealthy.String(),
Message: "",
}
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
zlog.Error().Err(err).Msg("create output health")
}
}
if err != nil {
return fmt.Errorf("failed generate output API key: %w", err)
Expand Down Expand Up @@ -304,7 +324,7 @@ func (p *Output) prepareElasticsearch(
// Using painless script to append the old keys to the history
body, err := renderUpdatePainlessScript(p.Name, fields)
if err != nil {
return fmt.Errorf("could no tupdate painless script: %w", err)
return fmt.Errorf("could not update painless script: %w", err)
}

if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
Expand Down
56 changes: 56 additions & 0 deletions internal/pkg/policy/policy_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ package policy

import (
"context"
"encoding/json"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
Expand Down Expand Up @@ -462,6 +466,14 @@ func TestPolicyRemoteESOutputPrepare(t *testing.T) {
mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(&apiKey, nil).Once()
bulker.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(outputBulker, false).Once()
bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool {
var doc model.OutputHealth
err := json.Unmarshal(body, &doc)
if err != nil {
t.Fatal(err)
}
return doc.Message == "" && doc.State == client.UnitStateHealthy.String()
}), mock.Anything).Return("", nil)

output := Output{
Type: OutputTypeRemoteElasticsearch,
Expand Down Expand Up @@ -500,4 +512,48 @@ func TestPolicyRemoteESOutputPrepare(t *testing.T) {

bulker.AssertExpectations(t)
})

t.Run("Report degraded output health on API key create failure", func(t *testing.T) {
logger := testlog.SetLogger(t)
bulker := ftesting.NewMockBulk()
var apiKey *bulk.APIKey = nil
var err error = errors.New("error connecting")

outputBulker := ftesting.NewMockBulk()
outputBulker.On("APIKeyCreate",
mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(apiKey, err).Once()
bulker.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(outputBulker, false).Once()
bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool {
var doc model.OutputHealth
err := json.Unmarshal(body, &doc)
if err != nil {
t.Fatal(err)
}
return doc.Message == "remote ES could not create API key due to error: error connecting" && doc.State == client.UnitStateDegraded.String()
}), mock.Anything).Return("", nil)

output := Output{
Type: OutputTypeRemoteElasticsearch,
Name: "test output",
Role: &RoleT{
Sha2: "new-hash",
Raw: TestPayload,
},
}

policyMap := map[string]map[string]interface{}{
"test output": map[string]interface{}{
"hosts": []interface{}{"http://localhost"},
"service_token": "serviceToken1",
"type": OutputTypeRemoteElasticsearch,
},
}
testAgent := &model.Agent{Outputs: map[string]*model.PolicyOutput{}}

err = output.Prepare(context.Background(), logger, bulker, testAgent, policyMap)
require.NoError(t, err, "expected prepare to pass")

bulker.AssertExpectations(t)
})
}
38 changes: 30 additions & 8 deletions internal/pkg/policy/self.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ LOOP:
return err
}
cT.Reset(m.checkTime)
if state == client.UnitStateHealthy {
// running; can stop
break LOOP
}
m.log.Trace().Msg(state.String())
case hits := <-s.Output():
policies := make([]model.Policy, len(hits))
for i, hit := range hits {
Expand All @@ -125,10 +122,7 @@ LOOP:
if err != nil {
return err
}
if state == client.UnitStateHealthy {
// running; can stop
break LOOP
}
m.log.Trace().Msg(state.String())
}
}

Expand Down Expand Up @@ -218,6 +212,8 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
return client.UnitStateStarting, nil
}

reportOutputHealth(ctx, m.bulker, m.log)
Copy link
Contributor Author

@juliaElastic juliaElastic Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently pinging remote outputs every 5s (default monitor interval) and writing out a doc to the output health data stream.
We could change this to only write out a doc if the state changed.


state := client.UnitStateHealthy
extendMsg := ""
var payload map[string]interface{}
Expand Down Expand Up @@ -253,6 +249,32 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
return state, nil
}

func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Logger) {
//pinging logic
bulkerMap := bulker.GetBulkerMap()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned on the previous pr, the regular health reporting will stop if fleet-server is restarted, and doesn't restart until an agent tries to create an API key again (e.g. due to change in output config), because the bulkerMap is stored in memory and output bulkers are created when there is a config change or a new output used for the first time by an agent.

for outputName, outputBulker := range bulkerMap {
doc := model.OutputHealth{
Output: outputName,
State: client.UnitStateHealthy.String(),
Message: "",
}
res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx))
if err != nil {
doc.State = client.UnitStateDegraded.String()
doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error())
logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message)

} else if res.StatusCode != 200 {
doc.State = client.UnitStateDegraded.String()
doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode)
logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message)
}
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
logger.Error().Err(err).Str("outputName", outputName).Msg("error writing output health")
}
}
}

func HasFleetServerInput(inputs []map[string]interface{}) bool {
for _, input := range inputs {
attr, ok := input["type"].(string)
Expand Down
Loading
Loading