Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: add http metrics collection (elastic#39503)
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 authored May 15, 2024
1 parent 23d1066 commit 3b4bb54
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Ensure all responses sent by HTTP Endpoint are HTML-escaped. {pull}39329[39329]
- Update CEL mito extensions to v1.11.0 to improve type checking. {pull}39460[39460]
- Improve logging of request and response with request trace logging in error conditions. {pull}39455[39455]
- Add HTTP metrics to CEL input. {issue}39501[39501] {pull}39503[39503]

*Auditbeat*

Expand Down
39 changes: 30 additions & 9 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -765,15 +765,36 @@ observe the activity of the input.

[options="header"]
|=======
| Metric | Description
| `resource` | URL or path of the input resource.
| `cel_executions` | Number times the CEL program has been executed.
| `batches_received_total` | Number of event arrays received.
| `events_received_total` | Number of events received.
| `batches_published_total` | Number of event arrays published.
| `events_published_total` | Number of events published.
| `cel_processing_time` | Histogram of the elapsed successful CEL program processing times in nanoseconds.
| `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
| Metric | Description
| `resource` | URL or path of the input resource.
| `cel_executions` | Number times the CEL program has been executed.
| `batches_received_total` | Number of event arrays received.
| `events_received_total` | Number of events received.
| `batches_published_total` | Number of event arrays published.
| `events_published_total` | Number of events published.
| `cel_processing_time` | Histogram of the elapsed successful CEL program processing times in nanoseconds.
| `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
| `http_request_total` | Total number of processed requests.
| `http_request_errors_total` | Total number of request errors.
| `http_request_delete_total` | Total number of `DELETE` requests.
| `http_request_get_total` | Total number of `GET` requests.
| `http_request_head_total` | Total number of `HEAD` requests.
| `http_request_options_total` | Total number of `OPTIONS` requests.
| `http_request_patch_total` | Total number of `PATCH` requests.
| `http_request_post_total` | Total number of `POST` requests.
| `http_request_put_total` | Total number of `PUT` requests.
| `http_request_body_bytes_total` | Total of the requests body size.
| `http_request_body_bytes` | Histogram of the requests body size.
| `http_response_total` | Total number of responses received.
| `http_response_errors_total` | Total number of response errors.
| `http_response_1xx_total` | Total number of `1xx` responses.
| `http_response_2xx_total` | Total number of `2xx` responses.
| `http_response_3xx_total` | Total number of `3xx` responses.
| `http_response_4xx_total` | Total number of `4xx` responses.
| `http_response_5xx_total` | Total number of `5xx` responses.
| `http_response_body_bytes_total` | Total of the responses body size.
| `http_response_body_bytes` | Histogram of the responses body size.
| `http_round_trip_time` | Histogram of the round trip time.
|=======

==== Developer tools
Expand Down
15 changes: 10 additions & 5 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httpmon"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand Down Expand Up @@ -122,7 +123,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
cfg := src.cfg
log := env.Logger.With("input_url", cfg.Resource.URL)

metrics := newInputMetrics(env.ID)
metrics, reg := newInputMetrics(env.ID)
defer metrics.Close()

ctx := ctxtool.FromCanceller(env.Cancelation)
Expand All @@ -132,7 +133,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
cfg.Resource.Tracer.Filename = strings.ReplaceAll(cfg.Resource.Tracer.Filename, "*", id)
}

client, trace, err := newClient(ctx, cfg, log)
client, trace, err := newClient(ctx, cfg, log, reg)
if err != nil {
return err
}
Expand Down Expand Up @@ -686,7 +687,7 @@ func getLimit(which string, rateLimit map[string]interface{}, log *logp.Logger)
return limit, true
}

func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, *httplog.LoggingRoundTripper, error) {
func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry) (*http.Client, *httplog.LoggingRoundTripper, error) {
if !wantClient(cfg) {
return nil, nil, nil
}
Expand Down Expand Up @@ -729,6 +730,10 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
c.Transport = trace
}

if reg != nil {
c.Transport = httpmon.NewMetricsRoundTripper(c.Transport, reg)
}

c.CheckRedirect = checkRedirect(cfg.Resource, log)

if cfg.Resource.Retry.getMaxAttempts() > 1 {
Expand Down Expand Up @@ -1070,7 +1075,7 @@ type inputMetrics struct {
batchProcessingTime metrics.Sample // histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
}

func newInputMetrics(id string) *inputMetrics {
func newInputMetrics(id string) (*inputMetrics, *monitoring.Registry) {
reg, unreg := inputmon.NewInputRegistry(inputName, id, nil)
out := &inputMetrics{
unregister: unreg,
Expand All @@ -1088,7 +1093,7 @@ func newInputMetrics(id string) *inputMetrics {
_ = adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.batchProcessingTime))

return out
return out, reg
}

func (m *inputMetrics) Close() {
Expand Down

0 comments on commit 3b4bb54

Please sign in to comment.