From e4ebf06f6915e820a1241e8f3069ec1ab39aaca9 Mon Sep 17 00:00:00 2001 From: querel Date: Fri, 3 Nov 2023 17:36:47 -0700 Subject: [PATCH] fix: fix compression rate degradation happening after a dictionary overflow event for some workload --- pkg/config/config.go | 30 ++++++- pkg/otel/arrow_record/producer.go | 30 ++++++- pkg/otel/common/arrow/all_test.go | 2 +- pkg/otel/common/arrow/related_data.go | 8 +- pkg/otel/common/arrow_test/schema_test.go | 2 +- pkg/otel/common/otlp/any_value_test.go | 2 +- pkg/otel/common/schema/config/dictionary.go | 17 ++-- .../common/schema/transform/dictionary.go | 25 ++++-- .../schema/transform/dictionary_test.go | 8 +- .../schema/update/schema_update_request.go | 25 +++++- pkg/otel/logs/arrow/all_test.go | 2 +- pkg/otel/logs/validation_test.go | 2 +- pkg/otel/metrics/validation_test.go | 2 +- pkg/otel/observer/producer_observer.go | 8 ++ pkg/otel/traces/arrow/all_test.go | 2 +- pkg/otel/traces/validation_test.go | 2 +- tools/mem_benchmark/main.go | 2 +- tools/trace_producer_simu/main.go | 87 ++++++++++++++++++- 18 files changed, 218 insertions(+), 38 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index f6cc6bc5..c5e3a28d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -105,6 +105,14 @@ type Config struct { // LimitIndexSize sets the maximum size of a dictionary index // before it is no longer encoded as a dictionary. LimitIndexSize uint64 + // DictResetThreshold specifies the ratio under which a dictionary overflow + // is converted to a dictionary reset. This ratio is calculated as: + // (# of unique values in the dict) / (# of values inserted in the dict.) + // This ratio characterizes the efficiency of the dictionary. The smaller the + // ratio, the more efficient the dictionary is in terms of compression ratio, + // because it means that the dictionary entries are reused more often. + DictResetThreshold float64 + // Zstd enables the use of ZSTD compression for IPC messages. 
Zstd bool // Use IPC ZSTD compression @@ -145,13 +153,19 @@ type Option func(*Config) // - Zstd: true func DefaultConfig() *Config { return &Config{ - Pool: memory.NewGoAllocator(), + Pool: memory.NewGoAllocator(), + InitIndexSize: math.MaxUint16, // The default dictionary index limit is set to 2^16 - 1 // to keep the overall memory usage of the encoder and decoder low. LimitIndexSize: math.MaxUint16, - SchemaStats: false, - Zstd: true, + // The default dictionary reset threshold is set to 0.3 based on + // empirical observations. I suggest to run more controlled experiments + // to find a more optimal value for the majority of workloads. + DictResetThreshold: 0.3, + + SchemaStats: false, + Zstd: true, OrderSpanBy: OrderSpanByNameTraceID, OrderAttrs16By: OrderAttrs16ByTypeKeyValueParentId, @@ -319,3 +333,13 @@ func WithObserver(observer observer.ProducerObserver) Option { cfg.Observer = observer } } + +// WithDictResetThreshold sets the ratio under which a dictionary overflow +// is converted to a dictionary reset. This ratio is calculated as: +// +// (# of unique values in the dict) / (# of values inserted in the dict.) 
+func WithDictResetThreshold(dictResetThreshold float64) Option { + return func(cfg *Config) { + cfg.DictResetThreshold = dictResetThreshold + } +} diff --git a/pkg/otel/arrow_record/producer.go b/pkg/otel/arrow_record/producer.go index cea6be58..8c3ad1a8 100644 --- a/pkg/otel/arrow_record/producer.go +++ b/pkg/otel/arrow_record/producer.go @@ -132,11 +132,31 @@ func NewProducerWithOptions(options ...cfg.Option) *Producer { stats.ProducerStats = conf.ProducerStats // Record builders - metricsRecordBuilder := builder.NewRecordBuilderExt(conf.Pool, metricsarrow.MetricsSchema, config.NewDictionary(conf.LimitIndexSize), stats, conf.Observer) + metricsRecordBuilder := builder.NewRecordBuilderExt( + conf.Pool, + metricsarrow.MetricsSchema, + config.NewDictionary(conf.LimitIndexSize, conf.DictResetThreshold), + stats, + conf.Observer, + ) metricsRecordBuilder.SetLabel("metrics") - logsRecordBuilder := builder.NewRecordBuilderExt(conf.Pool, logsarrow.LogsSchema, config.NewDictionary(conf.LimitIndexSize), stats, conf.Observer) + + logsRecordBuilder := builder.NewRecordBuilderExt( + conf.Pool, + logsarrow.LogsSchema, + config.NewDictionary(conf.LimitIndexSize, conf.DictResetThreshold), + stats, + conf.Observer, + ) logsRecordBuilder.SetLabel("logs") - tracesRecordBuilder := builder.NewRecordBuilderExt(conf.Pool, tracesarrow.TracesSchema, config.NewDictionary(conf.LimitIndexSize), stats, conf.Observer) + + tracesRecordBuilder := builder.NewRecordBuilderExt( + conf.Pool, + tracesarrow.TracesSchema, + config.NewDictionary(conf.LimitIndexSize, conf.DictResetThreshold), + stats, + conf.Observer, + ) tracesRecordBuilder.SetLabel("traces") // Entity builders @@ -574,3 +594,7 @@ func (o *consoleObserver) OnDictionaryOverflow(recordName string, fieldPath stri } func (o *consoleObserver) OnSchemaUpdate(recordName string, old, new *arrow.Schema) {} + +func (o *consoleObserver) OnDictionaryReset(recordName string, fieldPath string, indexType arrow.DataType, card, total uint64) { +} 
+func (o *consoleObserver) OnMetadataUpdate(recordName, metadataKey string) {} diff --git a/pkg/otel/common/arrow/all_test.go b/pkg/otel/common/arrow/all_test.go index e09e664c..21b6a810 100644 --- a/pkg/otel/common/arrow/all_test.go +++ b/pkg/otel/common/arrow/all_test.go @@ -31,7 +31,7 @@ import ( "github.com/open-telemetry/otel-arrow/pkg/otel/stats" ) -var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16) +var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0) var ProducerStats = stats.NewProducerStats() func TestAttributesBuilder(t *testing.T) { diff --git a/pkg/otel/common/arrow/related_data.go b/pkg/otel/common/arrow/related_data.go index daef60cb..dbcd6e05 100644 --- a/pkg/otel/common/arrow/related_data.go +++ b/pkg/otel/common/arrow/related_data.go @@ -227,7 +227,13 @@ func (m *RelatedRecordsManager) Declare( rrBuilder func(b *builder.RecordBuilderExt) RelatedRecordBuilder, observer observer.ProducerObserver, ) RelatedRecordBuilder { - builderExt := builder.NewRecordBuilderExt(m.cfg.Pool, schema, config.NewDictionary(m.cfg.LimitIndexSize), m.stats, observer) + builderExt := builder.NewRecordBuilderExt( + m.cfg.Pool, + schema, + config.NewDictionary(m.cfg.LimitIndexSize, m.cfg.DictResetThreshold), + m.stats, + observer, + ) builderExt.SetLabel(payloadType.SchemaPrefix()) rBuilder := rrBuilder(builderExt) m.builders = append(m.builders, rBuilder) diff --git a/pkg/otel/common/arrow_test/schema_test.go b/pkg/otel/common/arrow_test/schema_test.go index 17476a67..158d8648 100644 --- a/pkg/otel/common/arrow_test/schema_test.go +++ b/pkg/otel/common/arrow_test/schema_test.go @@ -96,7 +96,7 @@ var ( producerStats = stats.NewProducerStats() ) -var DictConfig = config.NewDictionary(math.MaxUint16) +var DictConfig = config.NewDictionary(math.MaxUint16, 0.0) func TestTimestampOnly(t *testing.T) { pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) diff --git a/pkg/otel/common/otlp/any_value_test.go b/pkg/otel/common/otlp/any_value_test.go index 
39550f66..26214291 100644 --- a/pkg/otel/common/otlp/any_value_test.go +++ b/pkg/otel/common/otlp/any_value_test.go @@ -35,7 +35,7 @@ import ( "github.com/open-telemetry/otel-arrow/pkg/otel/stats" ) -var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16) +var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0) var producerStats = stats.NewProducerStats() func TestEmptyAnyValue(t *testing.T) { diff --git a/pkg/otel/common/schema/config/dictionary.go b/pkg/otel/common/schema/config/dictionary.go index e52e573d..37131d89 100644 --- a/pkg/otel/common/schema/config/dictionary.go +++ b/pkg/otel/common/schema/config/dictionary.go @@ -27,13 +27,14 @@ import "math" // if MaxCard is equal to 0, then the dictionary field will be converted to its // base type no matter what. type Dictionary struct { - MinCard uint64 - MaxCard uint64 + MinCard uint64 + MaxCard uint64 + ResetThreshold float64 } // NewDictionary creates a new dictionary configuration with the given maximum // cardinality. -func NewDictionary(maxCard uint64) *Dictionary { +func NewDictionary(maxCard uint64, resetThreshold float64) *Dictionary { // If `maxCard` is 0 (no dictionary configuration), then the dictionary // field will be converted to its base type no matter what. So, the minimum // cardinality will be set to 0. 
@@ -42,8 +43,9 @@ func NewDictionary(maxCard uint64) *Dictionary { minCard = maxCard } return &Dictionary{ - MinCard: minCard, - MaxCard: maxCard, + MinCard: minCard, + MaxCard: maxCard, + ResetThreshold: resetThreshold, } } @@ -57,7 +59,8 @@ func NewDictionaryFrom(minCard uint64, dicProto *Dictionary) *Dictionary { minCard = dicProto.MaxCard } return &Dictionary{ - MinCard: minCard, - MaxCard: dicProto.MaxCard, + MinCard: minCard, + MaxCard: dicProto.MaxCard, + ResetThreshold: dicProto.ResetThreshold, } } diff --git a/pkg/otel/common/schema/transform/dictionary.go b/pkg/otel/common/schema/transform/dictionary.go index 680ea7b2..d4c6fed5 100644 --- a/pkg/otel/common/schema/transform/dictionary.go +++ b/pkg/otel/common/schema/transform/dictionary.go @@ -39,6 +39,11 @@ var ( // a given index type. // If the index type is nil, the dictionary is downgraded to its value type. type DictionaryField struct { + // Configuration for this dictionary. + // This configuration could be shared between multiple dictionaries. + config *cfg.Dictionary + + // Path of the dictionary field. 
path string // Dictionary ID @@ -71,6 +76,7 @@ func NewDictionaryField( events *events.Events, ) *DictionaryField { df := DictionaryField{ + config: config, path: path, DictID: dictID, cardinality: 0, @@ -172,12 +178,19 @@ func (t *DictionaryField) updateIndexType(stats *stats.RecordBuilderStats) { t.currentIndex++ } if t.currentIndex >= len(t.indexTypes) { - t.indexTypes = nil - t.indexMaxCard = nil - t.currentIndex = 0 - t.schemaUpdateRequest.Inc(&update.DictionaryOverflowEvent{FieldName: t.path, PrevIndexType: prevIndexType, NewIndexType: t.IndexType(), Cardinality: t.cardinality, Total: t.cumulativeTotal}) - t.events.DictionariesWithOverflow[t.path] = true - stats.DictionaryOverflowDetected++ + ratio := float64(t.cardinality) / float64(t.cumulativeTotal) + if ratio < t.config.ResetThreshold { + t.currentIndex = len(t.indexTypes) - 1 + t.schemaUpdateRequest.Inc(&update.DictionaryResetEvent{FieldName: t.path, IndexType: t.IndexType(), Cardinality: t.cardinality, Total: t.cumulativeTotal}) + t.cumulativeTotal = 0 + } else { + t.indexTypes = nil + t.indexMaxCard = nil + t.currentIndex = 0 + t.schemaUpdateRequest.Inc(&update.DictionaryOverflowEvent{FieldName: t.path, PrevIndexType: prevIndexType, NewIndexType: t.IndexType(), Cardinality: t.cardinality, Total: t.cumulativeTotal}) + t.events.DictionariesWithOverflow[t.path] = true + stats.DictionaryOverflowDetected++ + } } else if t.currentIndex != currentIndex { t.schemaUpdateRequest.Inc(&update.DictionaryUpgradeEvent{FieldName: t.path, PrevIndexType: prevIndexType, NewIndexType: t.IndexType(), Cardinality: t.cardinality, Total: t.cumulativeTotal}) t.events.DictionariesIndexTypeChanged[t.path] = t.indexTypes[t.currentIndex].Name() diff --git a/pkg/otel/common/schema/transform/dictionary_test.go b/pkg/otel/common/schema/transform/dictionary_test.go index a4365d7f..b3d6f6a1 100644 --- a/pkg/otel/common/schema/transform/dictionary_test.go +++ b/pkg/otel/common/schema/transform/dictionary_test.go @@ -51,7 +51,7 @@ func 
TestNoDictionary(t *testing.T) { func TestDictUint8Overflow(t *testing.T) { rbStats := &stats.RecordBuilderStats{} schemaUpdateRequest := update.NewSchemaUpdateRequest() - dictConfig := cfg.NewDictionary(math.MaxUint8) + dictConfig := cfg.NewDictionary(math.MaxUint8, 0.0) dict := NewDictionaryField("", "1", dictConfig, schemaUpdateRequest, evts) assert.Equal(t, arrow.PrimitiveTypes.Uint8, dict.IndexType(), "index type should be uint8") @@ -73,7 +73,7 @@ func TestDictUint8Overflow(t *testing.T) { func TestDictUint16Overflow(t *testing.T) { rbStats := &stats.RecordBuilderStats{} schemaUpdateRequest := update.NewSchemaUpdateRequest() - dictConfig := cfg.NewDictionary(math.MaxUint16) + dictConfig := cfg.NewDictionary(math.MaxUint16, 0.0) dict := NewDictionaryField("", "1", dictConfig, schemaUpdateRequest, evts) assert.Equal(t, arrow.PrimitiveTypes.Uint8, dict.IndexType(), "index type should be uint8") @@ -104,7 +104,7 @@ func TestDictUint16Overflow(t *testing.T) { func TestDictUint32Overflow(t *testing.T) { rbStats := &stats.RecordBuilderStats{} schemaUpdateRequest := update.NewSchemaUpdateRequest() - dictConfig := cfg.NewDictionary(math.MaxUint32) + dictConfig := cfg.NewDictionary(math.MaxUint32, 0.0) dict := NewDictionaryField("", "1", dictConfig, schemaUpdateRequest, evts) assert.Equal(t, arrow.PrimitiveTypes.Uint8, dict.IndexType(), "index type should be uint8") @@ -144,7 +144,7 @@ func TestDictUint32Overflow(t *testing.T) { func TestDictUint64Overflow(t *testing.T) { rbStats := &stats.RecordBuilderStats{} schemaUpdateRequest := update.NewSchemaUpdateRequest() - dictConfig := cfg.NewDictionary(math.MaxUint64) + dictConfig := cfg.NewDictionary(math.MaxUint64, 0.0) dict := NewDictionaryField("", "1", dictConfig, schemaUpdateRequest, evts) assert.Equal(t, arrow.PrimitiveTypes.Uint8, dict.IndexType(), "index type should be uint8") diff --git a/pkg/otel/common/schema/update/schema_update_request.go b/pkg/otel/common/schema/update/schema_update_request.go index 
a347625c..74d8a16e 100644 --- a/pkg/otel/common/schema/update/schema_update_request.go +++ b/pkg/otel/common/schema/update/schema_update_request.go @@ -52,6 +52,16 @@ type DictionaryUpgradeEvent struct { Total uint64 } +// DictionaryResetEvent is an event that is triggered when a dictionary is reset +// instead of being overflowed. This happens when dictionary entries are reused +// on average more than a specific threshold. +type DictionaryResetEvent struct { + FieldName string + IndexType arrow.DataType + Cardinality uint64 + Total uint64 +} + // DictionaryOverflowEvent is an event that is triggered when a dictionary // overflows its index type. type DictionaryOverflowEvent struct { @@ -122,6 +132,19 @@ func (e *DictionaryOverflowEvent) Notify(recordName string, observer observer.Pr ) } +func (e *DictionaryResetEvent) Notify(recordName string, observer observer.ProducerObserver) { + observer.OnDictionaryReset( + recordName, + e.FieldName, + e.IndexType, + e.Cardinality, + e.Total, + ) +} + func (e *MetadataEvent) Notify(recordName string, observer observer.ProducerObserver) { - // ToDo + observer.OnMetadataUpdate( + recordName, + e.MetadataKey, + ) } diff --git a/pkg/otel/logs/arrow/all_test.go b/pkg/otel/logs/arrow/all_test.go index 6c6e5d1f..2e2e88b8 100644 --- a/pkg/otel/logs/arrow/all_test.go +++ b/pkg/otel/logs/arrow/all_test.go @@ -39,7 +39,7 @@ import ( "github.com/open-telemetry/otel-arrow/pkg/record_message" ) -var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16) +var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0) func TestLogs(t *testing.T) { t.Parallel() diff --git a/pkg/otel/logs/validation_test.go b/pkg/otel/logs/validation_test.go index 09905e9d..a0817475 100644 --- a/pkg/otel/logs/validation_test.go +++ b/pkg/otel/logs/validation_test.go @@ -39,7 +39,7 @@ import ( ) var ( - DefaultDictConfig = cfg.NewDictionary(math.MaxUint16) + DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0) producerStats = stats.NewProducerStats() ) 
diff --git a/pkg/otel/metrics/validation_test.go b/pkg/otel/metrics/validation_test.go index 84842124..b6b457c3 100644 --- a/pkg/otel/metrics/validation_test.go +++ b/pkg/otel/metrics/validation_test.go @@ -38,7 +38,7 @@ import ( "github.com/open-telemetry/otel-arrow/pkg/record_message" ) -var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16) +var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0) // TestMetricsEncodingDecoding tests the conversion of OTLP metrics to Arrow and back to OTLP. // The initial OTLP metrics are generated from a synthetic dataset. diff --git a/pkg/otel/observer/producer_observer.go b/pkg/otel/observer/producer_observer.go index f90b24f7..9d21a879 100644 --- a/pkg/otel/observer/producer_observer.go +++ b/pkg/otel/observer/producer_observer.go @@ -24,6 +24,14 @@ type ProducerObserver interface { // OnSchemaUpdate is called when the schema is updated. OnSchemaUpdate(recordName string, old, new *arrow.Schema) + // OnDictionaryReset is called when a dictionary is reset instead of being + // overflowed. This happens when dictionary entries are reused on average + // more than a specific threshold. + OnDictionaryReset(recordName string, fieldPath string, indexType arrow.DataType, card, total uint64) + + // OnMetadataUpdate is called when schema metadata are updated. + OnMetadataUpdate(recordName, metadataKey string) + // OnRecord is called when a record is produced. 
OnRecord(arrow.Record, record_message.PayloadType) } diff --git a/pkg/otel/traces/arrow/all_test.go b/pkg/otel/traces/arrow/all_test.go index f381aa60..4a391ef8 100644 --- a/pkg/otel/traces/arrow/all_test.go +++ b/pkg/otel/traces/arrow/all_test.go @@ -42,7 +42,7 @@ import ( ) var ( - DefaultDictConfig = cfg.NewDictionary(math.MaxUint16) + DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0) ) func TestStatus(t *testing.T) { diff --git a/pkg/otel/traces/validation_test.go b/pkg/otel/traces/validation_test.go index de191f76..88900e06 100644 --- a/pkg/otel/traces/validation_test.go +++ b/pkg/otel/traces/validation_test.go @@ -41,7 +41,7 @@ import ( "github.com/open-telemetry/otel-arrow/pkg/record_message" ) -var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16) +var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0) var ProducerStats = stats.NewProducerStats() // TestTracesEncodingDecoding tests the conversion of OTLP traces to OTel Arrow traces diff --git a/tools/mem_benchmark/main.go b/tools/mem_benchmark/main.go index 6d2ba50d..a1cc7667 100644 --- a/tools/mem_benchmark/main.go +++ b/tools/mem_benchmark/main.go @@ -72,7 +72,7 @@ func main() { Report("TRACES", traces.TracesSchema) } -var DictConfig = config.NewDictionary(math.MaxUint16) +var DictConfig = config.NewDictionary(math.MaxUint16, 0.0) func Report(name string, schema *arrow.Schema) { pool := memory.NewGoAllocator() diff --git a/tools/trace_producer_simu/main.go b/tools/trace_producer_simu/main.go index 6c5e6179..bceea892 100644 --- a/tools/trace_producer_simu/main.go +++ b/tools/trace_producer_simu/main.go @@ -19,8 +19,10 @@ import ( "fmt" "math" "os" + "testing" "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/memory" arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" "github.com/open-telemetry/otel-arrow/pkg/benchmark" @@ -58,7 +60,7 @@ func (o *SimObserver) OnDictionaryUpgrade(recordName string, fieldPath string, p o.Label = o.Label + 
fmt.Sprintf("DictUpgrade %s.%s", recordName, fieldPath) } func (o *SimObserver) OnDictionaryOverflow(recordName string, fieldPath string, card, total uint64) { - fmt.Printf("OnDictionaryOverflow: %s.%s (card: %d, total: %d)\n", recordName, fieldPath, card, total) + fmt.Printf("OnDictionaryOverflow: %s.%s (card: %d, total: %d, ratio: %f)\n", recordName, fieldPath, card, total, float64(card)/float64(total)) if o.Label != "" { o.Label += ", " } @@ -71,6 +73,20 @@ func (o *SimObserver) OnSchemaUpdate(recordName string, old, new *arrow.Schema) } o.Label = o.Label + "Schema Update" } +func (o *SimObserver) OnDictionaryReset(recordName string, fieldPath string, indexType arrow.DataType, card, total uint64) { + fmt.Printf("OnDictionaryReset: %s.%s (type: %s, card: %d, total: %d, ratio: %f)\n", recordName, fieldPath, indexType, card, total, float64(card)/float64(total)) + if o.Label != "" { + o.Label += ", " + } + o.Label = o.Label + fmt.Sprintf("DictReset %s.%s", recordName, fieldPath) +} +func (o *SimObserver) OnMetadataUpdate(recordName, metadataKey string) { + fmt.Printf("OnMetadataUpdate: %s (metadata-key: %s)\n", recordName, metadataKey) + if o.Label != "" { + o.Label += ", " + } + o.Label = o.Label + fmt.Sprintf("MetadataUpdate %s key=%s", recordName, metadataKey) +} // This command simulates an OTel Arrow producer running for different // configurations of batch size and stream duration. 
@@ -80,6 +96,8 @@ func main() { maxBatchesPerStream := flag.Int("max-batches-per-stream", 10, "Maximum number of batches per stream") verbose := flag.Bool("verbose", false, "Verbose mode") output := flag.String("output", "compression-efficiency-gain.csv", "Output file") + dictResetThreshold := flag.Float64("dict-reset-threshold", 0.3, "Dictionary reset threshold (0.3 by default)") + checkMemoryLeak := flag.Bool("check-memory-leak", false, "Check memory leak") // Statistics related flags (no statistics by default) schemaStats := flag.Bool("schema-stats", false, "Display Arrow schema statistics") @@ -168,16 +186,57 @@ func main() { commonOptions = append(commonOptions, config.WithDumpRecordRows(arrowpb.ArrowPayloadType_SPAN_LINK_ATTRS.String(), *spanLinkAttrs)) } + if *checkMemoryLeak { + // Create a closure having the signature of a test function. + run := func(t *testing.T) { + Run(t, dictResetThreshold, commonOptions, inputFiles, batchSize, maxBatchesPerStream, verbose, err, fOutput) + } + + testing.Main(matchString, []testing.InternalTest{ + {"Test", run}, + }, nil, nil) + } else { + Run(nil, dictResetThreshold, commonOptions, inputFiles, batchSize, maxBatchesPerStream, verbose, err, fOutput) + } +} + +func Run(t *testing.T, dictResetThreshold *float64, commonOptions []config.Option, inputFiles []string, batchSize *int, maxBatchesPerStream *int, verbose *bool, err error, fOutput *os.File) { + // Initialize the allocator based on the memory leak check flag + // If t is nil -> standard Go allocator + // If t is not nil -> checked allocator + var pool memory.Allocator = memory.NewGoAllocator() + deferFunc := func() {} + + if t != nil { + checkedPool := memory.NewCheckedAllocator(pool) + deferFunc = func() { + checkedPool.AssertSize(t, 0) + println("No memory leak detected") + } + pool = checkedPool + println("Memory leak check enabled") + } + defer deferFunc() + simObserver := &SimObserver{} options := append([]config.Option{ config.WithZstd(), 
config.WithObserver(simObserver), + config.WithDictResetThreshold(*dictResetThreshold), + config.WithAllocator(pool), }, commonOptions...) var otlpProfile *otlp.TracesProfileable var otelArrowProfile *parrow.TracesProfileable batchesPerStreamCount := 0 + // Data structure to compute Compression Efficiency Gain (CEG) moving + // average + cegWindowSize := 50 + cegWindowData := make([]float64, cegWindowSize) + currentCegIndex := 0 + cegCount := 0 + for i := range inputFiles { ds := dataset.NewRealTraceDataset(inputFiles[i], benchmark.CompressionTypeZstd, "json", []string{"trace_id"}) fmt.Printf("Dataset '%s' loaded %d spans\n", inputFiles[i], ds.Len()) @@ -215,9 +274,25 @@ func main() { fmt.Printf(">>> OTLP compression ratio=%5.2f vs OTel_ARROW compression ratio=%5.2f (batch: #%06d)\n", float64(otlpUncompressed)/float64(otlpCompressed), float64(otlpUncompressed)/float64(otelArrowCompressed), batchesPerStreamCount) } } - otelArrowImprovement := 100.0 - (otlpCompressionImprovement/otelArrowCompressionImprovement)*100.0 - fmt.Printf("OTel Arrow compression improvement=%5.2f%% (batch: #%06d)\n", otelArrowImprovement, batchesPerStreamCount) - if _, err = fOutput.WriteString(fmt.Sprintf("%d\t%f\t%s\n", batchesPerStreamCount, otelArrowImprovement, simObserver.Label)); err != nil { + // Compute Compression Efficiency Gain (CEG) + ceg := 100.0 - (otlpCompressionImprovement/otelArrowCompressionImprovement)*100.0 + cegMovingAvg := 0.0 + cegWindowData[currentCegIndex] = ceg + currentCegIndex = (currentCegIndex + 1) % cegWindowSize + if cegCount < cegWindowSize { + cegCount++ + for i := 0; i < cegCount; i++ { + cegMovingAvg += cegWindowData[i] + } + cegMovingAvg /= float64(cegCount) + } else { + for i := 0; i < cegWindowSize; i++ { + cegMovingAvg += cegWindowData[i] + } + cegMovingAvg /= float64(cegWindowSize) + } + fmt.Printf("OTel Arrow Compression Efficiency Gain(CEG)=%5.2f%%, CEG moving avg=%5.2f%% (batch: #%06d)\n", ceg, cegMovingAvg, batchesPerStreamCount) + if _, err = 
fOutput.WriteString(fmt.Sprintf("%d\t%f\t%s\n", batchesPerStreamCount, ceg, simObserver.Label)); err != nil { panic(err) } simObserver.Label = "" @@ -230,6 +305,10 @@ func main() { otelArrowProfile.EndProfiling(os.Stdout) } +func matchString(a, b string) (bool, error) { + return a == b, nil +} + func OtlpStream(prevStream *otlp.TracesProfileable, batchSize int) *otlp.TracesProfileable { if prevStream != nil { prevStream.EndProfiling(os.Stdout)