Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

report output health #3127

Merged
merged 13 commits into from
Dec 6, 2023
1 change: 1 addition & 0 deletions internal/pkg/dl/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
FleetPolicies = ".fleet-policies"
FleetPoliciesLeader = ".fleet-policies-leader"
FleetServers = ".fleet-servers"
FleetOutputHealth = "logs-fleet_server.output_health-default"
)

// Query fields
Expand Down
54 changes: 54 additions & 0 deletions internal/pkg/dl/output_health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package dl

import (
"context"
"encoding/json"
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/gofrs/uuid"
)

// DataStream carries the dataset, type and namespace values that are
// serialized under the "data_stream" key of an OutputHealth document.
// Each field is left out of the JSON encoding when it is the empty string.
type DataStream struct {
	Dataset   string `json:"dataset,omitempty"`
	Type      string `json:"type,omitempty"`
	Namespace string `json:"namespace,omitempty"`
}

// OutputHealth is the document written to the output health index to record
// the state of a single output.
//
// Output, State, Message and Timestamp are omitted from the JSON encoding
// when empty. DataStream is always encoded: encoding/json never treats a
// struct value as "empty", so an omitempty option on that field would have
// no effect and is deliberately not set.
type OutputHealth struct {
	Output     string     `json:"output,omitempty"`
	State      string     `json:"state,omitempty"`
	Message    string     `json:"message,omitempty"`
	Timestamp  string     `json:"@timestamp,omitempty"`
	DataStream DataStream `json:"data_stream"`
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is something that is being written to ES, does it make more sense to define it in model/schema.json instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to schema.json


// CreateOutputHealth writes doc to the FleetOutputHealth index through
// bulker. It delegates to createOutputHealth, which fills in the timestamp
// (when unset) and the data stream fields before indexing, and returns any
// error from that call unchanged.
func CreateOutputHealth(ctx context.Context, bulker bulk.Bulk, doc OutputHealth) error {
	const index = FleetOutputHealth
	return createOutputHealth(ctx, bulker, index, doc)
}

func createOutputHealth(ctx context.Context, bulker bulk.Bulk, index string, doc OutputHealth) error {
if doc.Timestamp == "" {
doc.Timestamp = time.Now().UTC().Format(time.RFC3339)
}
doc.DataStream = DataStream{
Dataset: "fleet_server.output_health",
Type: "logs",
Namespace: "default",
Comment on lines +26 to +28
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be constants? Can Namespace ever be something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be always default.

}
body, err := json.Marshal(doc)
if err != nil {
return err
}

id, err := uuid.NewV4()
if err != nil {
return err
}
_, err = bulker.Create(ctx, index, id.String(), body, bulk.WithRefresh())
return err
}
26 changes: 23 additions & 3 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rs/zerolog"
"go.elastic.co/apm/v2"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
Expand Down Expand Up @@ -260,17 +261,36 @@ func (p *Output) prepareElasticsearch(
ctx := zlog.WithContext(ctx)
outputAPIKey, err :=
generateOutputAPIKey(ctx, outputBulker, agent.Id, p.Name, p.Role.Raw)
// reporting output error status to self monitor and not returning the error to keep fleet-server running

// reporting output health and not returning the error to keep fleet-server running
if outputAPIKey == nil && p.Type == OutputTypeRemoteElasticsearch {
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("Could not create API key in remote ES")
doc := dl.OutputHealth{
Output: p.Name,
State: client.UnitStateDegraded.String(),
Message: fmt.Sprintf("remote ES could not create API key due to error: %s", err.Error()),
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved
}
zerolog.Ctx(ctx).Warn().Err(err).Msg(doc.Message)

if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
zlog.Error().Err(err).Msg("error writing output health")
}
}

// replace type remote_elasticsearch with elasticsearch as agent doesn't recognize remote_elasticsearch
outputMap[p.Name][FieldOutputType] = OutputTypeElasticsearch
// remove the service token from the agent policy sent to the agent
delete(outputMap[p.Name], FieldOutputServiceToken)
return nil
} else if p.Type == OutputTypeRemoteElasticsearch {
doc := dl.OutputHealth{
Output: p.Name,
State: client.UnitStateHealthy.String(),
Message: "",
}
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
zlog.Error().Err(err).Msg("create output health")
}
}
if err != nil {
return fmt.Errorf("failed generate output API key: %w", err)
Expand Down Expand Up @@ -304,7 +324,7 @@ func (p *Output) prepareElasticsearch(
// Using painless script to append the old keys to the history
body, err := renderUpdatePainlessScript(p.Name, fields)
if err != nil {
return fmt.Errorf("could no tupdate painless script: %w", err)
return fmt.Errorf("could not update painless script: %w", err)
}

if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
Expand Down
28 changes: 28 additions & 0 deletions internal/pkg/policy/self.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
return client.UnitStateStarting, nil
}

reportOutputHealth(ctx, m.bulker, m.log)
Copy link
Contributor Author

@juliaElastic juliaElastic Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently pinging remote outputs every 5s (default monitor interval) and writing out a doc to the output health data stream.
We could change this to only write out a doc if the state changed.


state := client.UnitStateHealthy
extendMsg := ""
var payload map[string]interface{}
Expand Down Expand Up @@ -253,6 +255,32 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
return state, nil
}

func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Logger) {
//pinging logic
bulkerMap := bulker.GetBulkerMap()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned on the previous pr, the regular health reporting will stop if fleet-server is restarted, and doesn't restart until an agent tries to create an API key again (e.g. due to change in output config), because the bulkerMap is stored in memory and output bulkers are created when there is a config change or a new output used for the first time by an agent.

for outputName, outputBulker := range bulkerMap {
doc := dl.OutputHealth{
Output: outputName,
State: client.UnitStateHealthy.String(),
Message: "",
}
res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx))
if err != nil {
doc.State = client.UnitStateDegraded.String()
doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error())
logger.Error().Err(err).Msg(doc.Message)

} else if res.StatusCode != 200 {
doc.State = client.UnitStateDegraded.String()
doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode)
logger.Error().Err(err).Msg(doc.Message)
}
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
logger.Error().Err(err).Msg("error writing output health")
}
}
}

func HasFleetServerInput(inputs []map[string]interface{}) bool {
for _, input := range inputs {
attr, ok := input["type"].(string)
Expand Down
Loading