Skip to content

Commit

Permalink
update README
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd committed Nov 3, 2023
1 parent 6f652b4 commit 1db4c89
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 68 deletions.
68 changes: 34 additions & 34 deletions collector/compression/zstd/zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ type EncoderConfig struct {
}

type DecoderConfig struct {
// Level is symmetric with encoder config. Although the
// decoder object does not use this configuration, it is the
// key used to lookup configuration corresponding with the
// same setting on the encoder, so provides a means of
// multi-configuration.
Level Level `mapstructure:"level"`
Concurrency uint `mapstructure:"concurrency"`
MemoryLimitMiB uint32 `mapstructure:"memory_limit_mib"`
MaxWindowSizeMiB uint32 `mapstructure:"max_window_size_mib"`
Expand Down Expand Up @@ -105,19 +99,18 @@ func DefaultEncoderConfig() EncoderConfig {

func DefaultDecoderConfig() DecoderConfig {
return DecoderConfig{
Level: DefaultLevel, // Default speed
Concurrency: 1, // Avoids extra CPU/memory
MemoryLimitMiB: 512, // More conservative than library default
MaxWindowSizeMiB: 32, // Corresponds w/ "best" level default
Concurrency: 1, // Avoids extra CPU/memory
MemoryLimitMiB: 128, // More conservative than library default
MaxWindowSizeMiB: 32, // Corresponds w/ "best" level default
}
}

func validate(level Level, f func() error) error {
if level > MaxLevel {
return fmt.Errorf("level out of range [0,10]: %d", level)
}
if level == 0 {
return nil
if level < MinLevel {
return fmt.Errorf("level out of range [0,10]: %d", level)
}
return f()
}
Expand All @@ -134,7 +127,7 @@ func (cfg EncoderConfig) Validate() error {
}

func (cfg DecoderConfig) Validate() error {
return validate(cfg.Level, func() error {
return validate(MinLevel, func() error {
var buf bytes.Buffer
test, err := zstdlib.NewReader(&buf, cfg.options()...)
if test != nil {
Expand All @@ -147,47 +140,58 @@ func (cfg DecoderConfig) Validate() error {
func init() {
staticInstances.lock.Lock()
defer staticInstances.lock.Unlock()
resetLibrary()
}

func resetLibrary() {
for level := Level(MinLevel); level <= MaxLevel; level++ {
var combi combined
combi.enc.cfg = DefaultEncoderConfig()
combi.dec.cfg = DefaultDecoderConfig()
combi.enc.cfg.Level = level
combi.dec.cfg.Level = level
encoding.RegisterCompressor(&combi)
staticInstances.byLevel[level] = &combi
}
}

func SetEncoderConfig(cfg EncoderConfig) error {
if err := cfg.Validate(); err != nil || cfg.Level == 0 {
if err := cfg.Validate(); err != nil {
return err
}

updateOne := func(enc *encoder) {
enc.lock.Lock()
defer enc.lock.Unlock()
enc.cfg = cfg
enc.pool.Reset()
}

staticInstances.lock.Lock()
defer staticInstances.lock.Unlock()

enc := &staticInstances.byLevel[cfg.Level].enc
enc.lock.Lock()
defer enc.lock.Unlock()

enc.cfg = cfg
enc.pool.Reset()
updateOne(&staticInstances.byLevel[cfg.Level].enc)
return nil
}

func SetDecoderConfig(cfg DecoderConfig) error {
if err := cfg.Validate(); err != nil || cfg.Level == 0 {
if err := cfg.Validate(); err != nil {
return err
}
updateOne := func(dec *decoder) {
dec.lock.Lock()
defer dec.lock.Unlock()
dec.cfg = cfg
dec.pool.Reset()
}

staticInstances.lock.Lock()
defer staticInstances.lock.Unlock()

dec := &staticInstances.byLevel[cfg.Level].dec
dec.lock.Lock()
defer dec.lock.Unlock()

dec.cfg = cfg
dec.pool.Reset()
for level := MinLevel; level <= MaxLevel; level++ {
updateOne(&staticInstances.byLevel[level].dec)
}
return nil

}

func (cfg EncoderConfig) options() (opts []zstdlib.EOption) {
Expand All @@ -214,12 +218,8 @@ func (cfg EncoderConfig) Name() string {
}

func (cfg EncoderConfig) CallOption() grpc.CallOption {
if cfg.Level == 0 {
// The caller is meant to avoid configuring this call
// option entirely when level is zero. If somehow
// this happens, use the well-known configuration in
// OTC and not the invalid configuration we have here.
return grpc.UseCompressor("zstd")
if cfg.Level < MinLevel || cfg.Level > MaxLevel {
return grpc.UseCompressor(EncoderConfig{Level: DefaultLevel}.Name())
}
return grpc.UseCompressor(cfg.Name())
}
Expand Down
60 changes: 35 additions & 25 deletions collector/compression/zstd/zstd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,69 +10,64 @@ import (
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
)

func resetTest() {
TTL = time.Minute
resetLibrary()
}

func TestCompressorNonNil(t *testing.T) {
defer resetTest()

for i := 1; i <= 10; i++ {
require.NotNil(t, encoding.GetCompressor(fmt.Sprint(NamePrefix, i)))
}
require.Nil(t, encoding.GetCompressor(fmt.Sprint(NamePrefix, MinLevel-1)))
require.Nil(t, encoding.GetCompressor(fmt.Sprint(NamePrefix, MaxLevel+1)))
}

func TestConfigLevelZeroNoop(t *testing.T) {
// Test that level zero misconfiguration is a noop, because
// there is no level-zero compressor. The same configuration
// at level 1 is an error.
require.NoError(t, SetEncoderConfig(EncoderConfig{
Level: 0,
WindowSizeMiB: 1024,
}))
func TestConfigLibraryError(t *testing.T) {
defer resetTest()

require.Error(t, SetEncoderConfig(EncoderConfig{
Level: 1,
WindowSizeMiB: 1024,
}))
require.NoError(t, SetDecoderConfig(DecoderConfig{
Level: 0,
MaxWindowSizeMiB: 1024,
}))
require.NoError(t, SetDecoderConfig(DecoderConfig{
Level: 1,
MaxWindowSizeMiB: 1024,
}))
}

func TestInvalidCompressorLevel(t *testing.T) {
defer resetTest()

require.Error(t, SetEncoderConfig(EncoderConfig{
Level: 12,
Concurrency: 10,
WindowSizeMiB: 16,
}))
require.Error(t, SetDecoderConfig(DecoderConfig{
Level: 12,
Concurrency: 10,
MaxWindowSizeMiB: 16,
MemoryLimitMiB: 256,
}))
}

func TestAllCompressorOptions(t *testing.T) {
defer resetTest()

require.NoError(t, SetEncoderConfig(EncoderConfig{
Level: 9,
Concurrency: 10,
WindowSizeMiB: 16,
}))
require.NoError(t, SetDecoderConfig(DecoderConfig{
Level: 9,
Concurrency: 10,
MaxWindowSizeMiB: 16,
MemoryLimitMiB: 256,
}))
}

func TestCompressorReset(t *testing.T) {
TTL = time.Minute
defer resetTest()

// Get compressor configs 1 and 2.
comp1 := encoding.GetCompressor("zstdarrow1").(*combined)
Expand Down Expand Up @@ -125,7 +120,7 @@ func TestCompressorReset(t *testing.T) {
}

func TestDecompressorReset(t *testing.T) {
TTL = time.Minute
defer resetTest()

// Get compressor configs 3 and 4.
comp3 := encoding.GetCompressor("zstdarrow3").(*combined)
Expand All @@ -151,7 +146,6 @@ func TestDecompressorReset(t *testing.T) {
cpyCfg3 := decCfg3
cpyCfg3.MaxWindowSizeMiB = 128

require.Equal(t, Level(3), cpyCfg3.Level)
require.NotEqual(t, cpyCfg3, decCfg3, "see %v %v", cpyCfg3, decCfg3)

require.NoError(t, SetDecoderConfig(cpyCfg3))
Expand All @@ -160,12 +154,28 @@ func TestDecompressorReset(t *testing.T) {
require.Equal(t, comp3, encoding.GetCompressor("zstdarrow3").(*combined))
require.Equal(t, comp4, encoding.GetCompressor("zstdarrow4").(*combined))

// Level 4 is unchanged
require.Equal(t, decCfg4, comp4.dec.getConfig())
// Level 4 is _also changed_
require.Equal(t, cpyCfg3, comp4.dec.getConfig())
require.NotEqual(t, decCfg4, comp4.dec.getConfig())

// Level 3 is changed
require.NotEqual(t, decCfg3, comp3.dec.getConfig(), "see %v %v", decCfg3, comp3.dec.getConfig())

// Unlike the encoder test, which has an explicit Close() to its advantage,
// we aren't testing the behavior of the finalizer that puts back into the MRU.
}

func TestGRPCCallOption(t *testing.T) {
cfgN := func(l Level) EncoderConfig {
return EncoderConfig{
Level: l,
}
}
cfg2 := cfgN(2)
require.Equal(t, cfg2.Name(), cfg2.CallOption().(grpc.CompressorCallOption).CompressorType)

cfgD := cfgN(DefaultLevel)
cfg13 := cfgN(13)
// Invalid maps to default call option
require.Equal(t, cfgD.Name(), cfg13.CallOption().(grpc.CompressorCallOption).CompressorType)
}
45 changes: 44 additions & 1 deletion collector/exporter/otelarrowexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,57 @@ of data being returned to the exporter will be instrumented:
- `exporter_recv`: uncompressed bytes received, prior to compression
- `exporter_recv_wire`: compressed bytes received, on the wire.

### Experimental Configuration
### Compression Configuration

The exporter supports configuring Zstd compression at both the gRPC
and the Arrow level. The exporter metrics described above will be
correct in either case. The default settings are subject to change as
we gain experience.

The gRPC-level Zstd compression can be configured, however there is an
important caveat. The gRPC-Go library requires that compressor
implementations be registered statically. These libraries use
compressors named `zstdarrow1`, `zstdarrow2`, ..., `zstdarrow10`,
supporting 10 configurable compression levels. Note, however that
these configurations are static and only one unique configuration is
possible per level. It is possible to configure multiple OTel-Arrow
exporters with different Zstd configuration simply by using distinct
levels.

Under `arrow`, the `zstd` sub-configuration has the following fields:

- `level`: in the range 1-10 determines a number of defaults (default 5)
- `window_size_mib`: size of the Zstd window in MiB, 0 indicates to determine based on level (default 0)
- `concurrency`: controls background CPU used for compression, 0 indicates to let `zstd` library decide (default 1)

The exporter supports configuring compression at the [Arrow
columnar-protocol
level](https://arrow.apache.org/docs/format/Columnar.html#format-ipc).

- `payload_compression`: compression applied at the Arrow IPC level, "none" by default, "zstd" supported.

Compression settings at the Arrow IPC level cannot be further configured.

For example, two exporters may be configured with multiple zstd
configurations, provided they use different levels:

```yaml
exporters:
otelarrow/best:
compression: zstd # describes gRPC-level compression (default "zstd")
arrow:
zstd:
level: 10 # describes gRPC-level compression level (default 5)
payload_compression: zstd # describes Arrow-IPC compression (default "none")
otelarrow/fastest:
compression: zstd
arrow:
zstd:
level: 1 # 1 is the "fastest" compression level
```

### Experimental Configuration

The exporter uses the signal-specific Arrow stream methods (i.e.,
`ArrowTraces`, `ArrowLogs`, and `ArrowMetrics`) by default. There is
an option to use the generic `ArrowStream` method instead.
Expand Down
3 changes: 1 addition & 2 deletions collector/exporter/otelarrowexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) (err erro

arrowCallOpts := e.callOptions

if e.config.GRPCClientSettings.Compression == configcompression.Zstd &&
e.config.Arrow.Zstd.Level != 0 {
if e.config.GRPCClientSettings.Compression == configcompression.Zstd {
// ignore the error below b/c Validate() was called
_ = zstd.SetEncoderConfig(e.config.Arrow.Zstd)
// use the configured compressor.
Expand Down
11 changes: 10 additions & 1 deletion collector/receiver/otelarrowreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,21 @@ Several common configuration structures provide additional capabilities automati

In the `arrow` configuration block, the following settings are available:

- `memory_limit` (default: 128MiB): limits the amount of concurrent memory used by Arrow data buffers.
- `memory_limit_mib` (default: 128): limits the amount of concurrent memory used by Arrow data buffers.

When the limit is reached, the receiver will return RESOURCE_EXHAUSTED
error codes to the receiver, which are [conditionally retryable, see
exporter retry configuration](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md).

### Compression Configuration

In the `arrow` configuration block, `zstd` sub-section applies to all
compression levels used by exporters:

- `memory_limit_mib` limits memory dedicated to Zstd decompression, per stream (default 128)
- `max_window_size_mib`: maximum size of the Zstd window in MiB, 0 indicates to determine based on level (default 32)
- `concurrency`: controls background CPU used for decompression, 0 indicates to let `zstd` library decide (default 1)

### Keepalive configuration

As a gRPC streaming service, the OTel Arrow receiver is able to limit
Expand Down
6 changes: 1 addition & 5 deletions collector/receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,7 @@ type ArrowSettings struct {
MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"`

// Zstd settings apply to OTel-Arrow use of gRPC specifically.
// Note that when multiple Otel-Arrow exporters are configured
// their settings will be applied in arbitrary order.
// Identical Zstd settings are recommended when multiple
// OTel-Arrow exporters are in use.
Zstd zstd.DecoderConfig `mapstructure:"zstd"`
Zstd []zstd.DecoderConfig `mapstructure:"zstd"`
}

// Config defines configuration for OTel Arrow receiver.
Expand Down

0 comments on commit 1db4c89

Please sign in to comment.