Skip to content

Commit

Permalink
[chore] Fix windowseventlogreceiver test flakiness (open-telemetry#34793
Browse files Browse the repository at this point in the history
)

**Description:**
There are 3 main issues being addressed on this PR:

* There is a race between start and generating the events in the tests:
the start require goroutines so a delay before generating the events
will be advisable (not perfect but good enough per my tests)
* The tests assume that only the tests are generating events in that
time window, that is impossible to guarantee on a windows box, so
instead of checking for exactly one event they need to get new events
and check the event provider instead
* The exclusion test checks if zero events were delivered by the
receiver, it should check if one not filtered out was received, and this
one should be generated after the event from the provider that should be
excluded.

**Link to tracking Issue:**
Fixes open-telemetry#34687

**Testing:**
Multiple local runs, we should do a few runs on CI given the runner
characteristics.

**Documentation:**
N/A
  • Loading branch information
pjanotti authored and f7o committed Sep 12, 2024
1 parent 393c0df commit 8423a05
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 122 deletions.
6 changes: 4 additions & 2 deletions receiver/windowseventlogreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ require (
golang.org/x/sys v0.24.0
)

require go.opentelemetry.io/collector/consumer/consumertest v0.107.1-0.20240821120936-6764622672bc
require (
go.opentelemetry.io/collector/consumer/consumertest v0.107.1-0.20240821120936-6764622672bc
go.opentelemetry.io/collector/pdata v1.13.1-0.20240821120936-6764622672bc
)

require (
github.com/beorn7/perks v1.0.1 // indirect
Expand Down Expand Up @@ -55,7 +58,6 @@ require (
go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.1-0.20240821120936-6764622672bc // indirect
go.opentelemetry.io/collector/extension v0.107.1-0.20240821120936-6764622672bc // indirect
go.opentelemetry.io/collector/featuregate v1.13.1-0.20240821120936-6764622672bc // indirect
go.opentelemetry.io/collector/pdata v1.13.1-0.20240821120936-6764622672bc // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.107.1-0.20240821120936-6764622672bc // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
Expand Down
266 changes: 146 additions & 120 deletions receiver/windowseventlogreceiver/receiver_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"encoding/xml"
"path/filepath"
"reflect"
"testing"
"time"

Expand All @@ -18,7 +19,10 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver/receivertest"
"golang.org/x/sys/windows/registry"
"golang.org/x/sys/windows/svc/eventlog"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry"
Expand Down Expand Up @@ -70,6 +74,10 @@ func TestCreateWithInvalidInputConfig(t *testing.T) {

func TestReadWindowsEventLogger(t *testing.T) {
logMessage := "Test log"
src := "otel-windowseventlogreceiver-test"
uninstallEventSource, err := assertEventSourceInstallation(t, src)
defer uninstallEventSource()
require.NoError(t, err)

ctx := context.Background()
factory := newFactoryAdapter()
Expand All @@ -85,13 +93,8 @@ func TestReadWindowsEventLogger(t *testing.T) {
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()

src := "otel"
err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error)
require.NoError(t, err)
defer func() {
require.NoError(t, eventlog.Remove(src))
}()
// Start launches nested goroutines, give them a chance to run before logging the test event(s).
time.Sleep(3 * time.Second)

logger, err := eventlog.Open(src)
require.NoError(t, err)
Expand All @@ -100,19 +103,8 @@ func TestReadWindowsEventLogger(t *testing.T) {
err = logger.Info(10, logMessage)
require.NoError(t, err)

logsReceived := func() bool {
return sink.LogRecordCount() == 1
}

// logs sometimes take a while to be written, so a substantial wait buffer is needed
require.Eventually(t, logsReceived, 20*time.Second, 200*time.Millisecond)
results := sink.AllLogs()
require.Len(t, results, 1)

records := results[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
require.Equal(t, 1, records.Len())

record := records.At(0)
records := requireExpectedLogRecords(t, sink, src, 1)
record := records[0]
body := record.Body().Map().AsRaw()

require.Equal(t, logMessage, body["message"])
Expand All @@ -134,6 +126,10 @@ func TestReadWindowsEventLogger(t *testing.T) {

func TestReadWindowsEventLoggerRaw(t *testing.T) {
logMessage := "Test log"
src := "otel-windowseventlogreceiver-test"
uninstallEventSource, err := assertEventSourceInstallation(t, src)
defer uninstallEventSource()
require.NoError(t, err)

ctx := context.Background()
factory := newFactoryAdapter()
Expand All @@ -150,13 +146,8 @@ func TestReadWindowsEventLoggerRaw(t *testing.T) {
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()

src := "otel"
err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error)
defer func() {
require.NoError(t, eventlog.Remove(src))
}()
require.NoError(t, err)
// Start launches nested goroutines, give them a chance to run before logging the test event(s).
time.Sleep(3 * time.Second)

logger, err := eventlog.Open(src)
require.NoError(t, err)
Expand All @@ -165,19 +156,8 @@ func TestReadWindowsEventLoggerRaw(t *testing.T) {
err = logger.Info(10, logMessage)
require.NoError(t, err)

logsReceived := func() bool {
return sink.LogRecordCount() == 1
}

// logs sometimes take a while to be written, so a substantial wait buffer is needed
require.Eventually(t, logsReceived, 20*time.Second, 200*time.Millisecond)
results := sink.AllLogs()
require.Len(t, results, 1)

records := results[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
require.Equal(t, 1, records.Len())

record := records.At(0)
records := requireExpectedLogRecords(t, sink, src, 1)
record := records[0]
body := record.Body().AsString()
bodyStruct := struct {
Data string `xml:"EventData>Data"`
Expand All @@ -188,91 +168,68 @@ func TestReadWindowsEventLoggerRaw(t *testing.T) {
require.Equal(t, logMessage, bodyStruct.Data)
}

func TestReadWindowsEventLoggerWithExcludeProvider(t *testing.T) {
func TestExcludeProvider(t *testing.T) {
logMessage := "Test log"
src := "otel"

ctx := context.Background()
factory := newFactoryAdapter()
createSettings := receivertest.NewNopSettings()
cfg := createTestConfig()
cfg.InputConfig.ExcludeProviders = []string{src}
sink := new(consumertest.LogsSink)

receiver, err := factory.CreateLogsReceiver(ctx, createSettings, cfg, sink)
require.NoError(t, err)

err = receiver.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()

err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error)
require.NoError(t, err)
defer func() {
require.NoError(t, eventlog.Remove(src))
}()

logger, err := eventlog.Open(src)
require.NoError(t, err)
defer logger.Close()

err = logger.Info(10, logMessage)
require.NoError(t, err)

logsReceived := func() bool {
return sink.LogRecordCount() == 0
excludedSrc := "otel-excluded-windowseventlogreceiver-test"
notExcludedSrc := "otel-windowseventlogreceiver-test"
testSources := []string{excludedSrc, notExcludedSrc}

for _, src := range testSources {
uninstallEventSource, err := assertEventSourceInstallation(t, src)
defer uninstallEventSource()
require.NoError(t, err)
}

// logs sometimes take a while to be written, so a substantial wait buffer is needed
require.Eventually(t, logsReceived, 10*time.Second, 200*time.Millisecond)
results := sink.AllLogs()
require.Len(t, results, 0)
}

func TestReadWindowsEventLoggerRawWithExcludeProvider(t *testing.T) {
logMessage := "Test log"
src := "otel"

ctx := context.Background()
factory := newFactoryAdapter()
createSettings := receivertest.NewNopSettings()
cfg := createTestConfig()
cfg.InputConfig.Raw = true
cfg.InputConfig.ExcludeProviders = []string{src}
sink := new(consumertest.LogsSink)

receiver, err := factory.CreateLogsReceiver(ctx, createSettings, cfg, sink)
require.NoError(t, err)

err = receiver.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()

err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error)
defer func() {
require.NoError(t, eventlog.Remove(src))
}()
require.NoError(t, err)

logger, err := eventlog.Open(src)
require.NoError(t, err)
defer logger.Close()

err = logger.Info(10, logMessage)
require.NoError(t, err)

logsReceived := func() bool {
return sink.LogRecordCount() == 0
tests := []struct {
name string
raw bool
}{
{
name: "with_EventXML",
},
{
name: "with_Raw",
raw: true,
},
}

// logs sometimes take a while to be written, so a substantial wait buffer is needed
require.Eventually(t, logsReceived, 10*time.Second, 200*time.Millisecond)
results := sink.AllLogs()
require.Len(t, results, 0)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
factory := newFactoryAdapter()
createSettings := receivertest.NewNopSettings()
cfg := createTestConfig()
cfg.InputConfig.Raw = tt.raw
cfg.InputConfig.ExcludeProviders = []string{excludedSrc}
sink := new(consumertest.LogsSink)

receiver, err := factory.CreateLogsReceiver(ctx, createSettings, cfg, sink)
require.NoError(t, err)

err = receiver.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()
// Start launches nested goroutines, give them a chance to run before logging the test event(s).
time.Sleep(3 * time.Second)

for _, src := range testSources {
logger, err := eventlog.Open(src)
require.NoError(t, err)
defer logger.Close()

err = logger.Info(10, logMessage)
require.NoError(t, err)
}

records := requireExpectedLogRecords(t, sink, notExcludedSrc, 1)
assert.NotEmpty(t, records)

records = filterAllLogRecordsBySource(t, sink, excludedSrc)
assert.Empty(t, records)
})
}
}

func createTestConfig() *WindowsLogConfig {
Expand All @@ -289,3 +246,72 @@ func createTestConfig() *WindowsLogConfig {
}(),
}
}

// assertEventSourceInstallation installs an event source and verifies that the registry key was created.
// It returns a function that can be used to uninstall the event source, that function is never nil
func assertEventSourceInstallation(t *testing.T, src string) (uninstallEventSource func(), err error) {
err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error)
uninstallEventSource = func() {
assert.NoError(t, eventlog.Remove(src))
}
assert.NoError(t, err)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
rk, err := registry.OpenKey(registry.LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services\\EventLog\\Application\\"+src, registry.QUERY_VALUE)
assert.NoError(c, err)
defer rk.Close()
_, _, err = rk.GetIntegerValue("TypesSupported")
assert.NoError(c, err)
}, 10*time.Second, 250*time.Millisecond)

return
}

func requireExpectedLogRecords(t *testing.T, sink *consumertest.LogsSink, expectedEventSrc string, expectedEventCount int) []plog.LogRecord {
var actualLogRecords []plog.LogRecord

// logs sometimes take a while to be written, so a substantial wait buffer is needed
require.EventuallyWithT(t, func(c *assert.CollectT) {
actualLogRecords = filterAllLogRecordsBySource(t, sink, expectedEventSrc)
assert.Equal(c, expectedEventCount, len(actualLogRecords))
}, 10*time.Second, 250*time.Millisecond)

return actualLogRecords
}

func filterAllLogRecordsBySource(t *testing.T, sink *consumertest.LogsSink, src string) (filteredLogRecords []plog.LogRecord) {
for _, logs := range sink.AllLogs() {
resourceLogsSlice := logs.ResourceLogs()
for i := 0; i < resourceLogsSlice.Len(); i++ {
resourceLogs := resourceLogsSlice.At(i)
scopeLogsSlice := resourceLogs.ScopeLogs()
for j := 0; j < scopeLogsSlice.Len(); j++ {
logRecords := scopeLogsSlice.At(j).LogRecords()
for k := 0; k < logRecords.Len(); k++ {
logRecord := logRecords.At(k)
if extractEventSourceFromLogRecord(t, logRecord) == src {
filteredLogRecords = append(filteredLogRecords, logRecord)
}
}
}
}
}

return
}

func extractEventSourceFromLogRecord(t *testing.T, logRecord plog.LogRecord) string {
eventMap := logRecord.Body().Map()
if !reflect.DeepEqual(eventMap, pcommon.Map{}) {
eventProviderMap := eventMap.AsRaw()["provider"]
if providerMap, ok := eventProviderMap.(map[string]any); ok {
return providerMap["name"].(string)
}
require.Fail(t, "Failed to extract event source from log record")
}

// This is a raw event log record. Extract the event source from the XML body string.
bodyString := logRecord.Body().AsString()
var eventXML windows.EventXML
require.NoError(t, xml.Unmarshal([]byte(bodyString), &eventXML))
return eventXML.Provider.Name
}

0 comments on commit 8423a05

Please sign in to comment.