Skip to content

Commit

Permalink
address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
dehaansa committed Feb 15, 2025
1 parent bef8864 commit 15801fe
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ The following arguments are supported:
|---------------------------------|---------------------|--------------------------------------------------------------------------------------------|-------------|----------|
| `include` | `list(string)` | A list of glob patterns to include files. | `[]` | yes |
| `exclude` | `list(string)` | A list of glob patterns to exclude files that would be included by the `include` patterns. | `[]` | no |
| `poll_interval` | `time.Duration` | The interval at which the file is polled for new entries. | `1s` | no |
| `poll_interval` | `time.Duration` | The interval at which the file is polled for new entries. | `200ms` | no |
| `max_concurrent_files` | `int` | The maximum number of files to read concurrently. | `10` | no |
| `max_batches` | `int` | The maximum number of batches to process concurrently. | `10` | no |
| `start_at` | `string` | The position to start reading the file from. | `beginning` | no |
Expand Down Expand Up @@ -104,7 +104,7 @@ The following blocks are supported inside the definition of
| retry_on_failure | [retry_on_failure][] | Configures the retry behavior when the receiver encounters an error downstream in the pipeline. | no |
| debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no |
| ordering_criteria | [ordering_criteria][] | Configures the order in which log files are processed. | no |
| ordering_criteria > sort_by | [sort_by][] | Configures the fields to sort by within the ordering criteria. | no |
| ordering_criteria > sort_by | [sort_by][] | Configures the fields to sort by within the ordering criteria. | yes |

The `>` symbol indicates deeper levels of nesting. For example, `ordering_criteria > sort_by`
refers to a `sort_by` block defined inside a `ordering_criteria` block.
Expand Down
3 changes: 3 additions & 0 deletions internal/alloycli/automemlimit_linux.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build linux
// +build linux

package alloycli

import (
Expand Down
2 changes: 2 additions & 0 deletions internal/component/otelcol/config_stanza_receivers.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Stanza is the name of the logs agent that was donated to the OpenTelemetry project.
// Stanza receivers are logs receivers built of stanza operators.
package otelcol

import (
Expand Down
36 changes: 27 additions & 9 deletions internal/component/otelcol/receiver/filelog/filelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,11 @@ func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
var errs error
// Convert will validate operator(s)
_, err := args.Convert()
if err != nil {
errs = multierror.Append(errs, err)
for _, op := range args.Operators {
_, err := op.Convert()
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to parse 'operator': %w", err))
}
}

if args.MaxConcurrentFiles < 1 {
Expand All @@ -262,9 +263,9 @@ func (args *Arguments) Validate() error {
errs = multierror.Append(errs, errors.New("'delete_after_read' cannot be used with 'start_at = end'"))
}

_, err = decode.LookupEncoding(args.Encoding)
_, err := decode.LookupEncoding(args.Encoding)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("invalid encoding: %w", err))
errs = multierror.Append(errs, fmt.Errorf("invalid 'encoding': %w", err))
}

if args.MatchCriteria.OrderingCriteria != nil {
Expand All @@ -274,13 +275,13 @@ func (args *Arguments) Validate() error {

for _, s := range args.MatchCriteria.OrderingCriteria.SortBy {
if !slices.Contains([]string{"timestamp", "numeric", "lexicographic", "mtime"}, s.SortType) {
errs = multierror.Append(errs, fmt.Errorf("invalid sort type: %s", s.SortType))
errs = multierror.Append(errs, fmt.Errorf("invalid 'sort_type': %s", s.SortType))
}
}
}

if args.Compression != "" && args.Compression != "gzip" {
errs = multierror.Append(errs, fmt.Errorf("invalid compression type: %s", args.Compression))
errs = multierror.Append(errs, fmt.Errorf("invalid 'compression' type: %s", args.Compression))
}

if args.PollInterval < 0 {
Expand All @@ -299,9 +300,26 @@ func (args *Arguments) Validate() error {
errs = multierror.Append(errs, errors.New("'force_flush_period' must not be negative"))
}

if args.MatchCriteria.ExcludeOlderThan < 0 {
errs = multierror.Append(errs, errors.New("'exclude_older_than' must not be negative"))
}

if args.MultilineConfig != nil {
if err := args.MultilineConfig.Validate(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("invalid multiline: %w", err))
errs = multierror.Append(errs, fmt.Errorf("invalid 'multiline': %w", err))
}
}

if args.Header != nil {
if len(args.Header.MetadataOperators) == 0 {
errs = multierror.Append(errs, errors.New("'header' requires at least one 'metadata_operator'"))
} else {
for _, op := range args.Header.MetadataOperators {
_, err := op.Convert()
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to parse 'metadata_operator': %w", err))
}
}
}
}

Expand Down
70 changes: 68 additions & 2 deletions internal/component/otelcol/receiver/filelog/filelog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func Test(t *testing.T) {

require.NoError(t, ctrl.WaitRunning(3*time.Second))

// TODO(@dehaansa) - test if this is removeable after https://github.com/grafana/alloy/pull/2262
// TODO(@dehaansa) - discover a better way to wait for otelcol component readiness
time.Sleep(1 * time.Second)

// Add a log message to the file
Expand Down Expand Up @@ -127,7 +127,10 @@ func TestUnmarshal(t *testing.T) {
header {
pattern = "^HEADER .*$"
metadata_operators = []
metadata_operators = [{
type = "regex_parser",
regex = "^HEADER env='(?P<env>[^ ]+)'",
}]
}
multiline {
Expand Down Expand Up @@ -159,4 +162,67 @@ func TestUnmarshal(t *testing.T) {
var args filelog.Arguments
err := syntax.Unmarshal([]byte(alloyCfg), &args)
require.NoError(t, err)

err = args.Validate()
require.NoError(t, err)
}

// TestValidate exercises Arguments.Validate via syntax.Unmarshal using a
// configuration in which every independently-validated argument is invalid,
// then asserts that the returned error aggregates a message for each
// violation (errors are collected with multierror rather than failing fast).
func TestValidate(t *testing.T) {
// Every field below is deliberately invalid: negative durations/sizes,
// a zero concurrency limit, unsupported encoding/compression/sort_type
// values, an empty metadata_operators list, and an operator whose
// timestamp parse_from cannot be decoded.
alloyCfg := `
include = ["/var/log/*.log"]
exclude_older_than = "-5m"
ordering_criteria {
regex = "^(?P<timestamp>\\d{8})_(?P<severity>\\d+)_"
top_n = -3
group_by = "severity"
sort_by {
sort_type = "std_dev"
regex_key = "severity"
ascending = true
}
}
poll_interval = "-10s"
max_concurrent_files = 0
max_batches = -3
start_at = "middle"
fingerprint_size = "-4KiB"
max_log_size = "-3MiB"
encoding = "webdings"
force_flush_period = "-5s"
compression = "tar"
header {
pattern = "^HEADER .*$"
metadata_operators = []
}
operators = [{
type = "regex_parser",
regex = "^(?P<timestamp>[^ ]+)",
timestamp = {
parse_from = "timestamp",
layout = "%Y-%m-%dT%H:%M:%S.%fZ",
location = "UTC",
},
}]
output {}
`
var args filelog.Arguments
// Unmarshal is expected to fail; the single returned error carries both
// the operator decode failure and the accumulated validation messages
// checked individually below.
err := syntax.Unmarshal([]byte(alloyCfg), &args)
require.Error(t, err)
// Operator conversion failure surfaced during decoding.
require.Contains(t, err.Error(), "error decoding 'parse_from': unrecognized prefix")
// Numeric bounds checks.
require.Contains(t, err.Error(), "'max_concurrent_files' must be positive")
require.Contains(t, err.Error(), "'max_batches' must not be negative")
// Enumerated-value checks.
require.Contains(t, err.Error(), "invalid 'encoding': unsupported encoding 'webdings'")
require.Contains(t, err.Error(), "'top_n' must not be negative")
require.Contains(t, err.Error(), "invalid 'sort_type': std_dev")
require.Contains(t, err.Error(), "invalid 'compression' type: tar")
// Non-negative duration/size checks.
require.Contains(t, err.Error(), "'poll_interval' must not be negative")
require.Contains(t, err.Error(), "'fingerprint_size' must not be negative")
require.Contains(t, err.Error(), "'max_log_size' must not be negative")
require.Contains(t, err.Error(), "'force_flush_period' must not be negative")
// Header requires at least one metadata operator when the block is set.
require.Contains(t, err.Error(), "'header' requires at least one 'metadata_operator'")
require.Contains(t, err.Error(), "'exclude_older_than' must not be negative")
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ func (filelogReceiverConverter) ConvertAndAppend(state *State, id componentstatu
// TODO(@dehaansa) - find a way to convert the operators
if len(cfg.(*filelogreceiver.FileLogConfig).Operators) > 0 {
diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Operators cannot currently be translated for %s", StringifyInstanceID(id)),
diag.SeverityLevelWarn,
fmt.Sprintf("operators cannot currently be translated for %s", StringifyInstanceID(id)),
)
}

// TODO(@dehaansa) - find a way to convert the metadata operators
if cfg.(*filelogreceiver.FileLogConfig).InputConfig.Header != nil {
diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Header cannot currently be translated for %s", StringifyInstanceID(id)),
diag.SeverityLevelWarn,
fmt.Sprintf("header metadata_operators cannot currently be translated for %s", StringifyInstanceID(id)),
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
(Warn) operators cannot currently be translated for filelog
(Warn) header metadata_operators cannot currently be translated for filelog

0 comments on commit 15801fe

Please sign in to comment.