Skip to content

Commit

Permalink
Add filebeat receiver to otel mode (#5833)
Browse files Browse the repository at this point in the history
  • Loading branch information
leehinman authored Oct 30, 2024
1 parent fc99ac7 commit 89dca1d
Show file tree
Hide file tree
Showing 7 changed files with 37,673 additions and 17,513 deletions.
54,453 changes: 37,015 additions & 17,438 deletions NOTICE.txt

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions changelog/fragments/1729630977-Add-filebeat-otel-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Add filebeat otel receiver

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; a word indicating the component this changeset affects.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/5833

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
1 change: 1 addition & 0 deletions dev-tools/notice/overrides.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
{"name": "github.com/pascaldekloe/goe", "licenceFile": "LICENSE", "licenceType": "CC0-1.0"}
{"name": "github.com/dnaeon/go-vcr", "licenceFile": "LICENSE", "licenceType": "BSD-2-Clause"}
{"name": "github.com/grpc-ecosystem/go-grpc-middleware/v2", "licenceFile": "LICENSE", "licenceType": "Apache-2.0"}
{"name": "github.com/JohnCGriffin/overflow", "licenceFile": "README.md", "licenceType": "MIT"}
150 changes: 134 additions & 16 deletions go.mod

Large diffs are not rendered by default.

419 changes: 370 additions & 49 deletions go.sum

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions internal/pkg/otel/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
zipkinreceiver "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
otlpreceiver "go.opentelemetry.io/collector/receiver/otlpreceiver"

fbreceiver "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver"

// Processors:
attributesprocessor "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor" // for modifying signal attributes
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
Expand Down Expand Up @@ -76,6 +78,7 @@ func components() (otelcol.Factories, error) {
prometheusreceiver.NewFactory(),
jaegerreceiver.NewFactory(),
zipkinreceiver.NewFactory(),
fbreceiver.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
Expand Down
128 changes: 118 additions & 10 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"sync"
"testing"
"text/template"
"time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -126,7 +127,7 @@ func TestOtelFileProcessing(t *testing.T) {
// otel mode should be detected automatically
tempDir := t.TempDir()
cfgFilePath := filepath.Join(tempDir, "otel.yml")
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0600))
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0o600))

fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", cfgFilePath}))
require.NoError(t, err)
Expand Down Expand Up @@ -183,7 +184,7 @@ func TestOtelFileProcessing(t *testing.T) {

func validateCommandIsWorking(t *testing.T, ctx context.Context, fixture *aTesting.Fixture, tempDir string) {
cfgFilePath := filepath.Join(tempDir, "otel-valid.yml")
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0600))
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0o600))

// check `elastic-agent otel validate` command works for otel config
cmd, err := fixture.PrepareAgentCommand(ctx, []string{"otel", "validate", "--config", cfgFilePath})
Expand All @@ -199,7 +200,7 @@ func validateCommandIsWorking(t *testing.T, ctx context.Context, fixture *aTesti

// check `elastic-agent otel validate` command works for invalid otel config
cfgFilePath = filepath.Join(tempDir, "otel-invalid.yml")
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileInvalidOtelConfig), 0600))
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileInvalidOtelConfig), 0o600))

out, err = fixture.Exec(ctx, []string{"otel", "validate", "--config", cfgFilePath})
require.Error(t, err)
Expand Down Expand Up @@ -276,7 +277,7 @@ func TestOtelLogsIngestion(t *testing.T) {
logsIngestionConfig = strings.ReplaceAll(logsIngestionConfig, "{{.TestId}}", testId)

cfgFilePath := filepath.Join(tempDir, "otel.yml")
require.NoError(t, os.WriteFile(cfgFilePath, []byte(logsIngestionConfig), 0600))
require.NoError(t, os.WriteFile(cfgFilePath, []byte(logsIngestionConfig), 0o600))

fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", cfgFilePath}))
require.NoError(t, err)
Expand All @@ -300,7 +301,7 @@ func TestOtelLogsIngestion(t *testing.T) {

// Write logs to input file.
logsCount := 10_000
inputFile, err := os.OpenFile(inputFilePath, os.O_CREATE|os.O_WRONLY, 0600)
inputFile, err := os.OpenFile(inputFilePath, os.O_CREATE|os.O_WRONLY, 0o600)
require.NoError(t, err)
for i := 0; i < logsCount; i++ {
_, err = fmt.Fprintf(inputFile, "This is a test log message %d\n", i+1)
Expand Down Expand Up @@ -357,8 +358,8 @@ func TestOtelAPMIngestion(t *testing.T) {
cfgFilePath := filepath.Join(tempDir, "otel.yml")
fileName := "content.log"
apmConfig := fmt.Sprintf(apmOtelConfig, filepath.Join(tempDir, fileName), testId)
require.NoError(t, os.WriteFile(cfgFilePath, []byte(apmConfig), 0600))
require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte{}, 0600))
require.NoError(t, os.WriteFile(cfgFilePath, []byte(apmConfig), 0o600))
require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte{}, 0o600))

fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", cfgFilePath}))
require.NoError(t, err)
Expand Down Expand Up @@ -426,7 +427,7 @@ func TestOtelAPMIngestion(t *testing.T) {
)
require.NoError(t, err, "APM not initialized")

require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte(apmProcessingContent), 0600))
require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte(apmProcessingContent), 0o600))

// check index
var hits int
Expand All @@ -436,10 +437,12 @@ func TestOtelAPMIngestion(t *testing.T) {

// apm mismatch or proper docs in ES

watchLines := linesTrackMap([]string{"This is a test error message",
watchLines := linesTrackMap([]string{
"This is a test error message",
"This is a test debug message 2",
"This is a test debug message 3",
"This is a test debug message 4"})
"This is a test debug message 4",
})

// failed to get APM version mismatch in time
// processing should be running
Expand Down Expand Up @@ -535,3 +538,108 @@ func mapAtLeastOneTrue(mm map[string]bool) bool {

return false
}

func TestFileBeatReceiver(t *testing.T) {
define.Require(t, define.Requirements{
Group: Default,
Local: true,
OS: []define.OS{
// {Type: define.Windows}, we don't support otel on Windows yet
{Type: define.Linux},
{Type: define.Darwin},
},
})

type otelConfigOptions struct {
Message string
Output string
HomeDir string
}
testMessage := "supercalifragilisticexpialidocious"
tmpDir := t.TempDir()
exporterOutputPath := filepath.Join(tmpDir, "output.json")
t.Cleanup(func() {
if t.Failed() {
contents, err := os.ReadFile(exporterOutputPath)
if err != nil {
t.Logf("No exporter output file")
return
}
t.Logf("Contents of exporter output file:\n%s\n", string(contents))
}
})
otelConfigPath := filepath.Join(tmpDir, "otel.yml")
otelConfigTemplate := `receivers:
filebeatreceiver:
filebeat:
inputs:
- type: benchmark
enabled: true
count: 1
message: {{.Message}}
output:
otelconsumer:
logging:
level: info
selectors:
- '*'
path.home: {{.HomeDir}}
exporters:
file/no_rotation:
path: {{.Output}}
service:
pipelines:
logs:
receivers: [filebeatreceiver]
exporters: [file/no_rotation]
`

var otelConfigBuffer bytes.Buffer
require.NoError(t,
template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer,
otelConfigOptions{
Message: testMessage,
Output: exporterOutputPath,
HomeDir: tmpDir,
}))
require.NoError(t, os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600))
t.Cleanup(func() {
if t.Failed() {
contents, err := os.ReadFile(otelConfigPath)
if err != nil {
t.Logf("no otel config file")
return
}
t.Logf("Contents of otel config file:\n%s\n", string(contents))
}
})
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", otelConfigPath}))
require.NoError(t, err)

ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
defer cancel()
err = fixture.Prepare(ctx, fakeComponent)
require.NoError(t, err)

var fixtureWg sync.WaitGroup
fixtureWg.Add(1)
go func() {
defer fixtureWg.Done()
err = fixture.RunOtelWithClient(ctx)
}()

require.Eventually(t,
func() bool {
content, err := os.ReadFile(exporterOutputPath)
if err != nil || len(content) == 0 {
return false
}
return bytes.Contains(content, []byte(testMessage))
},
3*time.Minute, 1*time.Second,
fmt.Sprintf("there should be exported logs by now"))

cancel()
fixtureWg.Wait()
require.True(t, err == nil || err == context.Canceled || err == context.DeadlineExceeded, "Retrieved unexpected error: %s", err.Error())
}

0 comments on commit 89dca1d

Please sign in to comment.