Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[testbed] Add batcher perf tests for heavy processing #36901

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 133 additions & 146 deletions testbed/tests/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tests
import (
"fmt"
"slices"
"strings"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
Expand All @@ -33,106 +34,88 @@ type batcherTestSpec struct {
}

func TestLog10kDPSNoProcessors(t *testing.T) {
tests := []batcherTestSpec{
{
name: "No batching, no queue",
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "No batching, queue",
withQueue: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "Batch size 1000 with batch processor, no queue",
batchSize: 1000,
withBatchProcessor: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "Batch size 1000 with batch processor, queue",
batchSize: 1000,
withBatchProcessor: true,
withQueue: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
resourceSpec := testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
}
processors := []ProcessorNameAndConfigBody{}
tests := getBatcherTestSpecs(resourceSpec, processors)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
runBatcherPerfTest(t, test)
})
}
}

func TestLog10kDPSWithProcessors(t *testing.T) {
resourceSpec := testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
}
processors := []ProcessorNameAndConfigBody{
{
name: "Batch size 1000 with exporter batcher, no queue",
withExporterBatcher: true,
batchSize: 1000,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
Name: "filter",
Body: `
filter:
logs:
log_record:
- not IsMatch(attributes["batch_index"], "batch_.+")
`,
},
{
name: "Batch size 1000 with exporter batcher, queue",
withExporterBatcher: true,
withQueue: true,
batchSize: 1000,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
Name: "transform",
Body: `
transform:
log_statements:
- context: log
statements:
- set(resource.attributes["batch_index"], attributes["batch_index"])
- set(attributes["counter"], ExtractPatterns(body, "Load Generator Counter (?P<counter>.+)"))
`,
},
}
tests := getBatcherTestSpecs(resourceSpec, processors)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
receiver.WithRetry(`
retry_on_failure:
enabled: true
`)
if test.withQueue {
receiver.WithQueue(`
sending_queue:
enabled: true
`)
}
runBatcherPerfTest(t, test)
})
}
}

if test.withExporterBatcher {
receiver.WithBatcher(fmt.Sprintf(`
batcher:
enabled: true
min_size_items: %d
`, test.batchSize))
}
func TestLog10kDPSWithHeavyProcessing(t *testing.T) {
ottlStatementCount := 50
transformProcessor := ProcessorNameAndConfigBody{
Name: "transform",
Body: `
transform:
log_statements:
- context: log
statements:
`,
}
for i := 0; i < ottlStatementCount; i++ {
transformProcessor.Body += strings.Repeat(" ", ottlStatementCount) + "- set(attributes[\"counter\"], ExtractPatterns(body, \"Load Generator Counter (?P<counter>.+)\"))\n"
}
processors := []ProcessorNameAndConfigBody{transformProcessor}
resourceSpec := testbed.ResourceSpec{
ExpectedMaxCPU: 120,
ExpectedMaxRAM: 120,
}
tests := getBatcherTestSpecs(resourceSpec, processors)

processors := slices.Clone(test.processors)
if test.withBatchProcessor {
processors = slices.Insert(processors, 0, ProcessorNameAndConfigBody{
Name: "batch",
Body: fmt.Sprintf(`
batch:
send_batch_size: %d
`, test.batchSize),
})
}
loadOptions := &testbed.LoadOptions{
Parallel: 10,
ItemsPerBatch: 10,
}
Scenario10kItemsPerSecond(t, sender, receiver, test.resourceSpec, performanceResultsSummary, processors, test.extensions, loadOptions)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
runBatcherPerfTest(t, test)
})
}

}

func TestLog10kDPSWithProcessors(t *testing.T) {
processors := []ProcessorNameAndConfigBody{
func TestLog10kDPSWith20Processors(t *testing.T) {
processorCount := 20
initialProcessors := []ProcessorNameAndConfigBody{
{
Name: "filter",
Body: `
Expand All @@ -154,107 +137,111 @@ func TestLog10kDPSWithProcessors(t *testing.T) {
`,
},
}
tests := []batcherTestSpec{
processors := make([]ProcessorNameAndConfigBody, 0, processorCount)
for i := 0; i < processorCount/len(initialProcessors); i++ {
for _, processor := range initialProcessors {
processorCopy := processor
processorCopy.Name = fmt.Sprintf("%s/%d", processor.Name, i)
processorCopy.Body = strings.ReplaceAll(processorCopy.Body, processor.Name, processorCopy.Name)
processors = append(processors, processorCopy)
}
}
resourceSpec := testbed.ResourceSpec{
ExpectedMaxCPU: 50,
ExpectedMaxRAM: 120,
}
tests := getBatcherTestSpecs(resourceSpec, processors)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
runBatcherPerfTest(t, test)
})
}
}

func getBatcherTestSpecs(resourceSpec testbed.ResourceSpec, processors []ProcessorNameAndConfigBody) []batcherTestSpec {
testSpecs := []batcherTestSpec{
{
name: "No batching, no queue",
processors: processors,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
name: "No batching, no queue",
resourceSpec: resourceSpec,
processors: processors,
},
{
name: "No batching, queue",
processors: processors,
withQueue: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
name: "No batching, queue",
withQueue: true,
resourceSpec: resourceSpec,
processors: processors,
},
{
name: "Batch size 1000 with batch processor, no queue",
processors: processors,
batchSize: 1000,
withBatchProcessor: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
resourceSpec: resourceSpec,
processors: processors,
},
{
name: "Batch size 1000 with batch processor, queue",
processors: processors,
batchSize: 1000,
withBatchProcessor: true,
withQueue: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
resourceSpec: resourceSpec,
processors: processors,
},
{
name: "Batch size 1000 with exporter batcher, no queue",
processors: processors,
withExporterBatcher: true,
batchSize: 1000,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
resourceSpec: resourceSpec,
processors: processors,
},
{
name: "Batch size 1000 with exporter batcher, queue",
processors: processors,
withExporterBatcher: true,
withQueue: true,
batchSize: 1000,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
resourceSpec: resourceSpec,
processors: processors,
},
}
return testSpecs
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
receiver.WithRetry(`
func runBatcherPerfTest(t *testing.T, spec batcherTestSpec) {
t.Helper()
sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
receiver.WithRetry(`
retry_on_failure:
enabled: true
`)
if test.withQueue {
receiver.WithQueue(`
if spec.withQueue {
receiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 10
`)
}
}

if test.withExporterBatcher {
receiver.WithBatcher(fmt.Sprintf(`
if spec.withExporterBatcher {
receiver.WithBatcher(fmt.Sprintf(`
batcher:
enabled: true
min_size_items: %d
`, test.batchSize))
}
`, spec.batchSize))
}

testProcessors := slices.Clone(test.processors)
if test.withBatchProcessor {
processors = slices.Insert(testProcessors, 0, ProcessorNameAndConfigBody{
Name: "batch",
Body: fmt.Sprintf(`
processors := slices.Clone(spec.processors)
if spec.withBatchProcessor {
processors = slices.Insert(processors, 0, ProcessorNameAndConfigBody{
Name: "batch",
Body: fmt.Sprintf(`
batch:
send_batch_size: %d
`, test.batchSize),
})
}
loadOptions := &testbed.LoadOptions{
Parallel: 10,
ItemsPerBatch: 10,
}
Scenario10kItemsPerSecond(t, sender, receiver, test.resourceSpec, performanceResultsSummary, testProcessors, test.extensions, loadOptions)
`, spec.batchSize),
})
}
loadOptions := &testbed.LoadOptions{
Parallel: 10,
ItemsPerBatch: 10,
}
Scenario10kItemsPerSecond(t, sender, receiver, spec.resourceSpec, performanceResultsSummary, processors, spec.extensions, loadOptions)
}
Loading