-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OpenTelemetry Protocol with Apache Arrow Exporter component #31996
Changes from all commits
c8bfc04
a33a749
78b78ca
170910a
54511e6
72edc30
19b6344
f56207b
9db405a
52c4784
49b867b
640646a
2f94146
41bd121
ef9424d
27c980b
05d7259
ed18731
6c682c3
2bbffb8
12e3e69
b9379e6
ee96d91
2770c24
4cecc14
2b95e7b
935a08c
a38a63c
3a1484b
8af4b69
3bf5738
6f40a7b
cba162d
e7a8bf2
099d57c
64ea382
f2d2de2
af47d8d
2172639
0881db7
7e6db4e
d300433
7e540e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: new_component | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: OpenTelemetry Protocol with Apache Arrow Exporter | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Implementation copied from opentelemetry/otel-arrow repository @v0.20.0. | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [26491] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [user] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,8 @@ import ( | |
"go.opentelemetry.io/collector/config/configretry" | ||
"go.opentelemetry.io/collector/exporter/exporterhelper" | ||
"google.golang.org/grpc" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/arrow" | ||
) | ||
|
||
// Config defines configuration for OTLP exporter. | ||
|
@@ -26,12 +28,12 @@ type Config struct { | |
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. | ||
exporterhelper.QueueSettings `mapstructure:"sending_queue"` | ||
|
||
RetrySettings configretry.BackOffConfig `mapstructure:"retry_on_failure"` | ||
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"` | ||
|
||
configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. | ||
|
||
// Arrow includes settings specific to OTel Arrow. | ||
Arrow ArrowSettings `mapstructure:"arrow"` | ||
Arrow ArrowConfig `mapstructure:"arrow"` | ||
|
||
// UserDialOptions cannot be configured via `mapstructure` | ||
// schemes. This is useful for custom purposes where the | ||
|
@@ -40,9 +42,9 @@ type Config struct { | |
UserDialOptions []grpc.DialOption `mapstructure:"-"` | ||
} | ||
|
||
// ArrowSettings includes whether Arrow is enabled and the number of | ||
// ArrowConfig includes whether Arrow is enabled and the number of | ||
// concurrent Arrow streams. | ||
type ArrowSettings struct { | ||
type ArrowConfig struct { | ||
// NumStreams determines the number of OTel Arrow streams. | ||
NumStreams int `mapstructure:"num_streams"` | ||
|
||
|
@@ -65,32 +67,26 @@ type ArrowSettings struct { | |
// Note that `Zstd` applies to gRPC, not Arrow compression. | ||
PayloadCompression configcompression.Type `mapstructure:"payload_compression"` | ||
|
||
// Disabled prevents using OTel Arrow streams. The exporter | ||
// Disabled prevents using OTel-Arrow streams. The exporter | ||
// falls back to standard OTLP. | ||
Disabled bool `mapstructure:"disabled"` | ||
|
||
// DisableDowngrade prevents this exporter from fallback back | ||
// to standard OTLP. If the Arrow service is unavailable, it | ||
// will retry and/or fail. | ||
DisableDowngrade bool `mapstructure:"disable_downgrade"` | ||
|
||
// Prioritizer is a policy name for how load is distributed | ||
// across streams. | ||
Prioritizer arrow.PrioritizerName `mapstructure:"prioritizer"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This config option is not described in the README. It's not a big issue since the component is in development stability. |
||
} | ||
|
||
var _ component.Config = (*Config)(nil) | ||
|
||
// Validate checks if the exporter configuration is valid | ||
func (cfg *Config) Validate() error { | ||
if err := cfg.QueueSettings.Validate(); err != nil { | ||
return fmt.Errorf("queue settings has invalid configuration: %w", err) | ||
} | ||
if err := cfg.Arrow.Validate(); err != nil { | ||
return fmt.Errorf("arrow settings has invalid configuration: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
var _ component.ConfigValidator = (*ArrowConfig)(nil) | ||
|
||
// Validate returns an error when the number of streams is less than 1. | ||
func (cfg *ArrowSettings) Validate() error { | ||
func (cfg *ArrowConfig) Validate() error { | ||
if cfg.NumStreams < 1 { | ||
return fmt.Errorf("stream count must be > 0: %d", cfg.NumStreams) | ||
} | ||
|
@@ -103,6 +99,10 @@ func (cfg *ArrowSettings) Validate() error { | |
return fmt.Errorf("zstd encoder: invalid configuration: %w", err) | ||
} | ||
|
||
if err := cfg.Prioritizer.Validate(); err != nil { | ||
return fmt.Errorf("invalid prioritizer: %w", err) | ||
} | ||
|
||
// The cfg.PayloadCompression field is validated by the underlying library, | ||
// but we only support Zstd or none. | ||
switch cfg.PayloadCompression { | ||
|
@@ -113,7 +113,7 @@ func (cfg *ArrowSettings) Validate() error { | |
return nil | ||
} | ||
|
||
func (cfg *ArrowSettings) toArrowProducerOptions() (arrowOpts []config.Option) { | ||
func (cfg *ArrowConfig) toArrowProducerOptions() (arrowOpts []config.Option) { | ||
switch cfg.PayloadCompression { | ||
case configcompression.TypeZstd: | ||
arrowOpts = append(arrowOpts, config.WithZstd()) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,7 @@ import ( | |
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/metadata" | ||
) | ||
|
||
// NewFactory creates a factory for OTel-Arrow exporter. | ||
// NewFactory creates a factory for OTLP exporter. | ||
func NewFactory() exporter.Factory { | ||
return exporter.NewFactory( | ||
metadata.Type, | ||
|
@@ -39,9 +39,8 @@ func NewFactory() exporter.Factory { | |
func createDefaultConfig() component.Config { | ||
return &Config{ | ||
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), | ||
RetrySettings: configretry.NewDefaultBackOffConfig(), | ||
RetryConfig: configretry.NewDefaultBackOffConfig(), | ||
QueueSettings: exporterhelper.NewDefaultQueueSettings(), | ||
|
||
ClientConfig: configgrpc.ClientConfig{ | ||
Headers: map[string]configopaque.String{}, | ||
// Default to zstd compression | ||
|
@@ -54,11 +53,12 @@ func createDefaultConfig() component.Config { | |
// destination. | ||
BalancerName: "round_robin", | ||
}, | ||
Arrow: ArrowSettings{ | ||
Arrow: ArrowConfig{ | ||
NumStreams: runtime.NumCPU(), | ||
MaxStreamLifetime: time.Hour, | ||
|
||
Zstd: zstd.DefaultEncoderConfig(), | ||
Zstd: zstd.DefaultEncoderConfig(), | ||
Prioritizer: arrow.DefaultPrioritizer, | ||
|
||
// PayloadCompression is off by default because gRPC | ||
// compression is on by default, above. | ||
|
@@ -67,14 +67,14 @@ func createDefaultConfig() component.Config { | |
} | ||
} | ||
|
||
func (e *baseExporter) helperOptions() []exporterhelper.Option { | ||
func (oce *baseExporter) helperOptions() []exporterhelper.Option { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not clear why this variable was renamed - what is oce stand for? 🤔 |
||
return []exporterhelper.Option{ | ||
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), | ||
exporterhelper.WithTimeout(e.config.TimeoutSettings), | ||
exporterhelper.WithRetry(e.config.RetrySettings), | ||
exporterhelper.WithQueue(e.config.QueueSettings), | ||
exporterhelper.WithStart(e.start), | ||
exporterhelper.WithShutdown(e.shutdown), | ||
exporterhelper.WithTimeout(oce.config.TimeoutSettings), | ||
exporterhelper.WithRetry(oce.config.RetryConfig), | ||
exporterhelper.WithQueue(oce.config.QueueSettings), | ||
exporterhelper.WithStart(oce.start), | ||
exporterhelper.WithShutdown(oce.shutdown), | ||
} | ||
} | ||
|
||
|
@@ -97,13 +97,13 @@ func createTracesExporter( | |
set exporter.CreateSettings, | ||
cfg component.Config, | ||
) (exporter.Traces, error) { | ||
exp, err := newExporter(cfg, set, createArrowTracesStream) | ||
oce, err := newExporter(cfg, set, createArrowTracesStream) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not clear why this variable was renamed - what is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The OTLP-general stuff up two levels was derived from the core |
||
if err != nil { | ||
return nil, err | ||
} | ||
return exporterhelper.NewTracesExporter(ctx, exp.settings, exp.config, | ||
exp.pushTraces, | ||
exp.helperOptions()..., | ||
return exporterhelper.NewTracesExporter(ctx, oce.settings, oce.config, | ||
oce.pushTraces, | ||
oce.helperOptions()..., | ||
) | ||
} | ||
|
||
|
@@ -116,13 +116,13 @@ func createMetricsExporter( | |
set exporter.CreateSettings, | ||
cfg component.Config, | ||
) (exporter.Metrics, error) { | ||
exp, err := newExporter(cfg, set, createArrowMetricsStream) | ||
oce, err := newExporter(cfg, set, createArrowMetricsStream) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return exporterhelper.NewMetricsExporter(ctx, exp.settings, exp.config, | ||
exp.pushMetrics, | ||
exp.helperOptions()..., | ||
return exporterhelper.NewMetricsExporter(ctx, oce.settings, oce.config, | ||
oce.pushMetrics, | ||
oce.helperOptions()..., | ||
) | ||
} | ||
|
||
|
@@ -135,12 +135,12 @@ func createLogsExporter( | |
set exporter.CreateSettings, | ||
cfg component.Config, | ||
) (exporter.Logs, error) { | ||
exp, err := newExporter(cfg, set, createArrowLogsStream) | ||
oce, err := newExporter(cfg, set, createArrowLogsStream) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return exporterhelper.NewLogsExporter(ctx, exp.settings, exp.config, | ||
exp.pushLogs, | ||
exp.helperOptions()..., | ||
return exporterhelper.NewLogsExporter(ctx, oce.settings, oce.config, | ||
oce.pushLogs, | ||
oce.helperOptions()..., | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.