diff --git a/testbed/tests/batcher_test.go b/testbed/tests/batcher_test.go index 1de09f555534..6596d5a11709 100644 --- a/testbed/tests/batcher_test.go +++ b/testbed/tests/batcher_test.go @@ -15,6 +15,7 @@ package tests import ( "fmt" "slices" + "strings" "testing" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" @@ -33,106 +34,88 @@ type batcherTestSpec struct { } func TestLog10kDPSNoProcessors(t *testing.T) { - tests := []batcherTestSpec{ - { - name: "No batching, no queue", - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, - }, - { - name: "No batching, queue", - withQueue: true, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, - }, - { - name: "Batch size 1000 with batch processor, no queue", - batchSize: 1000, - withBatchProcessor: true, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, - }, - { - name: "Batch size 1000 with batch processor, queue", - batchSize: 1000, - withBatchProcessor: true, - withQueue: true, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, - }, + resourceSpec := testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + } + processors := []ProcessorNameAndConfigBody{} + tests := getBatcherTestSpecs(resourceSpec, processors) + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runBatcherPerfTest(t, test) + }) + } +} + +func TestLog10kDPSWithProcessors(t *testing.T) { + resourceSpec := testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + } + processors := []ProcessorNameAndConfigBody{ { - name: "Batch size 1000 with exporter batcher, no queue", - withExporterBatcher: true, - batchSize: 1000, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, + Name: "filter", + Body: ` + filter: + logs: + log_record: + - not IsMatch(attributes["batch_index"], "batch_.+") +`, }, { - name: "Batch size 1000 with exporter batcher, queue", - withExporterBatcher: true, - withQueue: true, - batchSize: 1000, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, + Name: "transform", + Body: ` + transform: + log_statements: + - context: log + statements: + - set(resource.attributes["batch_index"], attributes["batch_index"]) + - set(attributes["counter"], ExtractPatterns(body, "Load Generator Counter (?P.+)")) +`, }, } + tests := getBatcherTestSpecs(resourceSpec, processors) for _, test := range tests { t.Run(test.name, func(t *testing.T) { - sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)) - receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) - receiver.WithRetry(` - retry_on_failure: - enabled: true -`) - if test.withQueue { - receiver.WithQueue(` - sending_queue: - enabled: true -`) - } + runBatcherPerfTest(t, test) + }) + } +} - if test.withExporterBatcher { - receiver.WithBatcher(fmt.Sprintf(` - batcher: - enabled: true - min_size_items: %d -`, test.batchSize)) - } +func TestLog10kDPSWithHeavyProcessing(t *testing.T) { + ottlStatementCount := 50 + transformProcessor := ProcessorNameAndConfigBody{ + Name: "transform", + Body: ` + transform: + log_statements: + - context: log + statements: +`, + } + for i := 0; i < ottlStatementCount; i++ { + transformProcessor.Body += strings.Repeat(" ", ottlStatementCount) + "- set(attributes[\"counter\"], ExtractPatterns(body, \"Load Generator Counter (?P.+)\"))\n" + } + processors := []ProcessorNameAndConfigBody{transformProcessor} + resourceSpec := testbed.ResourceSpec{ + ExpectedMaxCPU: 120, + ExpectedMaxRAM: 120, + } + tests := getBatcherTestSpecs(resourceSpec, processors) - processors := slices.Clone(test.processors) - if test.withBatchProcessor { - processors = slices.Insert(processors, 0, ProcessorNameAndConfigBody{ - Name: "batch", - Body: fmt.Sprintf(` - batch: - send_batch_size: %d -`, test.batchSize), - }) - } - loadOptions := &testbed.LoadOptions{ - Parallel: 10, - ItemsPerBatch: 10, - } - Scenario10kItemsPerSecond(t, sender, receiver, test.resourceSpec, performanceResultsSummary, processors, test.extensions, loadOptions) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runBatcherPerfTest(t, test) }) } + } -func TestLog10kDPSWithProcessors(t *testing.T) { - processors := []ProcessorNameAndConfigBody{ +func TestLog10kDPSWith20Processors(t *testing.T) { + processorCount := 20 + initialProcessors := []ProcessorNameAndConfigBody{ { Name: "filter", Body: ` @@ -154,107 +137,111 @@ func TestLog10kDPSWithProcessors(t *testing.T) { `, }, } - tests := []batcherTestSpec{ + processors := make([]ProcessorNameAndConfigBody, 0, processorCount) + for i := 0; i < processorCount/len(initialProcessors); i++ { + for _, processor := range initialProcessors { + processorCopy := processor + processorCopy.Name = fmt.Sprintf("%s/%d", processor.Name, i) + processorCopy.Body = strings.ReplaceAll(processorCopy.Body, processor.Name, processorCopy.Name) + processors = append(processors, processorCopy) + } + } + resourceSpec := testbed.ResourceSpec{ + ExpectedMaxCPU: 50, + ExpectedMaxRAM: 120, + } + tests := getBatcherTestSpecs(resourceSpec, processors) + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runBatcherPerfTest(t, test) + }) + } +} + +func getBatcherTestSpecs(resourceSpec testbed.ResourceSpec, processors []ProcessorNameAndConfigBody) []batcherTestSpec { + testSpecs := []batcherTestSpec{ { - name: "No batching, no queue", - processors: processors, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, + name: "No batching, no queue", + resourceSpec: resourceSpec, + processors: processors, }, { - name: "No batching, queue", - processors: processors, - withQueue: true, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, + name: "No batching, queue", + withQueue: true, + resourceSpec: resourceSpec, + processors: processors, }, { name: "Batch size 1000 with batch processor, no queue", - processors: processors, batchSize: 1000, withBatchProcessor: true, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, + resourceSpec: resourceSpec, + processors: processors, }, { name: "Batch size 1000 with batch processor, queue", - processors: processors, batchSize: 1000, withBatchProcessor: true, withQueue: true, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, + resourceSpec: resourceSpec, + processors: processors, }, { name: "Batch size 1000 with exporter batcher, no queue", - processors: processors, withExporterBatcher: true, batchSize: 1000, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, + resourceSpec: resourceSpec, + processors: processors, }, { name: "Batch size 1000 with exporter batcher, queue", - processors: processors, withExporterBatcher: true, withQueue: true, batchSize: 1000, - resourceSpec: testbed.ResourceSpec{ - ExpectedMaxCPU: 30, - ExpectedMaxRAM: 120, - }, + resourceSpec: resourceSpec, + processors: processors, }, } + return testSpecs +} - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)) - receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) - receiver.WithRetry(` +func runBatcherPerfTest(t *testing.T, spec batcherTestSpec) { + t.Helper() + sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)) + receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) + receiver.WithRetry(` retry_on_failure: enabled: true `) - if test.withQueue { - receiver.WithQueue(` + if spec.withQueue { + receiver.WithQueue(` sending_queue: enabled: true - queue_size: 10 `) - } + } - if test.withExporterBatcher { - receiver.WithBatcher(fmt.Sprintf(` + if spec.withExporterBatcher { + receiver.WithBatcher(fmt.Sprintf(` batcher: enabled: true min_size_items: %d -`, test.batchSize)) - } +`, spec.batchSize)) + } - testProcessors := slices.Clone(test.processors) - if test.withBatchProcessor { - processors = slices.Insert(testProcessors, 0, ProcessorNameAndConfigBody{ - Name: "batch", - Body: fmt.Sprintf(` + processors := slices.Clone(spec.processors) + if spec.withBatchProcessor { + processors = slices.Insert(processors, 0, ProcessorNameAndConfigBody{ + Name: "batch", + Body: fmt.Sprintf(` batch: send_batch_size: %d -`, test.batchSize), - }) - } - loadOptions := &testbed.LoadOptions{ - Parallel: 10, - ItemsPerBatch: 10, - } - Scenario10kItemsPerSecond(t, sender, receiver, test.resourceSpec, performanceResultsSummary, testProcessors, test.extensions, loadOptions) +`, spec.batchSize), }) } + loadOptions := &testbed.LoadOptions{ + Parallel: 10, + ItemsPerBatch: 10, + } + Scenario10kItemsPerSecond(t, sender, receiver, spec.resourceSpec, performanceResultsSummary, processors, spec.extensions, loadOptions) }