[filebeat][gcs] - Added input metrics #41505

Merged · 18 commits · Nov 18, 2024

Changes from all commits

1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -331,6 +331,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support to include AWS cloudwatch linked accounts when using log_group_name_prefix to define log group names. {pull}41206[41206]
- Improved Azure Blob Storage input documentation. {pull}41252[41252]
- Make ETW input GA. {pull}41389[41389]
- Added input metrics to GCS input. {issue}36640[36640] {pull}41505[41505]
- Add support for Okta entity analytics provider to collect role and factor data for users. {pull}41460[41460]

*Auditbeat*
33 changes: 33 additions & 0 deletions x-pack/filebeat/docs/inputs/input-gcs.asciidoc
@@ -462,5 +462,38 @@ filebeat.inputs:
In this configuration, even though `max_workers = 10`, `poll = true`, and `poll_interval = 15s` are specified at the root level, both buckets
override these values with their own values, which are defined as part of their sub-attributes.
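
A minimal sketch of the kind of layout being described is shown below. The bucket names, project id, and credentials path are placeholders for illustration only, not the values used in the example above:

[source,yaml]
----
filebeat.inputs:
- type: gcs
  project_id: my-project-id
  auth.credentials_file.path: /path/to/service-account-key.json
  max_workers: 10
  poll: true
  poll_interval: 15s
  buckets:
  - name: bucket-one
    max_workers: 5
    poll: true
    poll_interval: 30s
  - name: bucket-two
    max_workers: 5
    poll: true
    poll_interval: 30s
----

Because each bucket defines its own `max_workers`, `poll`, and `poll_interval`, the root-level values act only as defaults for buckets that omit them.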

[float]
=== Metrics

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
These metrics are exposed under the `/inputs` path. They can be used to
observe the activity of the input.

[options="header"]
|=======
| Metric | Description
| `url` | URL of the input resource.
| `errors_total` | Total number of errors encountered by the input.
| `decode_errors_total` | Total number of decode errors encountered by the input.
| `gcs_objects_requested_total` | Total number of GCS objects downloaded.
| `gcs_objects_published_total` | Total number of GCS objects processed that were published.
| `gcs_objects_listed_total` | Total number of GCS objects returned by list operations.
| `gcs_bytes_processed_total` | Total number of GCS bytes processed.
| `gcs_events_created_total` | Total number of events created from processing GCS data.
| `gcs_failed_jobs_total` | Total number of failed jobs.
| `gcs_expired_failed_jobs_total` | Total number of expired failed jobs that could not be recovered.
| `gcs_objects_tracked_gauge` | Number of objects currently tracked in the state registry (gauge).
| `gcs_objects_inflight_gauge` | Number of GCS objects inflight (gauge).
| `gcs_jobs_scheduled_after_validation` | Histogram of the number of jobs scheduled after validation.
| `gcs_object_processing_time` | Histogram of the elapsed GCS object processing times in nanoseconds (start of download to completion of parsing).
| `gcs_object_size_in_bytes` | Histogram of processed GCS object size in bytes.
| `gcs_events_per_object` | Histogram of event count per GCS object.
| `source_lag_time` | Histogram of the time between the source (Updated) timestamp and the time the object was read, in nanoseconds.
|=======
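
To see these metrics while the input is running, enable Filebeat's HTTP monitoring endpoint and query the `/inputs` path, for example with something like `curl -XGET 'localhost:5066/inputs/?pretty'`. The snippet below is a minimal sketch; `5066` is only the conventional default port:

[source,yaml]
----
# filebeat.yml: expose the local HTTP monitoring endpoint
http.enabled: true
http.host: localhost
http.port: 5066
----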

==== Common input options

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

NOTE: Any feedback that helps us further improve this input is welcome. Please feel free to open a GitHub issue for any bugs or feature requests.
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/gcs/decoding_test.go
@@ -79,7 +79,7 @@ func TestDecoding(t *testing.T) {
}
defer f.Close()
p := &pub{t: t}
j := newJob(&storage.BucketHandle{}, &storage.ObjectAttrs{Name: "test_object"}, "gs://test_uri", newState(), &Source{}, p, log, false)
j := newJob(&storage.BucketHandle{}, &storage.ObjectAttrs{Name: "test_object"}, "gs://test_uri", newState(), &Source{}, p, nil, log, false)
j.src.ReaderConfig.Decoding = tc.config
err = j.decode(context.Background(), f, "test")
if err != nil {
9 changes: 8 additions & 1 deletion x-pack/filebeat/input/gcs/input.go
@@ -148,13 +148,19 @@
func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
cursor cursor.Cursor, publisher cursor.Publisher) error {
st := newState()
currentSource := src.(*Source)

Check failure (GitHub Actions / lint (linux)) on line 151 in x-pack/filebeat/input/gcs/input.go: Error return value is not checked (errcheck)

log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)
// create a new inputMetrics instance
metrics := newInputMetrics(inputCtx.ID+":"+currentSource.BucketName, nil)
metrics.url.Set("gs://" + currentSource.BucketName)
defer metrics.Close()

var cp *Checkpoint
if !cursor.IsNew() {
if err := cursor.Unpack(&cp); err != nil {
metrics.errorsTotal.Inc()
return err
}

@@ -169,6 +175,7 @@

client, err := fetchStorageClient(ctx, input.config, log)
if err != nil {
metrics.errorsTotal.Inc()
return err
}
bucket := client.Bucket(currentSource.BucketName).Retryer(
@@ -180,7 +187,7 @@
// Since we are only reading, the operation is always idempotent
storage.WithPolicy(storage.RetryAlways),
)
scheduler := newScheduler(publisher, bucket, currentSource, &input.config, st, log)
scheduler := newScheduler(publisher, bucket, currentSource, &input.config, st, metrics, log)

return scheduler.schedule(ctx)
}
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/gcs/input_stateless.go
@@ -49,6 +49,7 @@
pub := statelessPublisher{wrapped: publisher}
var source cursor.Source
var g errgroup.Group

for _, b := range in.config.Buckets {
bucket := tryOverrideOrDefault(in.config, b)
source = &Source{
@@ -66,8 +67,11 @@
}

st := newState()
currentSource := source.(*Source)

Check failure (GitHub Actions / lint (linux)) on line 70 in x-pack/filebeat/input/gcs/input_stateless.go: Error return value is not checked (errcheck)
log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
metrics := newInputMetrics(inputCtx.ID+":"+currentSource.BucketName, nil)
defer metrics.Close()
metrics.url.Set("gs://" + currentSource.BucketName)

ctx, cancel := context.WithCancel(context.Background())
go func() {
@@ -85,7 +89,7 @@
storage.WithPolicy(storage.RetryAlways),
)

scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, log)
scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, metrics, log)
// allows multiple containers to be scheduled concurrently while testing
// the stateless input is triggered only while testing and till now it did not mimic
// the real world concurrent execution of multiple containers. This fix allows it to do so.
20 changes: 17 additions & 3 deletions x-pack/filebeat/input/gcs/input_test.go
@@ -6,7 +6,9 @@ package gcs

import (
"context"
"crypto/rand"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
"net/http"
@@ -547,7 +549,7 @@ func Test_StorageClient(t *testing.T) {
chanClient := beattest.NewChanClient(len(tt.expected))
t.Cleanup(func() { _ = chanClient.Close() })

ctx, cancel := newV2Context()
ctx, cancel := newV2Context(t)
t.Cleanup(cancel)

var g errgroup.Group
@@ -607,11 +609,23 @@
}
}

func newV2Context() (v2.Context, func()) {
func newV2Context(t *testing.T) (v2.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
id, err := generateRandomID(8)
if err != nil {
t.Fatalf("failed to generate random id: %v", err)
}
return v2.Context{
Logger: logp.NewLogger("gcs_test"),
ID: "test_id",
ID: "gcs_test-" + id,
Cancelation: ctx,
}, cancel
}

func generateRandomID(length int) (string, error) {
bytes := make([]byte, length)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}
81 changes: 64 additions & 17 deletions x-pack/filebeat/input/gcs/job.go
@@ -45,6 +45,8 @@
src *Source
// publisher is used to publish a beat event to the output stream
publisher cursor.Publisher
// metrics used to track the errors and success of jobs
metrics *inputMetrics
// custom logger
log *logp.Logger
// flag used to denote if this object has previously failed without being processed at all.
@@ -53,8 +55,12 @@

// newJob, returns an instance of a job, which is a unit of work that can be assigned to a go routine
func newJob(bucket *storage.BucketHandle, object *storage.ObjectAttrs, objectURI string,
state *state, src *Source, publisher cursor.Publisher, log *logp.Logger, isFailed bool,
state *state, src *Source, publisher cursor.Publisher, metrics *inputMetrics, log *logp.Logger, isFailed bool,
) *job {
if metrics == nil {
// metrics are optional, initialize a stub if not provided
metrics = newInputMetrics("", nil)
}
return &job{
bucket: bucket,
object: object,
@@ -63,6 +69,7 @@
state: state,
src: src,
publisher: publisher,
metrics: metrics,
log: log,
isFailed: isFailed,
}
@@ -78,17 +85,33 @@

func (j *job) do(ctx context.Context, id string) {
var fields mapstr.M
// metrics & logging
j.log.Debug("begin gcs object processing.")
j.metrics.gcsObjectsRequestedTotal.Inc()
j.metrics.gcsObjectsInflight.Inc()
start := time.Now()
defer func() {
elapsed := time.Since(start)
j.metrics.gcsObjectsInflight.Dec()
j.metrics.gcsObjectProcessingTime.Update(elapsed.Nanoseconds())
j.log.Debugw("end gcs object processing.", "elapsed_time_ns", elapsed)
}()

if allowedContentTypes[j.object.ContentType] {
if j.object.ContentType == gzType || j.object.ContentEncoding == encodingGzip {
j.isCompressed = true
}
err := j.processAndPublishData(ctx, id)
if err != nil {
j.state.updateFailedJobs(j.object.Name)
j.state.updateFailedJobs(j.object.Name, j.metrics)
j.log.Errorw("job encountered an error while publishing data and has been added to a failed jobs list", "gcs.jobId", id, "error", err)
j.metrics.gcsFailedJobsTotal.Inc()
j.metrics.errorsTotal.Inc()
return
}
j.metrics.gcsObjectsPublishedTotal.Inc()
//nolint:gosec // object size cannot be negative hence this conversion is safe

Check failure (GitHub Actions / lint (linux)) on line 113 in x-pack/filebeat/input/gcs/job.go: directive `//nolint:gosec // object size cannot be negative hence this conversion is safe` is unused for linter "gosec" (nolintlint)
j.metrics.gcsBytesProcessedTotal.Add(uint64(j.object.Size))

} else {
err := fmt.Errorf("job with jobId %s encountered an error: content-type %s not supported", id, j.object.ContentType)
@@ -101,9 +124,10 @@
}
event.SetID(objectID(j.hash, 0))
// locks while data is being saved and published to avoid concurrent map read/writes
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated, j.metrics)
if err := j.publisher.Publish(event, cp); err != nil {
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
j.metrics.errorsTotal.Inc()
}
// unlocks after data is saved and published
done()
@@ -133,11 +157,21 @@
defer func() {
err = reader.Close()
if err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("failed to close reader for object", "objectName", j.object.Name, "error", err)
}
}()

return j.decode(ctx, reader, id)
// update the source lag time metric
j.metrics.sourceLagTime.Update(time.Since(j.object.Updated).Nanoseconds())

// calculate number of decode errors
if err := j.decode(ctx, reader, id); err != nil {
j.metrics.decodeErrorsTotal.Inc()
return fmt.Errorf("failed to decode object: %s, with error: %w", j.object.Name, err)
}

return nil
}

func (j *job) decode(ctx context.Context, r io.Reader, id string) error {
@@ -163,7 +197,7 @@
var v mapstr.M
msg, v, err = dec.decodeValue()
if err != nil {
if err == io.EOF {

Check failure (GitHub Actions / lint (linux)) on line 200 in x-pack/filebeat/input/gcs/job.go: comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
break
@@ -172,7 +206,7 @@
} else {
msg, err = dec.decode()
if err != nil {
if err == io.EOF {

Check failure (GitHub Actions / lint (linux)) on line 209 in x-pack/filebeat/input/gcs/job.go: comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
break
@@ -188,7 +222,7 @@
for dec.next() {
msg, err := dec.decode()
if err != nil {
if err == io.EOF {

Check failure (GitHub Actions / lint (linux)) on line 225 in x-pack/filebeat/input/gcs/job.go: comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
break
@@ -241,17 +275,24 @@

// if expand_event_list_from_field is set, then split the event list
if j.src.ExpandEventListFromField != "" {
if err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, j.hash, id); err != nil {
if numEvents, err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, id); err != nil {
return err
} else {
j.metrics.gcsEventsPerObject.Update(int64(numEvents))
}
continue
} else {
j.metrics.gcsEventsPerObject.Update(1)
}

var parsedData []mapstr.M
if j.src.ParseJSON {
parsedData, err = decodeJSON(bytes.NewReader(item))
if err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
// since we do not want to stop processing the job here as this is purely cosmetic and optional, we log the error and continue
j.metrics.errorsTotal.Inc()
j.metrics.decodeErrorsTotal.Inc()
j.log.Errorw("job encountered an error during 'ParseJSON' op", "gcs.jobId", id, "error", err)
}
}
evt := j.createEvent(item, parsedData, offset)
@@ -263,29 +304,32 @@
func (j *job) publish(evt beat.Event, last bool, id string) {
if last {
// if this is the last object, then perform a complete state save
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated, j.metrics)
if err := j.publisher.Publish(evt, cp); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
done()
return
}
// since we don't update the cursor checkpoint, lack of a lock here is not a problem
if err := j.publisher.Publish(evt, nil); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
}

// splitEventList splits the event list into individual events and publishes them
func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objHash string, id string) error {
func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, id string) (int, error) {
var jsonObject map[string]json.RawMessage
var eventsPerObject int
if err := json.Unmarshal(raw, &jsonObject); err != nil {
return fmt.Errorf("job with job id %s encountered an unmarshaling error: %w", id, err)
return eventsPerObject, fmt.Errorf("job with job id %s encountered an unmarshaling error: %w", id, err)
}

raw, found := jsonObject[key]
if !found {
return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
return eventsPerObject, fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
}

dec := json.NewDecoder(bytes.NewReader(raw))
@@ -294,43 +338,46 @@

tok, err := dec.Token()
if err != nil {
return fmt.Errorf("failed to read JSON token for object: %s, with error: %w", j.object.Name, err)
return eventsPerObject, fmt.Errorf("failed to read JSON token for object: %s, with error: %w", j.object.Name, err)
}
delim, ok := tok.(json.Delim)
if !ok || delim != '[' {
return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key)
return eventsPerObject, fmt.Errorf("expand_event_list_from_field <%v> is not an array", key)
}

for dec.More() {
arrayOffset := dec.InputOffset()

var item json.RawMessage
if err := dec.Decode(&item); err != nil {
return fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err)
return eventsPerObject, fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err)
}

data, err := item.MarshalJSON()
if err != nil {
return fmt.Errorf("job with job id %s encountered a marshaling error: %w", id, err)
return eventsPerObject, fmt.Errorf("job with job id %s encountered a marshaling error: %w", id, err)
}
evt := j.createEvent(data, nil, offset+arrayOffset)

if !dec.More() {
// if this is the last object, then perform a complete state save
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated, j.metrics)
if err := j.publisher.Publish(evt, cp); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
done()
} else {
// since we don't update the cursor checkpoint, lack of a lock here is not a problem
if err := j.publisher.Publish(evt, nil); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
}
eventsPerObject++
}

return nil
return eventsPerObject, nil
}

// addGzipDecoderIfNeeded determines whether the given stream of bytes (encapsulated in a buffered reader)
@@ -338,7 +385,7 @@
// so the function can peek into the byte stream without consuming it. This makes it convenient for
// code executed after this function call to consume the stream if it wants.
func (j *job) addGzipDecoderIfNeeded(reader *bufio.Reader) (io.Reader, error) {
isStreamGzipped := false

Check failure (GitHub Actions / lint (linux)) on line 388 in x-pack/filebeat/input/gcs/job.go: assigned to isStreamGzipped, but reassigned without using the value (wastedassign)
// check if the stream is gzipped or not
buf, err := reader.Peek(3)
if err != nil {
@@ -426,7 +473,7 @@
},
}
event.SetID(objectID(j.hash, offset))

j.metrics.gcsEventsCreatedTotal.Inc()
return event
}
