-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
report output health #3127
report output health #3127
Changes from 3 commits
fb6a3b4
928aac5
e9c22f9
8a672a9
7940d1a
eea71f3
41b70bc
5082970
0b45780
3d1888a
588c9d1
9644f30
fae7af6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package dl | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"time" | ||
|
||
"github.com/elastic/fleet-server/v7/internal/pkg/bulk" | ||
"github.com/gofrs/uuid" | ||
) | ||
|
||
type DataStream struct { | ||
Dataset string `json:"dataset,omitempty"` | ||
Type string `json:"type,omitempty"` | ||
Namespace string `json:"namespace,omitempty"` | ||
} | ||
|
||
type OutputHealth struct { | ||
Output string `json:"output,omitempty"` | ||
State string `json:"state,omitempty"` | ||
Message string `json:"message,omitempty"` | ||
Timestamp string `json:"@timestamp,omitempty"` | ||
DataStream DataStream `json:"data_stream,omitempty"` | ||
} | ||
|
||
func CreateOutputHealth(ctx context.Context, bulker bulk.Bulk, doc OutputHealth) error { | ||
return createOutputHealth(ctx, bulker, FleetOutputHealth, doc) | ||
} | ||
|
||
func createOutputHealth(ctx context.Context, bulker bulk.Bulk, index string, doc OutputHealth) error { | ||
if doc.Timestamp == "" { | ||
doc.Timestamp = time.Now().UTC().Format(time.RFC3339) | ||
} | ||
doc.DataStream = DataStream{ | ||
Dataset: "fleet_server.output_health", | ||
Type: "logs", | ||
Namespace: "default", | ||
Comment on lines
+26
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should these be constants? Can There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it will be always default. |
||
} | ||
body, err := json.Marshal(doc) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
id, err := uuid.NewV4() | ||
if err != nil { | ||
return err | ||
} | ||
_, err = bulker.Create(ctx, index, id.String(), body, bulk.WithRefresh()) | ||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -218,6 +218,8 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error | |
return client.UnitStateStarting, nil | ||
} | ||
|
||
reportOutputHealth(ctx, m.bulker, m.log) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently pinging remote outputs every 5s (default monitor interval) and writing out a doc to the output health data stream. |
||
|
||
state := client.UnitStateHealthy | ||
extendMsg := "" | ||
var payload map[string]interface{} | ||
|
@@ -253,6 +255,32 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error | |
return state, nil | ||
} | ||
|
||
func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Logger) { | ||
//pinging logic | ||
bulkerMap := bulker.GetBulkerMap() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as mentioned on the previous pr, the regular health reporting will stop if fleet-server is restarted, and doesn't restart until an agent tries to create an API key again (e.g. due to change in output config), because the |
||
for outputName, outputBulker := range bulkerMap { | ||
doc := dl.OutputHealth{ | ||
Output: outputName, | ||
State: client.UnitStateHealthy.String(), | ||
Message: "", | ||
} | ||
res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx)) | ||
if err != nil { | ||
doc.State = client.UnitStateDegraded.String() | ||
doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error()) | ||
logger.Error().Err(err).Msg(doc.Message) | ||
|
||
} else if res.StatusCode != 200 { | ||
doc.State = client.UnitStateDegraded.String() | ||
doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode) | ||
logger.Error().Err(err).Msg(doc.Message) | ||
} | ||
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { | ||
logger.Error().Err(err).Msg("error writing output health") | ||
} | ||
} | ||
} | ||
|
||
func HasFleetServerInput(inputs []map[string]interface{}) bool { | ||
for _, input := range inputs { | ||
attr, ok := input["type"].(string) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is something that is being written to ES, does it make more sense to define in in model/schema.json instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved to schema.json