[filebeat][gcs] - Added input metrics (#41505)
ShourieG authored Nov 18, 2024
1 parent c266c9f commit 821697d
Showing 11 changed files with 301 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -320,6 +320,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support to include AWS cloudwatch linked accounts when using log_group_name_prefix to define log group names. {pull}41206[41206]
- Improved Azure Blob Storage input documentation. {pull}41252[41252]
- Make ETW input GA. {pull}41389[41389]
- Added input metrics to GCS input. {issue}36640[36640] {pull}41505[41505]
- Add support for Okta entity analytics provider to collect role and factor data for users. {pull}41460[41460]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
33 changes: 33 additions & 0 deletions x-pack/filebeat/docs/inputs/input-gcs.asciidoc
@@ -462,5 +462,38 @@ filebeat.inputs:
In this configuration, even though `max_workers = 10`, `poll = true`, and `poll_interval = 15s` are specified at the root level, both buckets
will override these values with their own respective values, which are defined as part of their sub-attributes.

[float]
=== Metrics

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
These metrics are exposed under the `/inputs` path. They can be used to
observe the activity of the input.

[options="header"]
|=======
| Metric | Description
| `url` | URL of the input resource.
| `errors_total` | Total number of errors encountered by the input.
| `decode_errors_total` | Total number of decode errors encountered by the input.
| `gcs_objects_requested_total` | Total number of GCS objects downloaded.
| `gcs_objects_published_total` | Total number of GCS objects processed that were published.
| `gcs_objects_listed_total` | Total number of GCS objects returned by list operations.
| `gcs_bytes_processed_total` | Total number of GCS bytes processed.
| `gcs_events_created_total` | Total number of events created from processing GCS data.
| `gcs_failed_jobs_total` | Total number of failed jobs.
| `gcs_expired_failed_jobs_total` | Total number of expired failed jobs that could not be recovered.
| `gcs_objects_tracked_gauge` | Number of objects currently tracked in the state registry (gauge).
| `gcs_objects_inflight_gauge` | Number of GCS objects inflight (gauge).
| `gcs_jobs_scheduled_after_validation` | Histogram of the number of jobs scheduled after validation.
| `gcs_object_processing_time` | Histogram of the elapsed GCS object processing times in nanoseconds (start of download to completion of parsing).
| `gcs_object_size_in_bytes` | Histogram of processed GCS object size in bytes.
| `gcs_events_per_object` | Histogram of event count per GCS object.
| `source_lag_time` | Histogram of the time between the source (Updated) timestamp and the time the object was read, in nanoseconds.
|=======
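
The metrics above can be read from the monitoring endpoint once it is enabled (for example with `http.enabled: true`, which listens on `localhost:5066` by default). As a rough illustration only, and assuming that default address and that the `/inputs/` path returns a JSON array of per-input metric objects, they could be consumed programmatically like this:

[source,go]
----
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// The /inputs/ path of the HTTP monitoring endpoint serves one metric
	// object per running input, containing the fields listed in the table above.
	resp, err := http.Get("http://localhost:5066/inputs/")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var inputs []map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&inputs); err != nil {
		panic(err)
	}
	for _, in := range inputs {
		// The GCS input registers itself with input type "gcs" and an id
		// derived from the input ID and the bucket name.
		if in["input"] == "gcs" {
			fmt.Printf("%v: %v objects published, %v errors\n",
				in["id"], in["gcs_objects_published_total"], in["errors_total"])
		}
	}
}
----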

==== Common input options

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

NOTE: Any feedback that helps us further optimize this input is welcome. Please feel free to open a GitHub issue for any bugs or feature requests.
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/gcs/decoding_test.go
@@ -79,7 +79,7 @@ func TestDecoding(t *testing.T) {
}
defer f.Close()
p := &pub{t: t}
j := newJob(&storage.BucketHandle{}, &storage.ObjectAttrs{Name: "test_object"}, "gs://test_uri", newState(), &Source{}, p, log, false)
j := newJob(&storage.BucketHandle{}, &storage.ObjectAttrs{Name: "test_object"}, "gs://test_uri", newState(), &Source{}, p, nil, log, false)
j.src.ReaderConfig.Decoding = tc.config
err = j.decode(context.Background(), f, "test")
if err != nil {
9 changes: 8 additions & 1 deletion x-pack/filebeat/input/gcs/input.go
@@ -152,9 +152,15 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,

log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)
// create a new inputMetrics instance
metrics := newInputMetrics(inputCtx.ID+":"+currentSource.BucketName, nil)
metrics.url.Set("gs://" + currentSource.BucketName)
defer metrics.Close()

var cp *Checkpoint
if !cursor.IsNew() {
if err := cursor.Unpack(&cp); err != nil {
metrics.errorsTotal.Inc()
return err
}

@@ -169,6 +175,7 @@

client, err := fetchStorageClient(ctx, input.config, log)
if err != nil {
metrics.errorsTotal.Inc()
return err
}
bucket := client.Bucket(currentSource.BucketName).Retryer(
@@ -180,7 +187,7 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
// Since we are only reading, the operation is always idempotent
storage.WithPolicy(storage.RetryAlways),
)
scheduler := newScheduler(publisher, bucket, currentSource, &input.config, st, log)
scheduler := newScheduler(publisher, bucket, currentSource, &input.config, st, metrics, log)

return scheduler.schedule(ctx)
}
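
The `newInputMetrics` constructor used above is defined in a new `metrics.go` file that is not expanded in this view. The following is a minimal sketch, not the actual implementation, assuming libbeat's `inputmon` helper and the `elastic-agent-libs` monitoring registry; only a few of the documented fields are shown:

package gcs

import (
	"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
	"github.com/elastic/elastic-agent-libs/monitoring"
)

// inputMetrics (sketch) holds the handles that back the metrics served on the
// /inputs monitoring endpoint.
type inputMetrics struct {
	unregister func()

	url                      *monitoring.String // url of the input resource
	errorsTotal              *monitoring.Uint   // number of errors encountered by the input
	gcsObjectsRequestedTotal *monitoring.Uint   // number of GCS objects downloaded
	gcsObjectsInflight       *monitoring.Uint   // gauge of objects currently being processed
}

// newInputMetrics registers a metric set under the given id so that it is
// served by the /inputs monitoring endpoint. A nil optionalParent attaches it
// to the default global registry.
func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics {
	reg, unreg := inputmon.NewInputRegistry("gcs", id, optionalParent)
	return &inputMetrics{
		unregister:               unreg,
		url:                      monitoring.NewString(reg, "url"),
		errorsTotal:              monitoring.NewUint(reg, "errors_total"),
		gcsObjectsRequestedTotal: monitoring.NewUint(reg, "gcs_objects_requested_total"),
		gcsObjectsInflight:       monitoring.NewUint(reg, "gcs_objects_inflight_gauge"),
	}
}

// Close unregisters the metric set when the input stops.
func (m *inputMetrics) Close() { m.unregister() }

This also explains the nil-metrics stub seen in `newJob` further below: when no metrics instance is supplied (as in the decoding tests), a throwaway instance keeps the instrumentation calls safe.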
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/gcs/input_stateless.go
@@ -49,6 +49,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
pub := statelessPublisher{wrapped: publisher}
var source cursor.Source
var g errgroup.Group

for _, b := range in.config.Buckets {
bucket := tryOverrideOrDefault(in.config, b)
source = &Source{
@@ -68,6 +69,9 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
st := newState()
currentSource := source.(*Source)
log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
metrics := newInputMetrics(inputCtx.ID+":"+currentSource.BucketName, nil)
defer metrics.Close()
metrics.url.Set("gs://" + currentSource.BucketName)

ctx, cancel := context.WithCancel(context.Background())
go func() {
@@ -85,7 +89,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
storage.WithPolicy(storage.RetryAlways),
)

scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, log)
scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, metrics, log)
// allows multiple buckets to be scheduled concurrently while testing.
// The stateless input is triggered only while testing; until now it did not mimic
// the real-world concurrent execution of multiple buckets. This fix allows it to do so.
20 changes: 17 additions & 3 deletions x-pack/filebeat/input/gcs/input_test.go
@@ -6,7 +6,9 @@ package gcs

import (
"context"
"crypto/rand"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
"net/http"
@@ -547,7 +549,7 @@ func Test_StorageClient(t *testing.T) {
chanClient := beattest.NewChanClient(len(tt.expected))
t.Cleanup(func() { _ = chanClient.Close() })

ctx, cancel := newV2Context()
ctx, cancel := newV2Context(t)
t.Cleanup(cancel)

var g errgroup.Group
@@ -607,11 +609,23 @@ }
}
}

func newV2Context() (v2.Context, func()) {
func newV2Context(t *testing.T) (v2.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
id, err := generateRandomID(8)
if err != nil {
t.Fatalf("failed to generate random id: %v", err)
}
return v2.Context{
Logger: logp.NewLogger("gcs_test"),
ID: "test_id",
ID: "gcs_test-" + id,
Cancelation: ctx,
}, cancel
}

func generateRandomID(length int) (string, error) {
bytes := make([]byte, length)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}
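
Replacing the fixed `"test_id"` with a random suffix matters because each run now registers its metrics under `inputCtx.ID + ":" + bucketName`, and reusing one ID across test cases in the same process would presumably collide in the shared monitoring registry. A hypothetical test illustrating the intent:

// Hypothetical example, not part of this commit: consecutive contexts must get
// distinct IDs so that their metric registrations cannot clash.
func TestNewV2ContextIDsAreUnique(t *testing.T) {
	ctxA, cancelA := newV2Context(t)
	defer cancelA()
	ctxB, cancelB := newV2Context(t)
	defer cancelB()
	if ctxA.ID == ctxB.ID {
		t.Fatalf("expected unique v2.Context IDs, got %q twice", ctxA.ID)
	}
}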
81 changes: 64 additions & 17 deletions x-pack/filebeat/input/gcs/job.go
@@ -45,6 +45,8 @@ type job struct {
src *Source
// publisher is used to publish a beat event to the output stream
publisher cursor.Publisher
// metrics used to track the errors and success of jobs
metrics *inputMetrics
// custom logger
log *logp.Logger
// flag used to denote if this object has previously failed without being processed at all.
@@ -53,8 +55,12 @@

// newJob returns an instance of a job, which is a unit of work that can be assigned to a goroutine
func newJob(bucket *storage.BucketHandle, object *storage.ObjectAttrs, objectURI string,
state *state, src *Source, publisher cursor.Publisher, log *logp.Logger, isFailed bool,
state *state, src *Source, publisher cursor.Publisher, metrics *inputMetrics, log *logp.Logger, isFailed bool,
) *job {
if metrics == nil {
// metrics are optional, initialize a stub if not provided
metrics = newInputMetrics("", nil)
}
return &job{
bucket: bucket,
object: object,
@@ -63,6 +69,7 @@ func newJob(bucket *storage.BucketHandle, object *storage.ObjectAttrs, objectURI
state: state,
src: src,
publisher: publisher,
metrics: metrics,
log: log,
isFailed: isFailed,
}
@@ -78,17 +85,33 @@ func gcsObjectHash(src *Source, object *storage.ObjectAttrs) string {

func (j *job) do(ctx context.Context, id string) {
var fields mapstr.M
// metrics & logging
j.log.Debug("begin gcs object processing.")
j.metrics.gcsObjectsRequestedTotal.Inc()
j.metrics.gcsObjectsInflight.Inc()
start := time.Now()
defer func() {
elapsed := time.Since(start)
j.metrics.gcsObjectsInflight.Dec()
j.metrics.gcsObjectProcessingTime.Update(elapsed.Nanoseconds())
j.log.Debugw("end gcs object processing.", "elapsed_time_ns", elapsed)
}()

if allowedContentTypes[j.object.ContentType] {
if j.object.ContentType == gzType || j.object.ContentEncoding == encodingGzip {
j.isCompressed = true
}
err := j.processAndPublishData(ctx, id)
if err != nil {
j.state.updateFailedJobs(j.object.Name)
j.state.updateFailedJobs(j.object.Name, j.metrics)
j.log.Errorw("job encountered an error while publishing data and has been added to a failed jobs list", "gcs.jobId", id, "error", err)
j.metrics.gcsFailedJobsTotal.Inc()
j.metrics.errorsTotal.Inc()
return
}
j.metrics.gcsObjectsPublishedTotal.Inc()
//nolint:gosec // object size cannot be negative hence this conversion is safe
j.metrics.gcsBytesProcessedTotal.Add(uint64(j.object.Size))

} else {
err := fmt.Errorf("job with jobId %s encountered an error: content-type %s not supported", id, j.object.ContentType)
@@ -101,9 +124,10 @@ func (j *job) do(ctx context.Context, id string) {
}
event.SetID(objectID(j.hash, 0))
// locks while data is being saved and published to avoid concurrent map read/writes
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated, j.metrics)
if err := j.publisher.Publish(event, cp); err != nil {
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
j.metrics.errorsTotal.Inc()
}
// unlocks after data is saved and published
done()
@@ -133,11 +157,21 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error {
defer func() {
err = reader.Close()
if err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("failed to close reader for object", "objectName", j.object.Name, "error", err)
}
}()

return j.decode(ctx, reader, id)
// update the source lag time metric
j.metrics.sourceLagTime.Update(time.Since(j.object.Updated).Nanoseconds())

// count decode errors encountered while decoding the object
if err := j.decode(ctx, reader, id); err != nil {
j.metrics.decodeErrorsTotal.Inc()
return fmt.Errorf("failed to decode object: %s, with error: %w", j.object.Name, err)
}

return nil
}

func (j *job) decode(ctx context.Context, r io.Reader, id string) error {
@@ -241,17 +275,24 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er

// if expand_event_list_from_field is set, then split the event list
if j.src.ExpandEventListFromField != "" {
if err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, j.hash, id); err != nil {
if numEvents, err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, id); err != nil {
return err
} else {
j.metrics.gcsEventsPerObject.Update(int64(numEvents))
}
continue
} else {
j.metrics.gcsEventsPerObject.Update(1)
}

var parsedData []mapstr.M
if j.src.ParseJSON {
parsedData, err = decodeJSON(bytes.NewReader(item))
if err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
// JSON parsing is purely cosmetic and optional, so we log the error and continue rather than stop processing the job
j.metrics.errorsTotal.Inc()
j.metrics.decodeErrorsTotal.Inc()
j.log.Errorw("job encountered an error during 'ParseJSON' op", "gcs.jobId", id, "error", err)
}
}
evt := j.createEvent(item, parsedData, offset)
Expand All @@ -263,29 +304,32 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
func (j *job) publish(evt beat.Event, last bool, id string) {
if last {
// if this is the last object, then perform a complete state save
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated, j.metrics)
if err := j.publisher.Publish(evt, cp); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
done()
return
}
// since we don't update the cursor checkpoint, lack of a lock here is not a problem
if err := j.publisher.Publish(evt, nil); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
}

// splitEventList splits the event list into individual events, publishes them, and returns the number of events produced
func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objHash string, id string) error {
func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, id string) (int, error) {
var jsonObject map[string]json.RawMessage
var eventsPerObject int
if err := json.Unmarshal(raw, &jsonObject); err != nil {
return fmt.Errorf("job with job id %s encountered an unmarshaling error: %w", id, err)
return eventsPerObject, fmt.Errorf("job with job id %s encountered an unmarshaling error: %w", id, err)
}

raw, found := jsonObject[key]
if !found {
return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
return eventsPerObject, fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
}

dec := json.NewDecoder(bytes.NewReader(raw))
@@ -294,43 +338,46 @@ func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objH

tok, err := dec.Token()
if err != nil {
return fmt.Errorf("failed to read JSON token for object: %s, with error: %w", j.object.Name, err)
return eventsPerObject, fmt.Errorf("failed to read JSON token for object: %s, with error: %w", j.object.Name, err)
}
delim, ok := tok.(json.Delim)
if !ok || delim != '[' {
return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key)
return eventsPerObject, fmt.Errorf("expand_event_list_from_field <%v> is not an array", key)
}

for dec.More() {
arrayOffset := dec.InputOffset()

var item json.RawMessage
if err := dec.Decode(&item); err != nil {
return fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err)
return eventsPerObject, fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err)
}

data, err := item.MarshalJSON()
if err != nil {
return fmt.Errorf("job with job id %s encountered a marshaling error: %w", id, err)
return eventsPerObject, fmt.Errorf("job with job id %s encountered a marshaling error: %w", id, err)
}
evt := j.createEvent(data, nil, offset+arrayOffset)

if !dec.More() {
// if this is the last object, then perform a complete state save
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated, j.metrics)
if err := j.publisher.Publish(evt, cp); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
done()
} else {
// since we don't update the cursor checkpoint, lack of a lock here is not a problem
if err := j.publisher.Publish(evt, nil); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
}
eventsPerObject++
}

return nil
return eventsPerObject, nil
}

// addGzipDecoderIfNeeded determines whether the given stream of bytes (encapsulated in a buffered reader)
@@ -426,7 +473,7 @@ func (j *job) createEvent(message []byte, data []mapstr.M, offset int64) beat.Ev
},
}
event.SetID(objectID(j.hash, offset))

j.metrics.gcsEventsCreatedTotal.Inc()
return event
}
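
The histogram-style metrics updated above (`gcsObjectProcessingTime`, `sourceLagTime`, `gcsEventsPerObject`) also live in the unexpanded `metrics.go`. In comparable Beats inputs such histograms are backed by `go-metrics` samples bridged into the monitoring registry through libbeat's `adapter` package; a hedged sketch of that wiring, with an illustrative helper name, is:

// Sketch only; the actual registration in this commit is not shown in this view.
package gcs

import (
	"github.com/elastic/beats/v7/libbeat/monitoring/adapter"
	"github.com/elastic/elastic-agent-libs/monitoring"
	metrics "github.com/rcrowley/go-metrics"
)

// newHistogram registers a uniform-sample histogram under the given name and
// returns the sample so callers can feed it with Update(value).
func newHistogram(reg *monitoring.Registry, name string) metrics.Sample {
	sample := metrics.NewUniformSample(1024)
	//nolint:errcheck // a registration failure is not actionable here
	adapter.NewGoMetrics(reg, name, adapter.Accept).
		Register("histogram", metrics.NewHistogram(sample))
	return sample
}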
