Skip to content

Commit

Permalink
fix: fix compression rate degradation happening after a dictionary ov…
Browse files Browse the repository at this point in the history
…erflow event for some workload
  • Loading branch information
lquerel committed Nov 4, 2023
1 parent 0a19246 commit e4ebf06
Show file tree
Hide file tree
Showing 18 changed files with 218 additions and 38 deletions.
30 changes: 27 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ type Config struct {
// LimitIndexSize sets the maximum size of a dictionary index
// before it is no longer encoded as a dictionary.
LimitIndexSize uint64
// DictResetThreshold specifies the ratio under which a dictionary overflow
// is converted to a dictionary reset. This ratio is calculated as:
// (# of unique values in the dict) / (# of values inserted in the dict.)
// This ratio characterizes the efficiency of the dictionary. The smaller the
// ratio, the more efficient the dictionary is in terms of compression ratio,
// because it means that the dictionary entries are reused more often.
DictResetThreshold float64

// Zstd enables the use of ZSTD compression for IPC messages.
Zstd bool // Use IPC ZSTD compression

Expand Down Expand Up @@ -145,13 +153,19 @@ type Option func(*Config)
// - Zstd: true
func DefaultConfig() *Config {
return &Config{
Pool: memory.NewGoAllocator(),
Pool: memory.NewGoAllocator(),

InitIndexSize: math.MaxUint16,
// The default dictionary index limit is set to 2^16 - 1
// to keep the overall memory usage of the encoder and decoder low.
LimitIndexSize: math.MaxUint16,
SchemaStats: false,
Zstd: true,
// The default dictionary reset threshold is set to 0.3 based on
// empirical observations. More controlled experiments are needed
// to find a better value for the majority of workloads.
DictResetThreshold: 0.3,

SchemaStats: false,
Zstd: true,

OrderSpanBy: OrderSpanByNameTraceID,
OrderAttrs16By: OrderAttrs16ByTypeKeyValueParentId,
Expand Down Expand Up @@ -319,3 +333,13 @@ func WithObserver(observer observer.ProducerObserver) Option {
cfg.Observer = observer
}
}

// WithDictResetThreshold returns an Option that stores the given value in
// Config.DictResetThreshold, i.e. the ratio under which a dictionary overflow
// is converted to a dictionary reset. The ratio is calculated as:
//
//	(# of unique values in the dict) / (# of values inserted in the dict)
func WithDictResetThreshold(dictResetThreshold float64) Option {
	setThreshold := func(c *Config) {
		c.DictResetThreshold = dictResetThreshold
	}
	return setThreshold
}
30 changes: 27 additions & 3 deletions pkg/otel/arrow_record/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,31 @@ func NewProducerWithOptions(options ...cfg.Option) *Producer {
stats.ProducerStats = conf.ProducerStats

// Record builders
metricsRecordBuilder := builder.NewRecordBuilderExt(conf.Pool, metricsarrow.MetricsSchema, config.NewDictionary(conf.LimitIndexSize), stats, conf.Observer)
metricsRecordBuilder := builder.NewRecordBuilderExt(
conf.Pool,
metricsarrow.MetricsSchema,
config.NewDictionary(conf.LimitIndexSize, conf.DictResetThreshold),
stats,
conf.Observer,
)
metricsRecordBuilder.SetLabel("metrics")
logsRecordBuilder := builder.NewRecordBuilderExt(conf.Pool, logsarrow.LogsSchema, config.NewDictionary(conf.LimitIndexSize), stats, conf.Observer)

logsRecordBuilder := builder.NewRecordBuilderExt(
conf.Pool,
logsarrow.LogsSchema,
config.NewDictionary(conf.LimitIndexSize, conf.DictResetThreshold),
stats,
conf.Observer,
)
logsRecordBuilder.SetLabel("logs")
tracesRecordBuilder := builder.NewRecordBuilderExt(conf.Pool, tracesarrow.TracesSchema, config.NewDictionary(conf.LimitIndexSize), stats, conf.Observer)

tracesRecordBuilder := builder.NewRecordBuilderExt(
conf.Pool,
tracesarrow.TracesSchema,
config.NewDictionary(conf.LimitIndexSize, conf.DictResetThreshold),
stats,
conf.Observer,
)
tracesRecordBuilder.SetLabel("traces")

// Entity builders
Expand Down Expand Up @@ -574,3 +594,7 @@ func (o *consoleObserver) OnDictionaryOverflow(recordName string, fieldPath stri
}

// OnSchemaUpdate does nothing: the console observer ignores schema updates.
func (o *consoleObserver) OnSchemaUpdate(recordName string, old, new *arrow.Schema) {}

// OnDictionaryReset does nothing: the console observer ignores dictionary
// reset notifications.
func (o *consoleObserver) OnDictionaryReset(recordName string, fieldPath string, indexType arrow.DataType, card, total uint64) {
}
// OnMetadataUpdate does nothing: the console observer ignores schema metadata
// updates.
func (o *consoleObserver) OnMetadataUpdate(recordName, metadataKey string) {}
2 changes: 1 addition & 1 deletion pkg/otel/common/arrow/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/open-telemetry/otel-arrow/pkg/otel/stats"
)

var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16)
var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0)
var ProducerStats = stats.NewProducerStats()

func TestAttributesBuilder(t *testing.T) {
Expand Down
8 changes: 7 additions & 1 deletion pkg/otel/common/arrow/related_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,13 @@ func (m *RelatedRecordsManager) Declare(
rrBuilder func(b *builder.RecordBuilderExt) RelatedRecordBuilder,
observer observer.ProducerObserver,
) RelatedRecordBuilder {
builderExt := builder.NewRecordBuilderExt(m.cfg.Pool, schema, config.NewDictionary(m.cfg.LimitIndexSize), m.stats, observer)
builderExt := builder.NewRecordBuilderExt(
m.cfg.Pool,
schema,
config.NewDictionary(m.cfg.LimitIndexSize, m.cfg.DictResetThreshold),
m.stats,
observer,
)
builderExt.SetLabel(payloadType.SchemaPrefix())
rBuilder := rrBuilder(builderExt)
m.builders = append(m.builders, rBuilder)
Expand Down
2 changes: 1 addition & 1 deletion pkg/otel/common/arrow_test/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ var (
producerStats = stats.NewProducerStats()
)

var DictConfig = config.NewDictionary(math.MaxUint16)
var DictConfig = config.NewDictionary(math.MaxUint16, 0.0)

func TestTimestampOnly(t *testing.T) {
pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
Expand Down
2 changes: 1 addition & 1 deletion pkg/otel/common/otlp/any_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"github.com/open-telemetry/otel-arrow/pkg/otel/stats"
)

var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16)
var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0)
var producerStats = stats.NewProducerStats()

func TestEmptyAnyValue(t *testing.T) {
Expand Down
17 changes: 10 additions & 7 deletions pkg/otel/common/schema/config/dictionary.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import "math"
// if MaxCard is equal to 0, then the dictionary field will be converted to its
// base type no matter what.
type Dictionary struct {
MinCard uint64
MaxCard uint64
MinCard uint64
MaxCard uint64
ResetThreshold float64
}

// NewDictionary creates a new dictionary configuration with the given maximum
// cardinality.
func NewDictionary(maxCard uint64) *Dictionary {
func NewDictionary(maxCard uint64, resetThreshold float64) *Dictionary {
// If `maxCard` is 0 (no dictionary configuration), then the dictionary
// field will be converted to its base type no matter what. So, the minimum
// cardinality will be set to 0.
Expand All @@ -42,8 +43,9 @@ func NewDictionary(maxCard uint64) *Dictionary {
minCard = maxCard
}
return &Dictionary{
MinCard: minCard,
MaxCard: maxCard,
MinCard: minCard,
MaxCard: maxCard,
ResetThreshold: resetThreshold,
}
}

Expand All @@ -57,7 +59,8 @@ func NewDictionaryFrom(minCard uint64, dicProto *Dictionary) *Dictionary {
minCard = dicProto.MaxCard
}
return &Dictionary{
MinCard: minCard,
MaxCard: dicProto.MaxCard,
MinCard: minCard,
MaxCard: dicProto.MaxCard,
ResetThreshold: dicProto.ResetThreshold,
}
}
25 changes: 19 additions & 6 deletions pkg/otel/common/schema/transform/dictionary.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ var (
// a given index type.
// If the index type is nil, the dictionary is downgraded to its value type.
type DictionaryField struct {
// Configuration for this dictionary.
// This configuration could be shared between multiple dictionaries.
config *cfg.Dictionary

// Path of the dictionary field.
path string

// Dictionary ID
Expand Down Expand Up @@ -71,6 +76,7 @@ func NewDictionaryField(
events *events.Events,
) *DictionaryField {
df := DictionaryField{
config: config,
path: path,
DictID: dictID,
cardinality: 0,
Expand Down Expand Up @@ -172,12 +178,19 @@ func (t *DictionaryField) updateIndexType(stats *stats.RecordBuilderStats) {
t.currentIndex++
}
if t.currentIndex >= len(t.indexTypes) {
t.indexTypes = nil
t.indexMaxCard = nil
t.currentIndex = 0
t.schemaUpdateRequest.Inc(&update.DictionaryOverflowEvent{FieldName: t.path, PrevIndexType: prevIndexType, NewIndexType: t.IndexType(), Cardinality: t.cardinality, Total: t.cumulativeTotal})
t.events.DictionariesWithOverflow[t.path] = true
stats.DictionaryOverflowDetected++
ratio := float64(t.cardinality) / float64(t.cumulativeTotal)
if ratio < t.config.ResetThreshold {
t.currentIndex = len(t.indexTypes) - 1
t.schemaUpdateRequest.Inc(&update.DictionaryResetEvent{FieldName: t.path, IndexType: t.IndexType(), Cardinality: t.cardinality, Total: t.cumulativeTotal})
t.cumulativeTotal = 0
} else {
t.indexTypes = nil
t.indexMaxCard = nil
t.currentIndex = 0
t.schemaUpdateRequest.Inc(&update.DictionaryOverflowEvent{FieldName: t.path, PrevIndexType: prevIndexType, NewIndexType: t.IndexType(), Cardinality: t.cardinality, Total: t.cumulativeTotal})
t.events.DictionariesWithOverflow[t.path] = true
stats.DictionaryOverflowDetected++
}
} else if t.currentIndex != currentIndex {
t.schemaUpdateRequest.Inc(&update.DictionaryUpgradeEvent{FieldName: t.path, PrevIndexType: prevIndexType, NewIndexType: t.IndexType(), Cardinality: t.cardinality, Total: t.cumulativeTotal})
t.events.DictionariesIndexTypeChanged[t.path] = t.indexTypes[t.currentIndex].Name()
Expand Down
8 changes: 4 additions & 4 deletions pkg/otel/common/schema/transform/dictionary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestNoDictionary(t *testing.T) {
func TestDictUint8Overflow(t *testing.T) {
rbStats := &stats.RecordBuilderStats{}
schemaUpdateRequest := update.NewSchemaUpdateRequest()
dictConfig := cfg.NewDictionary(math.MaxUint8)
dictConfig := cfg.NewDictionary(math.MaxUint8, 0.0)

dict := NewDictionaryField("", "1", dictConfig, schemaUpdateRequest, evts)
assert.Equal(t, arrow.PrimitiveTypes.Uint8, dict.IndexType(), "index type should be uint8")
Expand All @@ -73,7 +73,7 @@ func TestDictUint8Overflow(t *testing.T) {
func TestDictUint16Overflow(t *testing.T) {
rbStats := &stats.RecordBuilderStats{}
schemaUpdateRequest := update.NewSchemaUpdateRequest()
dictConfig := cfg.NewDictionary(math.MaxUint16)
dictConfig := cfg.NewDictionary(math.MaxUint16, 0.0)

dict := NewDictionaryField("", "1", dictConfig, schemaUpdateRequest, evts)
assert.Equal(t, arrow.PrimitiveTypes.Uint8, dict.IndexType(), "index type should be uint8")
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestDictUint16Overflow(t *testing.T) {
func TestDictUint32Overflow(t *testing.T) {
rbStats := &stats.RecordBuilderStats{}
schemaUpdateRequest := update.NewSchemaUpdateRequest()
dictConfig := cfg.NewDictionary(math.MaxUint32)
dictConfig := cfg.NewDictionary(math.MaxUint32, 0.0)

dict := NewDictionaryField("", "1", dictConfig, schemaUpdateRequest, evts)
assert.Equal(t, arrow.PrimitiveTypes.Uint8, dict.IndexType(), "index type should be uint8")
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestDictUint32Overflow(t *testing.T) {
func TestDictUint64Overflow(t *testing.T) {
rbStats := &stats.RecordBuilderStats{}
schemaUpdateRequest := update.NewSchemaUpdateRequest()
dictConfig := cfg.NewDictionary(math.MaxUint64)
dictConfig := cfg.NewDictionary(math.MaxUint64, 0.0)

dict := NewDictionaryField("", "1", dictConfig, schemaUpdateRequest, evts)
assert.Equal(t, arrow.PrimitiveTypes.Uint8, dict.IndexType(), "index type should be uint8")
Expand Down
25 changes: 24 additions & 1 deletion pkg/otel/common/schema/update/schema_update_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ type DictionaryUpgradeEvent struct {
Total uint64
}

// DictionaryResetEvent is an event that is triggered when a dictionary is reset
// instead of being overflowed. This happens when dictionary entries are reused,
// on average, more often than a specific threshold, i.e. when the ratio
// (cardinality / total number of values) is below the configured reset
// threshold.
type DictionaryResetEvent struct {
// FieldName is the path of the dictionary field that was reset.
FieldName string
// IndexType is the index type of the dictionary when the reset was issued.
IndexType arrow.DataType
// Cardinality is the cardinality of the dictionary at the time of the reset.
Cardinality uint64
// Total is the cumulative number of values observed for the dictionary at
// the time of the reset.
Total uint64
}

// DictionaryOverflowEvent is an event that is triggered when a dictionary
// overflows its index type.
type DictionaryOverflowEvent struct {
Expand Down Expand Up @@ -122,6 +132,19 @@ func (e *DictionaryOverflowEvent) Notify(recordName string, observer observer.Pr
)
}

// Notify reports this dictionary reset to the given observer by calling its
// OnDictionaryReset callback with the record name and the event's field name,
// index type, cardinality, and total.
func (e *DictionaryResetEvent) Notify(recordName string, observer observer.ProducerObserver) {
	observer.OnDictionaryReset(recordName, e.FieldName, e.IndexType, e.Cardinality, e.Total)
}

// Notify reports this metadata event to the given observer by calling its
// OnMetadataUpdate callback with the record name and the event's metadata key.
func (e *MetadataEvent) Notify(recordName string, observer observer.ProducerObserver) {
	observer.OnMetadataUpdate(recordName, e.MetadataKey)
}
2 changes: 1 addition & 1 deletion pkg/otel/logs/arrow/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/open-telemetry/otel-arrow/pkg/record_message"
)

var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16)
var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0)

func TestLogs(t *testing.T) {
t.Parallel()
Expand Down
2 changes: 1 addition & 1 deletion pkg/otel/logs/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
)

var (
DefaultDictConfig = cfg.NewDictionary(math.MaxUint16)
DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0)
producerStats = stats.NewProducerStats()
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/otel/metrics/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"github.com/open-telemetry/otel-arrow/pkg/record_message"
)

var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16)
var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0)

// TestMetricsEncodingDecoding tests the conversion of OTLP metrics to Arrow and back to OTLP.
// The initial OTLP metrics are generated from a synthetic dataset.
Expand Down
8 changes: 8 additions & 0 deletions pkg/otel/observer/producer_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ type ProducerObserver interface {
// OnSchemaUpdate is called when the schema is updated.
OnSchemaUpdate(recordName string, old, new *arrow.Schema)

// OnDictionaryReset is called when a dictionary is reset instead of being
// overflowed. This happens when dictionary entries are reused, on average,
// more often than a specific threshold.
OnDictionaryReset(recordName string, fieldPath string, indexType arrow.DataType, card, total uint64)

// OnMetadataUpdate is called when schema metadata are updated.
OnMetadataUpdate(recordName, metadataKey string)

// OnRecord is called when a record is produced.
OnRecord(arrow.Record, record_message.PayloadType)
}
2 changes: 1 addition & 1 deletion pkg/otel/traces/arrow/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
)

var (
DefaultDictConfig = cfg.NewDictionary(math.MaxUint16)
DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0)
)

func TestStatus(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/otel/traces/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"github.com/open-telemetry/otel-arrow/pkg/record_message"
)

var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16)
var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0)
var ProducerStats = stats.NewProducerStats()

// TestTracesEncodingDecoding tests the conversion of OTLP traces to OTel Arrow traces
Expand Down
2 changes: 1 addition & 1 deletion tools/mem_benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
Report("TRACES", traces.TracesSchema)
}

var DictConfig = config.NewDictionary(math.MaxUint16)
var DictConfig = config.NewDictionary(math.MaxUint16, 0.0)

func Report(name string, schema *arrow.Schema) {
pool := memory.NewGoAllocator()
Expand Down
Loading

0 comments on commit e4ebf06

Please sign in to comment.