Skip to content

Commit

Permalink
Fix compression rate degradation occurring after a dictionary overflow for some workloads (#82)
Browse files Browse the repository at this point in the history

This PR addresses the progressive degradation of the compression rate
observed in Lightstep's span-type data over time. To facilitate a deeper
understanding of the dynamics at play, a comprehensive suite of
instrumentation has been implemented, targeting the analysis of the
average compression rate in response to various schema modifications.

Additionally, a new CLI command has been introduced, expanding the
simulation capabilities to encompass diverse OTel Arrow stream life
cycles, including variations in batch sizes and the number of batches
per stream.

The root cause of the diminishing compression rate has been identified
as a dictionary overflow event. This overflow triggered an automatic
fallback to a column without dictionary encoding—a standard and often
appropriate response. However, scenarios have been identified where
maintaining dictionary encoding and resetting the dictionary may be more
beneficial. To ascertain the optimal response, a ratio is employed: the
number of distinct values in the dictionary divided by the number of
values inserted into the dictionary. This ratio serves as an indicator,
dictating when a dictionary reset is preferable over the default
fallback procedure. Empirical analysis has resulted in setting the
default threshold for this ratio at 0.3. Further research may refine
this threshold, adopting a more systematic approach to its
determination.

The following chart provides a comparative analysis, showcasing the
optimization's impact on compression efficiency gains before and after
its implementation.


![compression-efficiency-gain-after-optimization](https://github.com/open-telemetry/otel-arrow/assets/657994/ed3375c3-a553-476f-807f-2b45f6e29b6c)
  • Loading branch information
lquerel authored Nov 6, 2023
1 parent e7d4f6a commit bb84a55
Show file tree
Hide file tree
Showing 51 changed files with 1,062 additions and 249 deletions.
5 changes: 3 additions & 2 deletions pkg/benchmark/profileable/arrow/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/open-telemetry/otel-arrow/pkg/benchmark/dataset"
cfg "github.com/open-telemetry/otel-arrow/pkg/config"
"github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"github.com/open-telemetry/otel-arrow/pkg/otel/observer"
)

const OtelArrow = "OTel_ARROW"
Expand All @@ -45,7 +46,7 @@ type LogsProfileable struct {
stats bool
logsProducerOptions []cfg.Option

observer arrow_record.ProducerObserver
observer observer.ProducerObserver
}

func NewLogsProfileable(tags []string, dataset dataset.LogsDataset, config *benchmark.Config) *LogsProfileable {
Expand Down Expand Up @@ -77,7 +78,7 @@ func NewLogsProfileable(tags []string, dataset dataset.LogsDataset, config *benc
}
}

func (s *LogsProfileable) SetObserver(observer arrow_record.ProducerObserver) {
func (s *LogsProfileable) SetObserver(observer observer.ProducerObserver) {
s.observer = observer
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/benchmark/profileable/arrow/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/open-telemetry/otel-arrow/pkg/benchmark/dataset"
cfg "github.com/open-telemetry/otel-arrow/pkg/config"
"github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"github.com/open-telemetry/otel-arrow/pkg/otel/observer"
)

type MetricsProfileable struct {
Expand All @@ -42,7 +43,7 @@ type MetricsProfileable struct {
unaryRpcMode bool
options []cfg.Option

observer arrow_record.ProducerObserver
observer observer.ProducerObserver
}

func NewMetricsProfileable(tags []string, dataset dataset.MetricsDataset, config *benchmark.Config) *MetricsProfileable {
Expand Down Expand Up @@ -81,7 +82,7 @@ func (s *MetricsProfileable) EnableUnaryRpcMode() {
s.unaryRpcMode = true
}

func (s *MetricsProfileable) SetObserver(observer arrow_record.ProducerObserver) {
func (s *MetricsProfileable) SetObserver(observer observer.ProducerObserver) {
s.observer = observer
}

Expand Down
22 changes: 20 additions & 2 deletions pkg/benchmark/profileable/arrow/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/open-telemetry/otel-arrow/pkg/benchmark/dataset"
cfg "github.com/open-telemetry/otel-arrow/pkg/config"
"github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"github.com/open-telemetry/otel-arrow/pkg/otel/observer"
)

type TracesProfileable struct {
Expand All @@ -43,7 +44,20 @@ type TracesProfileable struct {
stats bool
tracesProducerOptions []cfg.Option

observer arrow_record.ProducerObserver
observer observer.ProducerObserver
}

// WithOption builds a TracesProfileable from a set of producer options,
// leaving the dataset unset (it can be assigned later with SetDataset).
func WithOption(tags []string, options ...cfg.Option) *TracesProfileable {
	profileable := &TracesProfileable{
		tags:                  tags,
		compression:           benchmark.Zstd(),
		producer:              arrow_record.NewProducerWithOptions(options...),
		consumer:              arrow_record.NewConsumer(),
		batchArrowRecords:     make([]*v1.BatchArrowRecords, 0, 10),
		pool:                  memory.NewGoAllocator(),
		unaryRpcMode:          false,
		tracesProducerOptions: options,
	}
	return profileable
}

func NewTraceProfileable(tags []string, dataset dataset.TraceDataset, config *benchmark.Config) *TracesProfileable {
Expand Down Expand Up @@ -73,7 +87,11 @@ func NewTraceProfileable(tags []string, dataset dataset.TraceDataset, config *be
}
}

func (s *TracesProfileable) SetObserver(observer arrow_record.ProducerObserver) {
func (s *TracesProfileable) SetDataset(dataset dataset.TraceDataset) {
s.dataset = dataset
}

func (s *TracesProfileable) SetObserver(observer observer.ProducerObserver) {
s.observer = observer
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/benchmark/profileable/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type TracesProfileable struct {
traces []ptrace.Traces
}

// New creates a TracesProfileable with the given compression algorithm and
// no dataset (a dataset can be assigned later with SetDataset).
func New(compression benchmark.CompressionAlgorithm) *TracesProfileable {
	return &TracesProfileable{compression: compression}
}

// NewTraceProfileable creates a TracesProfileable bound to the given trace
// dataset and compression algorithm.
func NewTraceProfileable(dataset dataset.TraceDataset, compression benchmark.CompressionAlgorithm) *TracesProfileable {
	return &TracesProfileable{dataset: dataset, compression: compression}
}
Expand All @@ -48,6 +52,10 @@ func (s *TracesProfileable) CompressionAlgorithm() benchmark.CompressionAlgorith
return s.compression
}

// SetDataset replaces the trace dataset used by this profiler.
// The parameter is named ds to avoid shadowing the dataset package.
func (s *TracesProfileable) SetDataset(ds dataset.TraceDataset) {
	s.dataset = ds
}

// StartProfiling is a no-op for the OTLP trace profiler.
func (s *TracesProfileable) StartProfiling(io.Writer) {}

// EndProfiling is a no-op for the OTLP trace profiler.
func (s *TracesProfileable) EndProfiling(io.Writer) {}
Expand Down
42 changes: 39 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"math"

"github.com/apache/arrow/go/v12/arrow/memory"

"github.com/open-telemetry/otel-arrow/pkg/otel/observer"
)

type OrderSpanBy int8
Expand Down Expand Up @@ -103,6 +105,14 @@ type Config struct {
// LimitIndexSize sets the maximum size of a dictionary index
// before it is no longer encoded as a dictionary.
LimitIndexSize uint64
// DictResetThreshold specifies the ratio under which a dictionary overflow
// is converted to a dictionary reset. This ratio is calculated as:
// (# of unique values in the dict) / (# of values inserted in the dict.)
// This ratio characterizes the efficiency of the dictionary: the smaller the
// ratio, the more efficient the dictionary is in terms of compression ratio,
// because it means that the dictionary entries are reused more often.
DictResetThreshold float64

// Zstd enables the use of ZSTD compression for IPC messages.
Zstd bool // Use IPC ZSTD compression

Expand All @@ -128,6 +138,9 @@ type Config struct {
// OrderAttrs32By specifies how to order attributes in a batch
// (with 32bits attribute ID).
OrderAttrs32By OrderAttrs32By

// Observer is the optional observer to use for the producer.
Observer observer.ProducerObserver
}

type Option func(*Config)
Expand All @@ -140,13 +153,19 @@ type Option func(*Config)
// - Zstd: true
func DefaultConfig() *Config {
return &Config{
Pool: memory.NewGoAllocator(),
Pool: memory.NewGoAllocator(),

InitIndexSize: math.MaxUint16,
// The default dictionary index limit is set to 2^16 - 1
// to keep the overall memory usage of the encoder and decoder low.
LimitIndexSize: math.MaxUint16,
SchemaStats: false,
Zstd: true,
// The default dictionary reset threshold is set to 0.3 based on
// empirical observations. More controlled experiments are recommended
// to find a value that is optimal for the majority of workloads.
DictResetThreshold: 0.3,

SchemaStats: false,
Zstd: true,

OrderSpanBy: OrderSpanByNameTraceID,
OrderAttrs16By: OrderAttrs16ByTypeKeyValueParentId,
Expand Down Expand Up @@ -307,3 +326,20 @@ func WithOrderAttrs16By(orderAttrs16By OrderAttrs16By) Option {
cfg.OrderAttrs16By = orderAttrs16By
}
}

// WithObserver sets the optional observer to use for the producer.
// The parameter is named obs to avoid shadowing the observer package.
func WithObserver(obs observer.ProducerObserver) Option {
	return func(cfg *Config) {
		cfg.Observer = obs
	}
}

// WithDictResetThreshold sets the ratio under which a dictionary overflow
// is converted to a dictionary reset. The ratio is:
//
//	(# of unique values in the dict) / (# of values inserted in the dict)
func WithDictResetThreshold(threshold float64) Option {
	return func(c *Config) {
		c.DictResetThreshold = threshold
	}
}
74 changes: 56 additions & 18 deletions pkg/otel/arrow_record/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
config "github.com/open-telemetry/otel-arrow/pkg/otel/common/schema/config"
logsarrow "github.com/open-telemetry/otel-arrow/pkg/otel/logs/arrow"
metricsarrow "github.com/open-telemetry/otel-arrow/pkg/otel/metrics/arrow"
"github.com/open-telemetry/otel-arrow/pkg/otel/observer"
pstats "github.com/open-telemetry/otel-arrow/pkg/otel/stats"
tracesarrow "github.com/open-telemetry/otel-arrow/pkg/otel/traces/arrow"
"github.com/open-telemetry/otel-arrow/pkg/record_message"
Expand Down Expand Up @@ -82,11 +83,7 @@ type (
stats *pstats.ProducerStats

// Producer observer
observer ProducerObserver
}

ProducerObserver interface {
OnRecord(arrow.Record, record_message.PayloadType)
observer observer.ProducerObserver
}

consoleObserver struct {
Expand Down Expand Up @@ -135,20 +132,40 @@ func NewProducerWithOptions(options ...cfg.Option) *Producer {
stats.ProducerStats = conf.ProducerStats

// Record builders
metricsRecordBuilder := builder.NewRecordBuilderExt(conf.Pool, metricsarrow.MetricsSchema, config.NewDictionary(conf.LimitIndexSize), stats)
metricsRecordBuilder := builder.NewRecordBuilderExt(
conf.Pool,
metricsarrow.MetricsSchema,
config.NewDictionary(conf.LimitIndexSize, conf.DictResetThreshold),
stats,
conf.Observer,
)
metricsRecordBuilder.SetLabel("metrics")
logsRecordBuilder := builder.NewRecordBuilderExt(conf.Pool, logsarrow.LogsSchema, config.NewDictionary(conf.LimitIndexSize), stats)

logsRecordBuilder := builder.NewRecordBuilderExt(
conf.Pool,
logsarrow.LogsSchema,
config.NewDictionary(conf.LimitIndexSize, conf.DictResetThreshold),
stats,
conf.Observer,
)
logsRecordBuilder.SetLabel("logs")
tracesRecordBuilder := builder.NewRecordBuilderExt(conf.Pool, tracesarrow.TracesSchema, config.NewDictionary(conf.LimitIndexSize), stats)

tracesRecordBuilder := builder.NewRecordBuilderExt(
conf.Pool,
tracesarrow.TracesSchema,
config.NewDictionary(conf.LimitIndexSize, conf.DictResetThreshold),
stats,
conf.Observer,
)
tracesRecordBuilder.SetLabel("traces")

// Entity builders
metricsBuilder, err := metricsarrow.NewMetricsBuilder(metricsRecordBuilder, metricsarrow.NewConfig(conf), stats)
metricsBuilder, err := metricsarrow.NewMetricsBuilder(metricsRecordBuilder, metricsarrow.NewConfig(conf), stats, conf.Observer)
if err != nil {
panic(err)
}

logsBuilder, err := logsarrow.NewLogsBuilder(logsRecordBuilder, logsarrow.NewConfig(conf), stats)
logsBuilder, err := logsarrow.NewLogsBuilder(logsRecordBuilder, logsarrow.NewConfig(conf), stats, conf.Observer)
if err != nil {
panic(err)
}
Expand All @@ -163,7 +180,7 @@ func NewProducerWithOptions(options ...cfg.Option) *Producer {
traceCfg.Attrs.Event.Sorter = acommon.Attrs32FindOrderByFunc(conf.OrderAttrs32By)
traceCfg.Attrs.Link.Sorter = acommon.Attrs32FindOrderByFunc(conf.OrderAttrs32By)

tracesBuilder, err := tracesarrow.NewTracesBuilder(tracesRecordBuilder, traceCfg, stats)
tracesBuilder, err := tracesarrow.NewTracesBuilder(tracesRecordBuilder, traceCfg, stats, conf.Observer)
if err != nil {
panic(err)
}
Expand All @@ -182,12 +199,13 @@ func NewProducerWithOptions(options ...cfg.Option) *Producer {
logsRecordBuilder: logsRecordBuilder,
tracesRecordBuilder: tracesRecordBuilder,

stats: stats,
stats: stats,
observer: conf.Observer,
}
}

// SetObserver adds an observer to the producer.
func (p *Producer) SetObserver(observer ProducerObserver) {
func (p *Producer) SetObserver(observer observer.ProducerObserver) {
p.observer = observer
}

Expand All @@ -201,7 +219,7 @@ func (p *Producer) BatchArrowRecordsFromMetrics(metrics pmetric.Metrics) (*colar
// This is especially important after a schema update.
p.metricsBuilder.RelatedData().Reset()
return p.metricsBuilder, nil
}, metrics)
}, metrics, p.observer)
if err != nil {
return nil, werror.Wrap(err)
}
Expand Down Expand Up @@ -234,7 +252,7 @@ func (p *Producer) BatchArrowRecordsFromLogs(ls plog.Logs) (*colarspb.BatchArrow
record, err := recordBuilder[plog.Logs](func() (acommon.EntityBuilder[plog.Logs], error) {
p.logsBuilder.RelatedData().Reset()
return p.logsBuilder, nil
}, ls)
}, ls, p.observer)
if err != nil {
return nil, werror.Wrap(err)
}
Expand Down Expand Up @@ -265,7 +283,7 @@ func (p *Producer) BatchArrowRecordsFromTraces(ts ptrace.Traces) (*colarspb.Batc
record, err := recordBuilder[ptrace.Traces](func() (acommon.EntityBuilder[ptrace.Traces], error) {
p.tracesBuilder.RelatedData().Reset()
return p.tracesBuilder, nil
}, ts)
}, ts, p.observer)
if err != nil {
return nil, werror.Wrap(err)
}
Expand Down Expand Up @@ -501,7 +519,13 @@ func (p *Producer) RecordSizeStats() map[string]*pstats.RecordSizeStats {
return p.stats.RecordSizeStats()
}

func recordBuilder[T pmetric.Metrics | plog.Logs | ptrace.Traces](builder func() (acommon.EntityBuilder[T], error), entity T) (record arrow.Record, err error) {
// recordBuilder is a generic function that builds an Arrow Record from an OTel
// entity.
func recordBuilder[T pmetric.Metrics | plog.Logs | ptrace.Traces](
builder func() (acommon.EntityBuilder[T], error),
entity T,
observer observer.ProducerObserver,
) (record arrow.Record, err error) {
schemaNotUpToDateCount := 0

// Build an Arrow Record from an OTEL entity.
Expand Down Expand Up @@ -541,7 +565,7 @@ func recordBuilder[T pmetric.Metrics | plog.Logs | ptrace.Traces](builder func()
return record, werror.Wrap(err)
}

func NewConsoleObserver(maxRows, maxPrints int) ProducerObserver {
func NewConsoleObserver(maxRows, maxPrints int) observer.ProducerObserver {
return &consoleObserver{
maxRows: maxRows,
maxPrints: maxPrints,
Expand All @@ -560,3 +584,17 @@ func (o *consoleObserver) OnRecord(record arrow.Record, payloadType record_messa
carrow.PrintRecordWithProgression(payloadType.String(), record, o.maxRows, count, o.maxPrints)
}
}

// OnNewField is a no-op: the console observer only reports records (see OnRecord).
func (o *consoleObserver) OnNewField(recordName string, fieldPath string) {}

// OnDictionaryUpgrade is a no-op for the console observer.
func (o *consoleObserver) OnDictionaryUpgrade(recordName string, fieldPath string, prevIndexType, newIndexType arrow.DataType, card, total uint64) {
}

// OnDictionaryOverflow is a no-op for the console observer.
func (o *consoleObserver) OnDictionaryOverflow(recordName string, fieldPath string, card, total uint64) {
}

// OnSchemaUpdate is a no-op for the console observer.
// Parameters renamed from old/new: `new` shadows the builtin new function.
func (o *consoleObserver) OnSchemaUpdate(recordName string, oldSchema, newSchema *arrow.Schema) {}

// OnDictionaryReset is a no-op for the console observer.
func (o *consoleObserver) OnDictionaryReset(recordName string, fieldPath string, indexType arrow.DataType, card, total uint64) {
}

// OnMetadataUpdate is a no-op for the console observer.
func (o *consoleObserver) OnMetadataUpdate(recordName, metadataKey string) {}
4 changes: 2 additions & 2 deletions pkg/otel/common/arrow/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/open-telemetry/otel-arrow/pkg/otel/stats"
)

var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16)
// DefaultDictConfig is the dictionary configuration shared by these tests:
// index limit of math.MaxUint16 and a reset threshold of 0.0 (the
// unique/inserted ratio is never below zero, so overflow never converts to
// a dictionary reset).
var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16, 0.0)

// ProducerStats is the producer-statistics collector shared by these tests.
var ProducerStats = stats.NewProducerStats()

func TestAttributesBuilder(t *testing.T) {
Expand All @@ -43,7 +43,7 @@ func TestAttributesBuilder(t *testing.T) {
schema := arrow.NewSchema([]arrow.Field{
{Name: constants.Attributes, Type: AttributesDT, Metadata: acommon.Metadata(acommon.Optional)},
}, nil)
rBuilder := builder.NewRecordBuilderExt(pool, schema, DefaultDictConfig, ProducerStats)
rBuilder := builder.NewRecordBuilderExt(pool, schema, DefaultDictConfig, ProducerStats, nil)
defer rBuilder.Release()

var record arrow.Record
Expand Down
3 changes: 2 additions & 1 deletion pkg/otel/common/arrow/dyn_attrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/otel-arrow/pkg/otel/constants"
"github.com/open-telemetry/otel-arrow/pkg/otel/observer"
)

const (
Expand Down Expand Up @@ -188,7 +189,7 @@ func (b *DynAttrsBuilder) IsEmpty() bool {
return len(b.parentIDColumn.values) == 0
}

func (b *DynAttrsBuilder) Build() (arrow.Record, error) {
func (b *DynAttrsBuilder) Build(observer observer.ProducerObserver) (arrow.Record, error) {
if b.newColumn {
b.sortColumns()
b.createBuilder()
Expand Down
Loading

0 comments on commit bb84a55

Please sign in to comment.