From 9181e956f4fdd5586424e15599dd8e2e56ae404f Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 4 Jun 2024 16:07:56 -0700 Subject: [PATCH] Add a memory limit integration test --- collector/test/e2e_test.go | 283 ++++++++++++++++++++++++++++++------- collector/test/go.mod | 3 +- 2 files changed, 236 insertions(+), 50 deletions(-) diff --git a/collector/test/e2e_test.go b/collector/test/e2e_test.go index 7492b00b..ac8e68d5 100644 --- a/collector/test/e2e_test.go +++ b/collector/test/e2e_test.go @@ -6,8 +6,10 @@ package test import ( "context" "encoding/json" + "errors" "fmt" "math/rand" + "strings" "sync" "testing" "time" @@ -15,11 +17,14 @@ import ( "github.com/open-telemetry/otel-arrow/collector/exporter/otelarrowexporter" "github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver" "github.com/open-telemetry/otel-arrow/collector/testutil" + "github.com/open-telemetry/otel-arrow/pkg/datagen" "github.com/open-telemetry/otel-arrow/pkg/otel/assert" + "github.com/open-telemetry/otel-arrow/pkg/otel/common/arrow" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" @@ -27,14 +32,43 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +type testParams struct { + threadCount int + requestCount int +} + +var normalParams = testParams{ + threadCount: 10, + requestCount: 100, +} + +var memoryLimitParams = testParams{ + threadCount: 10, + requestCount: 10, +} + type testConsumer struct { - sink consumertest.TracesSink + sink consumertest.TracesSink + recvLogs *observer.ObservedLogs + expLogs *observer.ObservedLogs } var _ consumer.Traces = &testConsumer{} +type ExpConfig = otelarrowexporter.Config +type RecvConfig = otelarrowreceiver.Config +type CfgFunc func(*ExpConfig, *RecvConfig) +type GenFunc func(int) ptrace.Traces +type MkGen func() GenFunc +type EndFunc func(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces) +type ConsumerErrFunc func(t *testing.T, err error) + func (*testConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{} } @@ -44,11 +78,22 @@ func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) err return tc.sink.ConsumeTraces(ctx, td) } -func TestIntegrationSimpleTraces(t *testing.T) { - const ( - threadCount = 10 - requestCount = 100 - ) +func testLoggerSettings(t *testing.T) (component.TelemetrySettings, *observer.ObservedLogs) { + tset := componenttest.NewNopTelemetrySettings() + + core, obslogs := observer.New(zapcore.InfoLevel) + + // Note: if you want to see these logs in development, use: + // tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core())) + // Also see failureMemoryLimitEnding() for explicit tests based on the + // logs observer. + tset.Logger = zap.New(core) + + return tset, obslogs +} + +func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces, receiver.Traces) { + ctx := context.Background() efact := otelarrowexporter.NewFactory() rfact := otelarrowreceiver.NewFactory() @@ -56,8 +101,8 @@ func TestIntegrationSimpleTraces(t *testing.T) { ecfg := efact.CreateDefaultConfig() rcfg := rfact.CreateDefaultConfig() - receiverCfg := rcfg.(*otelarrowreceiver.Config) - exporterCfg := ecfg.(*otelarrowexporter.Config) + receiverCfg := rcfg.(*RecvConfig) + exporterCfg := ecfg.(*ExpConfig) addr := testutil.GetAvailableLocalAddress(t) @@ -70,28 +115,39 @@ func TestIntegrationSimpleTraces(t *testing.T) { exporterCfg.RetryConfig.Enabled = false exporterCfg.Arrow.NumStreams = 1 - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - tset := componenttest.NewNopTelemetrySettings() - tset.Logger, _ = zap.NewDevelopment() + if cfgF != nil { + cfgF(exporterCfg, receiverCfg) + } - host := componenttest.NewNopHost() + expTset, expLogs := testLoggerSettings(t) + recvTset, recvLogs := testLoggerSettings(t) - testCon := &testConsumer{} + testCon := &testConsumer{ + recvLogs: recvLogs, + expLogs: expLogs, + } receiver, err := rfact.CreateTracesReceiver(ctx, receiver.CreateSettings{ ID: component.MustNewID("otelarrowreceiver"), - TelemetrySettings: tset, + TelemetrySettings: recvTset, }, receiverCfg, testCon) require.NoError(t, err) exporter, err := efact.CreateTracesExporter(ctx, exporter.CreateSettings{ ID: component.MustNewID("otelarrowexporter"), - TelemetrySettings: tset, + TelemetrySettings: expTset, }, exporterCfg) require.NoError(t, err) + return testCon, exporter, receiver + +} + +func testIntegrationTraces(ctx context.Context, t *testing.T, tp testParams, cfgf CfgFunc, mkgen MkGen, errf ConsumerErrFunc, endf EndFunc) { + host := componenttest.NewNopHost() + + testCon, exporter, receiver := basicTestConfig(t, cfgf) + var startWG sync.WaitGroup var exporterShutdownWG sync.WaitGroup var startExporterShutdownWG sync.WaitGroup @@ -123,41 +179,17 @@ func TestIntegrationSimpleTraces(t *testing.T) { startWG.Wait() var clientDoneWG sync.WaitGroup // wait for client to finish - var expect [threadCount][]ptrace.Traces + expect := make([][]ptrace.Traces, tp.threadCount) - for num := 0; num < threadCount; num++ { + for num := 0; num < tp.threadCount; num++ { clientDoneWG.Add(1) go func() { defer clientDoneWG.Done() - for i := 0; i < requestCount; i++ { - td := ptrace.NewTraces() - td.ResourceSpans().AppendEmpty().Resource().Attributes().PutStr("resource-attr", fmt.Sprint("resource-attr-val-", i)) - - ss := td.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans() - span := ss.AppendEmpty() - - span.SetName("operationA") - span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) - span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) - - span.SetTraceID(testutil.UInt64ToTraceID(rand.Uint64(), rand.Uint64())) - span.SetSpanID(testutil.UInt64ToSpanID(rand.Uint64())) - evs := span.Events() - ev0 := evs.AppendEmpty() - ev0.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - ev0.SetName("event-with-attr") - ev0.Attributes().PutStr("span-event-attr", "span-event-attr-val") - ev0.SetDroppedAttributesCount(2) - ev1 := evs.AppendEmpty() - ev1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - ev1.SetName("event") - ev1.SetDroppedAttributesCount(2) - span.SetDroppedEventsCount(1) - status := span.Status() - status.SetCode(ptrace.StatusCodeError) - status.SetMessage("status-cancelled") - - require.NoError(t, exporter.ConsumeTraces(ctx, td)) + generator := mkgen() + for i := 0; i < tp.requestCount; i++ { + td := generator(i) + + errf(t, exporter.ConsumeTraces(ctx, td)) expect[num] = append(expect[num], td) } }() @@ -172,8 +204,59 @@ func TestIntegrationSimpleTraces(t *testing.T) { // wait for receiver to shut down receiverShutdownWG.Wait() + endf(t, tp, testCon, expect[:]) +} + +func makeTestTraces(i int) ptrace.Traces { + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty().Resource().Attributes().PutStr("resource-attr", fmt.Sprint("resource-attr-val-", i)) + + ss := td.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans() + span := ss.AppendEmpty() + + span.SetName("operationA") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) + + span.SetTraceID(testutil.UInt64ToTraceID(rand.Uint64(), rand.Uint64())) + span.SetSpanID(testutil.UInt64ToSpanID(rand.Uint64())) + evs := span.Events() + ev0 := evs.AppendEmpty() + ev0.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + ev0.SetName("event-with-attr") + ev0.Attributes().PutStr("span-event-attr", "span-event-attr-val") + ev0.SetDroppedAttributesCount(2) + ev1 := evs.AppendEmpty() + ev1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + ev1.SetName("event") + ev1.SetDroppedAttributesCount(2) + span.SetDroppedEventsCount(1) + status := span.Status() + status.SetCode(ptrace.StatusCodeError) + status.SetMessage("status-cancelled") + + return td +} + +func bulkyGenFunc() MkGen { + return func() GenFunc { + entropy := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing + + tracesGen := datagen.NewTracesGenerator( + entropy, + entropy.NewStandardResourceAttributes(), + entropy.NewStandardInstrumentationScopes(), + ) + return func(_ int) ptrace.Traces { + return tracesGen.Generate(1000, time.Minute) + } + } + +} + +func standardEnding(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces) { // Check for matching request count and data - require.Equal(t, requestCount*threadCount, testCon.sink.SpanCount()) + require.Equal(t, tp.requestCount*tp.threadCount, testCon.sink.SpanCount()) var expectJSON []json.Marshaler for _, tdn := range expect { @@ -189,3 +272,105 @@ func TestIntegrationSimpleTraces(t *testing.T) { asserter := assert.NewStdUnitTest(t) assert.Equiv(asserter, expectJSON, receivedJSON) } + +// logSigs computes a signature of a structured log message emitted by +// the component via the Zap observer. The encoding is the message, +// "|||", followed by the field names in order, separated by "///". +// +// Specifically, we expect "arrow stream error" messages. When this is +// the case, the corresponding message is returned as a slice. +func logSigs(obs *observer.ObservedLogs) (map[string]int, []string) { + counts := map[string]int{} + var msgs []string + for _, rl := range obs.All() { + var attrs []string + for _, f := range rl.Context { + attrs = append(attrs, f.Key) + + if rl.Message == "arrow stream error" && f.Key == "message" { + msgs = append(msgs, f.String) + } + } + var sig strings.Builder + sig.WriteString(rl.Message) + sig.WriteString("|||") + sig.WriteString(strings.Join(attrs, "///")) + counts[sig.String()]++ + } + return counts, msgs +} + +func countMemoryLimitErrors(msgs []string) (cnt int) { + for _, msg := range msgs { + if _, ok := arrow.NewLimitErrorFromError(errors.New(msg)); ok { + cnt++ + } + } + return +} + +func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer, _ [][]ptrace.Traces) { + require.Equal(t, 0, testCon.sink.SpanCount()) + + eSigs, eMsgs := logSigs(testCon.expLogs) + rSigs, rMsgs := logSigs(testCon.recvLogs) + + // Test for arrow stream errors. + + require.Less(t, 0, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eSigs) + require.Less(t, 0, rSigs["arrow stream error|||code///message///where"], "should have receiver arrow stream errors: %v", rSigs) + + // Ensure the errors include memory limit errors. + + require.Less(t, 0, countMemoryLimitErrors(rMsgs), "should have memory limit errors: %v", rMsgs) + require.Less(t, 0, countMemoryLimitErrors(eMsgs), "should have memory limit errors: %v", eMsgs) +} + +func consumerSuccess(t *testing.T, err error) { + require.NoError(t, err) +} + +func consumerFailure(t *testing.T, err error) { + require.Error(t, err) + + // there should be no permanent errors anywhere in this test. + require.True(t, !consumererror.IsPermanent(err), + "should not be permanent: %v", err) + + stat, ok := status.FromError(err) + require.True(t, ok, "should be a status error: %v", err) + + switch stat.Code() { + case codes.ResourceExhausted, codes.Canceled: + // Cool + default: + // Not cool + t.Fatalf("unexpected status code %v", stat) + } +} + +func TestIntegrationTracesSimple(t *testing.T) { + for _, n := range []int{1, 2, 4, 8} { + t.Run(fmt.Sprint(n), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testIntegrationTraces(ctx, t, normalParams, func(ecfg *ExpConfig, rcfg *RecvConfig) { + ecfg.Arrow.NumStreams = n + }, func() GenFunc { return makeTestTraces }, consumerSuccess, standardEnding) + }) + } +} + +func TestIntegrationMemoryLimited(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(5 * time.Second) + cancel() + }() + testIntegrationTraces(ctx, t, memoryLimitParams, func(ecfg *ExpConfig, rcfg *RecvConfig) { + rcfg.Arrow.MemoryLimitMiB = 1 + ecfg.Arrow.NumStreams = 10 + ecfg.TimeoutSettings.Timeout = 5 * time.Second + }, bulkyGenFunc(), consumerFailure, failureMemoryLimitEnding) +} diff --git a/collector/test/go.mod b/collector/test/go.mod index 2e0ce44c..a453883c 100644 --- a/collector/test/go.mod +++ b/collector/test/go.mod @@ -14,6 +14,7 @@ require ( go.opentelemetry.io/collector/pdata v1.5.0 go.opentelemetry.io/collector/receiver v0.98.0 go.uber.org/zap v1.27.0 + google.golang.org/grpc v1.63.2 ) require ( @@ -22,6 +23,7 @@ require ( github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/brianvoe/gofakeit/v6 v6.17.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -88,7 +90,6 @@ require ( golang.org/x/tools v0.15.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect - google.golang.org/grpc v1.63.2 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect )