Skip to content

Commit

Permalink
Remove support for HTTP/OTLP from otelarrow receiver (#120)
Browse files Browse the repository at this point in the history
See #43.
If we decided to support Arrow over HTTP we could bring this back.
This removes a lot of code that was not a functional part of the Arrow
project which it inherited from the branch history with `otlpreceiver`.
  • Loading branch information
jmacd authored Dec 12, 2023
1 parent b7ed304 commit 579abeb
Show file tree
Hide file tree
Showing 19 changed files with 146 additions and 1,530 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
go-version: "1.21"

- name: Build all modules
run: make build || true
run: make build

- name: Test all modules
run: make test || true
run: make test
31 changes: 2 additions & 29 deletions collector/receiver/otelarrowreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Receives telemetry data using the
[OTel-Arrow](https://github.com/open-telemetry/otel-arrow) protocol
via gRPC and standard [OTLP](
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md)
protocol via gRPC or HTTP.
protocol via gRPC.

## Getting Started

Expand Down Expand Up @@ -65,7 +65,7 @@ receivers:
...
```
- `endpoint` (default = 0.0.0.0:4317 for grpc protocol, 0.0.0.0:4318 http protocol):
- `endpoint` (default = 0.0.0.0:4317 for grpc protocol):
host:port to which the receiver is going to receive data. The valid syntax is
described at https://github.com/grpc/grpc/blob/master/doc/naming.md.

Expand Down Expand Up @@ -182,10 +182,6 @@ of detail. At the normal level, these metrics are introduced:
- `arrow_memory_inuse`: UpDownCounter of memory in use by current streams
- `arrow_schema_resets`: Counter of times the schema was adjusted, by data type.
When the configured metrics level is "detailed" unique consumer ID
corresponding with a single gRPC stream will be attached to the
counters, for example:
```
service
...
Expand All @@ -195,26 +191,3 @@ service
...
level: detailed
```
We recommend to use the "lowmemory" temporality preference combined
with detail-level metrics in the OTel SDK. (TODO: Document how to do
this after the Collector's OTel SDK is configurable.)
## HTTP-specific documentation
To enable optional OTLP/HTTP support, the HTTP protocol must be
explicitly listed. It will use port 4318 by default. The OTel Arrow
protocol is not currently supported over HTTP.
```
receivers:
otelarrow:
protocols:
http:
```
See the core OTLP receiver for documentation specific to HTTP
connections, including:
- [Writing with HTTP/JSON](https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver/otlpreceiver#writing-with-httpjson)
- [CORS (Cross-origin resource sharing)](https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver/otlpreceiver#cors-cross-origin-resource-sharing)
74 changes: 5 additions & 69 deletions collector/receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,24 @@ package otelarrowreceiver // import "github.com/open-telemetry/otel-arrow/collec
import (
"errors"
"fmt"
"net/url"
"path"

"github.com/open-telemetry/otel-arrow/collector/compression/zstd"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/confmap"
)

const (
// Confmap values.
protoGRPC = "protocols::grpc"
protoHTTP = "protocols::http"
protoArrowOldMemoryLimit = "protocols::arrow::memory_limit"
protoArrowMemoryLimitMiB = "protocols::arrow::memory_limit_mib"
)

type httpServerSettings struct {
*confighttp.HTTPServerSettings `mapstructure:",squash"`

// The URL path to receive traces on. If omitted "/v1/traces" will be used.
TracesURLPath string `mapstructure:"traces_url_path,omitempty"`

// The URL path to receive metrics on. If omitted "/v1/metrics" will be used.
MetricsURLPath string `mapstructure:"metrics_url_path,omitempty"`

// The URL path to receive logs on. If omitted "/v1/logs" will be used.
LogsURLPath string `mapstructure:"logs_url_path,omitempty"`
}

// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"`
HTTP *httpServerSettings `mapstructure:"http"`
Arrow *ArrowSettings `mapstructure:"arrow"`
GRPC configgrpc.GRPCServerSettings `mapstructure:"grpc"`
Arrow ArrowSettings `mapstructure:"arrow"`
}

// ArrowSettings support configuring the Arrow receiver.
Expand All @@ -60,7 +42,7 @@ type ArrowSettings struct {

// Config defines configuration for OTel Arrow receiver.
type Config struct {
// Protocols is the configuration for the supported protocols, currently gRPC and HTTP (Proto and JSON).
// Protocols is the configuration for gRPC and Arrow.
Protocols `mapstructure:"protocols"`
}

Expand All @@ -69,16 +51,8 @@ var _ confmap.Unmarshaler = (*Config)(nil)

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
if cfg.GRPC == nil && cfg.HTTP == nil {
return errors.New("must specify at least one protocol when using the OTel Arrow receiver")
}
if cfg.Arrow != nil && cfg.GRPC == nil {
return errors.New("must specify at gRPC protocol when using the OTLP Arrow receiver")
}
if cfg.Arrow != nil {
if err := cfg.Arrow.Validate(); err != nil {
return err
}
if err := cfg.Arrow.Validate(); err != nil {
return err
}
return nil
}
Expand Down Expand Up @@ -106,49 +80,11 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
return err
}

// Note: since this is the OTel-Arrow exporter, not the core component,
// we allow a configuration that is free of an explicit protocol, i.e.,
// we assume gRPC but we do not assume HTTP, whereas the core component
// also has:
//
// if !conf.IsSet(protoGRPC) {
// cfg.GRPC = nil
// }

// Allow the deprecated field, when explicitly set, to unset
// the new default value.
if conf.IsSet(protoArrowOldMemoryLimit) && !conf.IsSet(protoArrowMemoryLimitMiB) {
cfg.Arrow.MemoryLimitMiB = 0
}

if !conf.IsSet(protoHTTP) {
cfg.HTTP = nil
} else {
var err error

if cfg.HTTP.TracesURLPath, err = sanitizeURLPath(cfg.HTTP.TracesURLPath); err != nil {
return err
}
if cfg.HTTP.MetricsURLPath, err = sanitizeURLPath(cfg.HTTP.MetricsURLPath); err != nil {
return err
}
if cfg.HTTP.LogsURLPath, err = sanitizeURLPath(cfg.HTTP.LogsURLPath); err != nil {
return err
}
}

return nil
}

// Verify signal URL path sanity
func sanitizeURLPath(urlPath string) (string, error) {
u, err := url.Parse(urlPath)
if err != nil {
return "", fmt.Errorf("invalid HTTP URL path set for signal: %w", err)
}

if !path.IsAbs(u.Path) {
u.Path = "/" + u.Path
}
return u.Path, nil
}
113 changes: 5 additions & 108 deletions collector/receiver/otelarrowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"
Expand All @@ -26,7 +25,6 @@ func TestUnmarshalDefaultConfig(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(cm, cfg))
defaultCfg := factory.CreateDefaultConfig().(*Config)
defaultCfg.HTTP = nil
assert.Equal(t, defaultCfg, cfg)
}

Expand All @@ -38,43 +36,9 @@ func TestUnmarshalConfigOnlyGRPC(t *testing.T) {
assert.NoError(t, component.UnmarshalConfig(cm, cfg))

defaultOnlyGRPC := factory.CreateDefaultConfig().(*Config)
defaultOnlyGRPC.HTTP = nil
assert.Equal(t, defaultOnlyGRPC, cfg)
}

func TestUnmarshalConfigOnlyHTTP(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "only_http.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(cm, cfg))

defaultOnlyHTTP := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, defaultOnlyHTTP, cfg)
}

func TestUnmarshalConfigOnlyHTTPNull(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "only_http_null.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(cm, cfg))

defaultOnlyHTTP := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, defaultOnlyHTTP, cfg)
}

func TestUnmarshalConfigOnlyHTTPEmptyMap(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "only_http_empty_map.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(cm, cfg))

defaultOnlyHTTP := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, defaultOnlyHTTP, cfg)
}

func TestUnmarshalOldMemoryLimitConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "oldmemlimit.yaml"))
require.NoError(t, err)
Expand All @@ -84,7 +48,6 @@ func TestUnmarshalOldMemoryLimitConfig(t *testing.T) {
expectCfg := factory.CreateDefaultConfig().(*Config)
// The number in config is <1MB, so Validate() rounds up.
expectCfg.Arrow.MemoryLimitMiB = 1
expectCfg.HTTP = nil
assert.NoError(t, cfg.(*Config).Validate())
assert.Equal(t, expectCfg, cfg)
}
Expand All @@ -107,7 +70,7 @@ func TestUnmarshalConfig(t *testing.T) {
assert.Equal(t,
&Config{
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
GRPC: configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:4317",
Transport: "tcp",
Expand Down Expand Up @@ -136,25 +99,7 @@ func TestUnmarshalConfig(t *testing.T) {
},
},
},
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:4318",
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
},
},
CORS: &confighttp.CORSSettings{
AllowedOrigins: []string{"https://*.test.com", "https://test.com"},
MaxAge: 7200,
},
},
TracesURLPath: "/traces",
MetricsURLPath: "/v2/metrics",
LogsURLPath: "/log/ingest",
},
Arrow: &ArrowSettings{
Arrow: ArrowSettings{
MemoryLimitMiB: 123,
},
},
Expand All @@ -171,22 +116,14 @@ func TestUnmarshalConfigUnix(t *testing.T) {
assert.Equal(t,
&Config{
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
GRPC: configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "/tmp/grpc_otlp.sock",
Transport: "unix",
},
ReadBufferSize: 512 * 1024,
},
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "/tmp/http_otlp.sock",
},
TracesURLPath: defaultTracesURLPath,
MetricsURLPath: defaultMetricsURLPath,
LogsURLPath: defaultLogsURLPath,
},
Arrow: &ArrowSettings{
Arrow: ArrowSettings{
MemoryLimitMiB: defaultMemoryLimitMiB,
},
},
Expand All @@ -209,47 +146,7 @@ func TestUnmarshalConfigInvalidProtocol(t *testing.T) {
assert.EqualError(t, component.UnmarshalConfig(cm, cfg), "1 error(s) decoding:\n\n* 'protocols' has invalid keys: thrift")
}

func TestUnmarshalConfigInvalidSignalPath(t *testing.T) {
tests := []struct {
name string
testDataFn string
}{
{
name: "Invalid traces URL path",
testDataFn: "invalid_traces_path.yaml",
},
{
name: "Invalid metrics URL path",
testDataFn: "invalid_metrics_path.yaml",
},
{
name: "Invalid logs URL path",
testDataFn: "invalid_logs_path.yaml",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", test.testDataFn))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.EqualError(t, component.UnmarshalConfig(cm, cfg), "invalid HTTP URL path set for signal: parse \":invalid\": missing protocol scheme")
})
}
}

func TestUnmarshalConfigNoProtocols(t *testing.T) {
cfg := Config{}
assert.EqualError(t, component.ValidateConfig(cfg), "must specify at least one protocol when using the OTel Arrow receiver")
}

func TestUnmarshalConfigNoGRPC(t *testing.T) {
cfg := Config{
Protocols: Protocols{
HTTP: &httpServerSettings{},
Arrow: &ArrowSettings{},
},
}
assert.EqualError(t, component.ValidateConfig(cfg), "must specify at gRPC protocol when using the OTLP Arrow receiver")
assert.NoError(t, component.ValidateConfig(cfg))
}
Loading

0 comments on commit 579abeb

Please sign in to comment.