From 301fe2f906f314d5a6b21b5a3c88eefc87ef2056 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 14 Dec 2023 21:28:28 -0800 Subject: [PATCH] [carbonreceiver] Hide unnecessary public API (#29895) Signed-off-by: Bogdan Drutu --- .chloggen/carbonreceiver.yaml | 22 ++++++++++ receiver/carbonreceiver/config.go | 41 +++-------------- receiver/carbonreceiver/config_test.go | 15 +++++++ receiver/carbonreceiver/factory.go | 9 +++- .../client/plaintext_client.go | 2 +- .../{ => internal}/transport/server.go | 2 +- .../{ => internal}/transport/server_test.go | 26 +++++++++-- .../{ => internal}/transport/tcp_server.go | 16 +------ .../{ => internal}/transport/udp_server.go | 2 +- receiver/carbonreceiver/protocol/config.go | 26 +++++------ .../carbonreceiver/protocol/config_test.go | 18 +++++--- receiver/carbonreceiver/receiver.go | 2 +- receiver/carbonreceiver/receiver_test.go | 20 +-------- receiver/carbonreceiver/reporter.go | 2 +- .../carbonreceiver/transport/mock_reporter.go | 44 ------------------- receiver/wavefrontreceiver/config.go | 8 ++++ receiver/wavefrontreceiver/factory.go | 9 +++- 17 files changed, 117 insertions(+), 147 deletions(-) create mode 100755 .chloggen/carbonreceiver.yaml rename receiver/carbonreceiver/{transport => internal}/client/plaintext_client.go (98%) rename receiver/carbonreceiver/{ => internal}/transport/server.go (97%) rename receiver/carbonreceiver/{ => internal}/transport/server_test.go (79%) rename receiver/carbonreceiver/{ => internal}/transport/tcp_server.go (92%) rename receiver/carbonreceiver/{ => internal}/transport/udp_server.go (97%) delete mode 100644 receiver/carbonreceiver/transport/mock_reporter.go diff --git a/.chloggen/carbonreceiver.yaml b/.chloggen/carbonreceiver.yaml new file mode 100755 index 000000000000..26c5033d7a94 --- /dev/null +++ b/.chloggen/carbonreceiver.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'breaking' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "carbonreceiver" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Hide unnecessary public API + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29895] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/receiver/carbonreceiver/config.go b/receiver/carbonreceiver/config.go index 673118a3a7bd..b60a9a0476c2 100644 --- a/receiver/carbonreceiver/config.go +++ b/receiver/carbonreceiver/config.go @@ -4,23 +4,16 @@ package carbonreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver" import ( - "fmt" + "errors" "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confignet" - "go.opentelemetry.io/collector/confmap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" ) -const ( - // parserConfigSection is the name that must be used for the parser settings - // in the configuration struct. The metadata mapstructure for the parser - // should use the same string. - parserConfigSection = "parser" -) - -var _ confmap.Unmarshaler = (*Config)(nil) +var _ component.ConfigValidator = (*Config)(nil) // Config defines configuration for the Carbon receiver. type Config struct { @@ -35,29 +28,9 @@ type Config struct { Parser *protocol.Config `mapstructure:"parser"` } -func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { - if componentParser == nil { - // The section is empty nothing to do, using the default config. - return nil +func (cfg *Config) Validate() error { + if cfg.TCPIdleTimeout < 0 { + return errors.New("'tcp_idle_timeout' must be non-negative") } - - // Unmarshal but not exact yet so the different keys under config do not - // trigger errors, this is needed so that the types of protocol and transport - // are read. - if err := componentParser.Unmarshal(cfg); err != nil { - return err - } - - // Unmarshal the protocol, so the type of config can be properly set. - vParserCfg, errSub := componentParser.Sub(parserConfigSection) - if errSub != nil { - return errSub - } - - if err := protocol.LoadParserConfig(vParserCfg, cfg.Parser); err != nil { - return fmt.Errorf("error on %q section: %w", parserConfigSection, err) - } - - // Unmarshal exact to validate the config keys. - return componentParser.Unmarshal(cfg, confmap.WithErrorUnused()) + return nil } diff --git a/receiver/carbonreceiver/config_test.go b/receiver/carbonreceiver/config_test.go index 2d906f6b5867..fa56a4535775 100644 --- a/receiver/carbonreceiver/config_test.go +++ b/receiver/carbonreceiver/config_test.go @@ -92,3 +92,18 @@ func TestLoadConfig(t *testing.T) { }) } } + +func TestConfigValidate(t *testing.T) { + cfg := &Config{ + NetAddr: confignet.NetAddr{ + Endpoint: "localhost:2003", + Transport: "tcp", + }, + TCPIdleTimeout: -1 * time.Second, + Parser: &protocol.Config{ + Type: "plaintext", + Config: &protocol.PlaintextConfig{}, + }, + } + assert.Error(t, cfg.Validate()) +} diff --git a/receiver/carbonreceiver/factory.go b/receiver/carbonreceiver/factory.go index 084b97cd990c..94e566d99931 100644 --- a/receiver/carbonreceiver/factory.go +++ b/receiver/carbonreceiver/factory.go @@ -5,6 +5,7 @@ package carbonreceiver // import "github.com/open-telemetry/opentelemetry-collec import ( "context" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confignet" @@ -13,7 +14,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport" +) + +const ( + // tcpIdleTimeoutDefault is the default timeout for idle TCP connections. + tcpIdleTimeoutDefault = 30 * time.Second ) // This file implements factory for Carbon receiver. @@ -32,7 +37,7 @@ func createDefaultConfig() component.Config { Endpoint: "localhost:2003", Transport: "tcp", }, - TCPIdleTimeout: transport.TCPIdleTimeoutDefault, + TCPIdleTimeout: tcpIdleTimeoutDefault, Parser: &protocol.Config{ Type: "plaintext", Config: &protocol.PlaintextConfig{}, diff --git a/receiver/carbonreceiver/transport/client/plaintext_client.go b/receiver/carbonreceiver/internal/client/plaintext_client.go similarity index 98% rename from receiver/carbonreceiver/transport/client/plaintext_client.go rename to receiver/carbonreceiver/internal/client/plaintext_client.go index 9afb3de4a82f..d40a3491b232 100644 --- a/receiver/carbonreceiver/transport/client/plaintext_client.go +++ b/receiver/carbonreceiver/internal/client/plaintext_client.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package client // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport/client" +package client // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/client" import ( "fmt" diff --git a/receiver/carbonreceiver/transport/server.go b/receiver/carbonreceiver/internal/transport/server.go similarity index 97% rename from receiver/carbonreceiver/transport/server.go rename to receiver/carbonreceiver/internal/transport/server.go index 126834967793..0b6ba5440db3 100644 --- a/receiver/carbonreceiver/transport/server.go +++ b/receiver/carbonreceiver/internal/transport/server.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport" +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/transport" import ( "context" diff --git a/receiver/carbonreceiver/transport/server_test.go b/receiver/carbonreceiver/internal/transport/server_test.go similarity index 79% rename from receiver/carbonreceiver/transport/server_test.go rename to receiver/carbonreceiver/internal/transport/server_test.go index f864e0555a3d..26b7c2ef8008 100644 --- a/receiver/carbonreceiver/transport/server_test.go +++ b/receiver/carbonreceiver/internal/transport/server_test.go @@ -4,6 +4,7 @@ package transport import ( + "context" "runtime" "sync" "testing" @@ -14,8 +15,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/client" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport/client" ) func Test_Server_ListenAndServe(t *testing.T) { @@ -52,7 +53,8 @@ func Test_Server_ListenAndServe(t *testing.T) { mc := new(consumertest.MetricsSink) p, err := (&protocol.PlaintextConfig{}).BuildParser() require.NoError(t, err) - mr := NewMockReporter(1) + mr := &mockReporter{} + mr.wgMetricsProcessed.Add(1) wgListenAndServe := sync.WaitGroup{} wgListenAndServe.Add(1) @@ -76,7 +78,7 @@ func Test_Server_ListenAndServe(t *testing.T) { err = gc.Disconnect() assert.NoError(t, err) - mr.WaitAllOnMetricsProcessedCalls() + mr.wgMetricsProcessed.Wait() // Keep trying until we're timed out or got a result assert.Eventually(t, func() bool { @@ -96,3 +98,21 @@ func Test_Server_ListenAndServe(t *testing.T) { }) } } + +// mockReporter provides a Reporter that provides some useful functionalities for +// tests (eg.: wait for certain number of messages). +type mockReporter struct { + wgMetricsProcessed sync.WaitGroup +} + +func (m *mockReporter) OnDataReceived(ctx context.Context) context.Context { + return ctx +} + +func (m *mockReporter) OnTranslationError(context.Context, error) {} + +func (m *mockReporter) OnMetricsProcessed(context.Context, int, error) { + m.wgMetricsProcessed.Done() +} + +func (m *mockReporter) OnDebugf(string, ...any) {} diff --git a/receiver/carbonreceiver/transport/tcp_server.go b/receiver/carbonreceiver/internal/transport/tcp_server.go similarity index 92% rename from receiver/carbonreceiver/transport/tcp_server.go rename to receiver/carbonreceiver/internal/transport/tcp_server.go index 0149afef318c..66fb42ac6535 100644 --- a/receiver/carbonreceiver/transport/tcp_server.go +++ b/receiver/carbonreceiver/internal/transport/tcp_server.go @@ -1,13 +1,12 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport" +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/transport" import ( "bufio" "context" "errors" - "fmt" "io" "net" "strings" @@ -20,11 +19,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" ) -const ( - // TCPIdleTimeoutDefault is the default timeout for idle TCP connections. - TCPIdleTimeoutDefault = 30 * time.Second -) - type tcpServer struct { ln net.Listener wg sync.WaitGroup @@ -39,14 +33,6 @@ func NewTCPServer( addr string, idleTimeout time.Duration, ) (Server, error) { - if idleTimeout < 0 { - return nil, fmt.Errorf("invalid idle timeout: %v", idleTimeout) - } - - if idleTimeout == 0 { - idleTimeout = TCPIdleTimeoutDefault - } - ln, err := net.Listen("tcp", addr) if err != nil { return nil, err diff --git a/receiver/carbonreceiver/transport/udp_server.go b/receiver/carbonreceiver/internal/transport/udp_server.go similarity index 97% rename from receiver/carbonreceiver/transport/udp_server.go rename to receiver/carbonreceiver/internal/transport/udp_server.go index fc58b1b1c659..0f1679e617a3 100644 --- a/receiver/carbonreceiver/transport/udp_server.go +++ b/receiver/carbonreceiver/internal/transport/udp_server.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport" +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/transport" import ( "bytes" diff --git a/receiver/carbonreceiver/protocol/config.go b/receiver/carbonreceiver/protocol/config.go index 01be0b2338d2..08d576287c9c 100644 --- a/receiver/carbonreceiver/protocol/config.go +++ b/receiver/carbonreceiver/protocol/config.go @@ -35,12 +35,7 @@ func init() { }) } -const ( - // configSection is the name that must be used for the config settings - // in the configuration struct. The metadata mapstructure for the parser - // should use the same string. - configSection = "config" -) +var _ confmap.Unmarshaler = (*Config)(nil) // Config is the general configuration for the parser to be used. type Config struct { @@ -57,10 +52,14 @@ type ParserConfig interface { BuildParser() (Parser, error) } -// LoadParserConfig is used to load the parser configuration according to the -// specified parser type. It expects the passed viper to be pointing at the level -// of the Config reference. -func LoadParserConfig(cp *confmap.Conf, cfg *Config) error { +// Unmarshal is used to load the parser configuration according to the +// specified parser type. +func (cfg *Config) Unmarshal(cp *confmap.Conf) error { + // If type is configured then use that, otherwise use default. + if configuredType, ok := cp.Get("type").(string); ok { + cfg.Type = configuredType + } + defaultCfgFn, ok := parserMap[cfg.Type] if !ok { return fmt.Errorf( @@ -71,10 +70,5 @@ func LoadParserConfig(cp *confmap.Conf, cfg *Config) error { cfg.Config = defaultCfgFn() - vParserCfg, errSub := cp.Sub(configSection) - if errSub != nil { - return errSub - } - - return vParserCfg.Unmarshal(cfg.Config, confmap.WithErrorUnused()) + return cp.Unmarshal(cfg, confmap.WithErrorUnused()) } diff --git a/receiver/carbonreceiver/protocol/config_test.go b/receiver/carbonreceiver/protocol/config_test.go index 9cc9e12f3913..587365b151d7 100644 --- a/receiver/carbonreceiver/protocol/config_test.go +++ b/receiver/carbonreceiver/protocol/config_test.go @@ -14,14 +14,12 @@ func TestLoadParserConfig(t *testing.T) { tests := []struct { name string cfgMap map[string]any - cfg Config want Config wantErr bool }{ { name: "unknow_type", - cfgMap: map[string]any{"type": "unknow"}, - cfg: Config{Type: "unknown"}, + cfgMap: map[string]any{"type": "unknown"}, want: Config{Type: "unknown"}, wantErr: true, }, @@ -35,7 +33,6 @@ func TestLoadParserConfig(t *testing.T) { "rules": []any{map[string]any{"regexp": "(?.*test)"}}, }, }, - cfg: Config{Type: "regex"}, want: Config{ Type: "regex", Config: &RegexParserConfig{ @@ -47,19 +44,26 @@ func TestLoadParserConfig(t *testing.T) { { name: "default_regex", cfgMap: map[string]any{"type": "regex"}, - cfg: Config{Type: "regex"}, want: Config{ Type: "regex", Config: &RegexParserConfig{}, }, }, + { + name: "plaintext", + cfgMap: map[string]any{"type": "plaintext"}, + want: Config{ + Type: "plaintext", + Config: &PlaintextConfig{}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { v := confmap.NewFromStringMap(tt.cfgMap) - got := tt.cfg // Not strictly necessary but it makes easier to debug issues. - err := LoadParserConfig(v, &got) + got := Config{} + err := got.Unmarshal(v) assert.Equal(t, tt.want, got) assert.Equal(t, tt.wantErr, err != nil) }) diff --git a/receiver/carbonreceiver/receiver.go b/receiver/carbonreceiver/receiver.go index 666a8aeeadd1..c66891719eab 100644 --- a/receiver/carbonreceiver/receiver.go +++ b/receiver/carbonreceiver/receiver.go @@ -13,8 +13,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/transport" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport" ) var ( diff --git a/receiver/carbonreceiver/receiver_test.go b/receiver/carbonreceiver/receiver_test.go index c355fb9e2588..7e1e2ff3dd7d 100644 --- a/receiver/carbonreceiver/receiver_test.go +++ b/receiver/carbonreceiver/receiver_test.go @@ -22,8 +22,8 @@ import ( "go.opentelemetry.io/otel/sdk/trace/tracetest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/client" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport/client" ) func Test_carbonreceiver_New(t *testing.T) { @@ -136,24 +136,6 @@ func Test_carbonreceiver_Start(t *testing.T) { }, wantErr: errors.New("unsupported transport \"unknown_transp\""), }, - { - name: "negative_tcp_idle_timeout", - args: args{ - config: Config{ - NetAddr: confignet.NetAddr{ - Endpoint: "localhost:2003", - Transport: "tcp", - }, - TCPIdleTimeout: -1 * time.Second, - Parser: &protocol.Config{ - Type: "plaintext", - Config: &protocol.PlaintextConfig{}, - }, - }, - nextConsumer: consumertest.NewNop(), - }, - wantErr: errors.New("invalid idle timeout: -1s"), - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/receiver/carbonreceiver/reporter.go b/receiver/carbonreceiver/reporter.go index b33e6d5c6174..e41ba919d37d 100644 --- a/receiver/carbonreceiver/reporter.go +++ b/receiver/carbonreceiver/reporter.go @@ -11,7 +11,7 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/transport" ) // reporter struct implements the transport.Reporter interface to give consistent diff --git a/receiver/carbonreceiver/transport/mock_reporter.go b/receiver/carbonreceiver/transport/mock_reporter.go deleted file mode 100644 index 4c59251e87fd..000000000000 --- a/receiver/carbonreceiver/transport/mock_reporter.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport" - -import ( - "context" - "sync" -) - -// MockReporter provides a Reporter that provides some useful functionalities for -// tests (eg.: wait for certain number of messages). -type MockReporter struct { - wgMetricsProcessed sync.WaitGroup -} - -var _ Reporter = (*MockReporter)(nil) - -// NewMockReporter returns a new instance of a MockReporter. -func NewMockReporter(expectedOnMetricsProcessedCalls int) *MockReporter { - m := MockReporter{} - m.wgMetricsProcessed.Add(expectedOnMetricsProcessedCalls) - return &m -} - -func (m *MockReporter) OnDataReceived(ctx context.Context) context.Context { - return ctx -} - -func (m *MockReporter) OnTranslationError(_ context.Context, _ error) { -} - -func (m *MockReporter) OnMetricsProcessed(_ context.Context, _ int, _ error) { - m.wgMetricsProcessed.Done() -} - -func (m *MockReporter) OnDebugf(_ string, _ ...any) { -} - -// WaitAllOnMetricsProcessedCalls blocks until the number of expected calls -// specified at creation of the reporter is completed. -func (m *MockReporter) WaitAllOnMetricsProcessedCalls() { - m.wgMetricsProcessed.Wait() -} diff --git a/receiver/wavefrontreceiver/config.go b/receiver/wavefrontreceiver/config.go index 3641bc5e2c43..6b0b20cad5fd 100644 --- a/receiver/wavefrontreceiver/config.go +++ b/receiver/wavefrontreceiver/config.go @@ -4,6 +4,7 @@ package wavefrontreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver" import ( + "errors" "time" "go.opentelemetry.io/collector/config/confignet" @@ -20,3 +21,10 @@ type Config struct { // tags in the CollectD format from the metric name. The default is false. ExtractCollectdTags bool `mapstructure:"extract_collectd_tags"` } + +func (cfg *Config) Validate() error { + if cfg.TCPIdleTimeout < 0 { + return errors.New("'tcp_idle_timeout' must be non-negative") + } + return nil +} diff --git a/receiver/wavefrontreceiver/factory.go b/receiver/wavefrontreceiver/factory.go index 6c9f44bcabc0..7da77a461942 100644 --- a/receiver/wavefrontreceiver/factory.go +++ b/receiver/wavefrontreceiver/factory.go @@ -6,16 +6,21 @@ package wavefrontreceiver // import "github.com/open-telemetry/opentelemetry-col import ( "context" "fmt" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver/internal/metadata" ) +const ( + // tcpIdleTimeoutDefault is the default timeout for idle TCP connections. + tcpIdleTimeoutDefault = 30 * time.Second +) + // This file implements factory for the Wavefront receiver. // NewFactory creates a factory for WaveFront receiver. @@ -31,7 +36,7 @@ func createDefaultConfig() component.Config { TCPAddr: confignet.TCPAddr{ Endpoint: "localhost:2003", }, - TCPIdleTimeout: transport.TCPIdleTimeoutDefault, + TCPIdleTimeout: tcpIdleTimeoutDefault, } }