Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaegerreceiver] Remove unused remote sampling server #36971

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions receiver/jaegerreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ type RemoteSamplingConfig struct {

// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC *configgrpc.ServerConfig `mapstructure:"grpc"`
ThriftHTTP *confighttp.ServerConfig `mapstructure:"thrift_http"`
ThriftBinary *ProtocolUDP `mapstructure:"thrift_binary"`
ThriftCompact *ProtocolUDP `mapstructure:"thrift_compact"`
GRPC *configgrpc.ServerConfig `mapstructure:"grpc"`
ThriftHTTP *confighttp.ServerConfig `mapstructure:"thrift_http"`
ThriftBinaryUDP *ProtocolUDP `mapstructure:"thrift_binary"`
ThriftCompactUDP *ProtocolUDP `mapstructure:"thrift_compact"`
}

// ProtocolUDP is the configuration for a UDP protocol.
Expand Down Expand Up @@ -82,8 +82,8 @@ var (
func (cfg *Config) Validate() error {
if cfg.GRPC == nil &&
cfg.ThriftHTTP == nil &&
cfg.ThriftBinary == nil &&
cfg.ThriftCompact == nil {
cfg.ThriftBinaryUDP == nil &&
cfg.ThriftCompactUDP == nil {
return errors.New("must specify at least one protocol when using the Jaeger receiver")
}

Expand All @@ -99,14 +99,14 @@ func (cfg *Config) Validate() error {
}
}

if cfg.ThriftBinary != nil {
if err := checkPortFromEndpoint(cfg.ThriftBinary.Endpoint); err != nil {
if cfg.ThriftBinaryUDP != nil {
if err := checkPortFromEndpoint(cfg.ThriftBinaryUDP.Endpoint); err != nil {
return fmt.Errorf("invalid port number for the Thrift UDP Binary endpoint: %w", err)
}
}

if cfg.ThriftCompact != nil {
if err := checkPortFromEndpoint(cfg.ThriftCompact.Endpoint); err != nil {
if cfg.ThriftCompactUDP != nil {
if err := checkPortFromEndpoint(cfg.ThriftCompactUDP.Endpoint); err != nil {
return fmt.Errorf("invalid port number for the Thrift UDP Compact endpoint: %w", err)
}
}
Expand Down Expand Up @@ -145,10 +145,10 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
cfg.ThriftHTTP = nil
}
if !protocols.IsSet(protoThriftBinary) {
cfg.ThriftBinary = nil
cfg.ThriftBinaryUDP = nil
}
if !protocols.IsSet(protoThriftCompact) {
cfg.ThriftCompact = nil
cfg.ThriftCompactUDP = nil
}

return nil
Expand Down
16 changes: 8 additions & 8 deletions receiver/jaegerreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) {
ThriftHTTP: &confighttp.ServerConfig{
Endpoint: ":3456",
},
ThriftCompact: &ProtocolUDP{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: "0.0.0.0:456",
ServerConfigUDP: ServerConfigUDP{
QueueSize: 100_000,
Expand All @@ -51,7 +51,7 @@ func TestLoadConfig(t *testing.T) {
SocketBufferSize: 65_536,
},
},
ThriftBinary: &ProtocolUDP{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: "0.0.0.0:789",
ServerConfigUDP: ServerConfigUDP{
QueueSize: 1_000,
Expand All @@ -76,11 +76,11 @@ func TestLoadConfig(t *testing.T) {
ThriftHTTP: &confighttp.ServerConfig{
Endpoint: "localhost:14268",
},
ThriftCompact: &ProtocolUDP{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: "localhost:6831",
ServerConfigUDP: defaultServerConfigUDP(),
},
ThriftBinary: &ProtocolUDP{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: "localhost:6832",
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -97,7 +97,7 @@ func TestLoadConfig(t *testing.T) {
Transport: confignet.TransportTypeTCP,
},
},
ThriftCompact: &ProtocolUDP{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: "localhost:6831",
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestInvalidConfig(t *testing.T) {
{
desc: "thrift-udp-compact-no-port",
apply: func(cfg *Config) {
cfg.ThriftCompact = &ProtocolUDP{
cfg.ThriftCompactUDP = &ProtocolUDP{
Endpoint: "localhost:",
}
},
Expand All @@ -192,7 +192,7 @@ func TestInvalidConfig(t *testing.T) {
{
desc: "thrift-udp-binary-no-port",
apply: func(cfg *Config) {
cfg.ThriftBinary = &ProtocolUDP{
cfg.ThriftBinaryUDP = &ProtocolUDP{
Endpoint: "localhost:",
}
},
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestInvalidConfig(t *testing.T) {
{
desc: "port-outside-of-range",
apply: func(cfg *Config) {
cfg.ThriftBinary = &ProtocolUDP{
cfg.ThriftBinaryUDP = &ProtocolUDP{
Endpoint: "localhost:65536",
}
},
Expand Down
24 changes: 3 additions & 21 deletions receiver/jaegerreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ func createDefaultConfig() component.Config {
ThriftHTTP: &confighttp.ServerConfig{
Endpoint: testutil.EndpointForPort(defaultHTTPPort),
},
ThriftBinary: &ProtocolUDP{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: testutil.EndpointForPort(defaultThriftBinaryPort),
ServerConfigUDP: defaultServerConfigUDP(),
},
ThriftCompact: &ProtocolUDP{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: testutil.EndpointForPort(defaultThriftCompactPort),
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -86,28 +86,10 @@ func createTracesReceiver(

rCfg := cfg.(*Config)

var config configuration
// Set ports
if rCfg.Protocols.GRPC != nil {
config.GRPCServerConfig = *rCfg.Protocols.GRPC
}

if rCfg.Protocols.ThriftHTTP != nil {
config.HTTPServerConfig = *rCfg.ThriftHTTP
}

if rCfg.Protocols.ThriftBinary != nil {
config.AgentBinaryThrift = *rCfg.ThriftBinary
}

if rCfg.Protocols.ThriftCompact != nil {
config.AgentCompactThrift = *rCfg.ThriftCompact
}

if rCfg.RemoteSampling != nil {
set.Logger.Warn("You are using a deprecated no-op `remote_sampling` option which will be removed soon; use a `jaegerremotesampling` extension instead")
}

// Create the receiver.
return newJaegerReceiver(set.ID, &config, nextConsumer, set)
return newJaegerReceiver(set.ID, rCfg.Protocols, nextConsumer, set)
}
12 changes: 6 additions & 6 deletions receiver/jaegerreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestCreateDefaultGRPCEndpoint(t *testing.T) {
r, err := factory.CreateTraces(context.Background(), set, cfg, nil)

assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, "0.0.0.0:14250", r.(*jReceiver).config.GRPCServerConfig.NetAddr.Endpoint, "grpc port should be default")
assert.Equal(t, "0.0.0.0:14250", r.(*jReceiver).config.GRPC.NetAddr.Endpoint, "grpc port should be default")
}

func TestCreateTLSGPRCEndpoint(t *testing.T) {
Expand Down Expand Up @@ -144,33 +144,33 @@ func TestCreateInvalidHTTPEndpoint(t *testing.T) {
r, err := factory.CreateTraces(context.Background(), set, cfg, nil)

assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, "localhost:14268", r.(*jReceiver).config.HTTPServerConfig.Endpoint, "http port should be default")
assert.Equal(t, "localhost:14268", r.(*jReceiver).config.ThriftHTTP.Endpoint, "http port should be default")
}

func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

cfg.(*Config).Protocols.ThriftBinary = &ProtocolUDP{
cfg.(*Config).Protocols.ThriftBinaryUDP = &ProtocolUDP{
Endpoint: "0.0.0.0:6832",
}
set := receivertest.NewNopSettings()
r, err := factory.CreateTraces(context.Background(), set, cfg, nil)

assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, "0.0.0.0:6832", r.(*jReceiver).config.AgentBinaryThrift.Endpoint, "thrift port should be default")
assert.Equal(t, "0.0.0.0:6832", r.(*jReceiver).config.ThriftBinaryUDP.Endpoint, "thrift port should be default")
}

func TestCreateInvalidThriftCompactEndpoint(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

cfg.(*Config).Protocols.ThriftCompact = &ProtocolUDP{
cfg.(*Config).Protocols.ThriftCompactUDP = &ProtocolUDP{
Endpoint: "0.0.0.0:6831",
}
set := receivertest.NewNopSettings()
r, err := factory.CreateTraces(context.Background(), set, cfg, nil)

assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, "0.0.0.0:6831", r.(*jReceiver).config.AgentCompactThrift.Endpoint, "thrift port should be default")
assert.Equal(t, "0.0.0.0:6831", r.(*jReceiver).config.ThriftCompactUDP.Endpoint, "thrift port should be default")
}
83 changes: 13 additions & 70 deletions receiver/jaegerreceiver/jaeger_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,14 @@ package jaegerreceiver

import (
"context"
"fmt"
"net"
"net/http"
"testing"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
"github.com/jaegertracing/jaeger/model"
jaegerconvert "github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/agent"
jaegerthrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/stretchr/testify/assert"
Expand All @@ -26,7 +23,6 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver/receivertest"
conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
Expand All @@ -37,17 +33,17 @@ var jaegerAgent = component.NewIDWithName(metadata.Type, "agent_test")

func TestJaegerAgentUDP_ThriftCompact(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
testJaegerAgent(t, addr, &configuration{
AgentCompactThrift: ProtocolUDP{
testJaegerAgent(t, addr, Protocols{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: addr,
ServerConfigUDP: defaultServerConfigUDP(),
},
})
}

func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) {
config := &configuration{
AgentCompactThrift: ProtocolUDP{
config := Protocols{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: "0.0.0.0:999999",
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -63,8 +59,8 @@ func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) {

func TestJaegerAgentUDP_ThriftBinary(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
testJaegerAgent(t, addr, &configuration{
AgentBinaryThrift: ProtocolUDP{
testJaegerAgent(t, addr, Protocols{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: addr,
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -75,8 +71,8 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) {
// This test confirms that the thrift binary port is opened correctly. This is all we can test at the moment. See above.
addr := testutil.GetAvailableLocalAddress(t)

config := &configuration{
AgentBinaryThrift: ProtocolUDP{
config := Protocols{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: addr,
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -85,7 +81,7 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) {
jr, err := newJaegerReceiver(jaegerAgent, config, nil, set)
require.NoError(t, err)

assert.NoError(t, jr.startAgent(componenttest.NewNopHost()), "Start failed")
assert.NoError(t, jr.startAgent(), "Start failed")
t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) })

l, err := net.Listen("udp", addr)
Expand All @@ -97,8 +93,8 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) {
}

func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) {
config := &configuration{
AgentBinaryThrift: ProtocolUDP{
config := Protocols{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: "0.0.0.0:999999",
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -112,60 +108,7 @@ func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) {
require.NoError(t, jr.Shutdown(context.Background()))
}

func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) {
server := grpc.NewServer(opts...)
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
beforeServe(server)
go func() {
err := server.Serve(lis)
assert.NoError(t, err)
}()
return server, lis.Addr()
}

type mockSamplingHandler struct{}

func (*mockSamplingHandler) GetSamplingStrategy(context.Context, *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
}

func TestJaegerHTTP(t *testing.T) {
s, _ := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterSamplingManagerServer(s, &mockSamplingHandler{})
})
defer s.GracefulStop()

endpoint := testutil.GetAvailableLocalAddress(t)
config := &configuration{
AgentHTTPEndpoint: endpoint,
}
set := receivertest.NewNopSettings()
jr, err := newJaegerReceiver(jaegerAgent, config, nil, set)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) })

assert.NoError(t, jr.Start(context.Background(), componenttest.NewNopHost()), "Start failed")

// allow http server to start
assert.Eventually(t, func() bool {
var conn net.Conn
conn, err = net.Dial("tcp", endpoint)
if err == nil && conn != nil {
conn.Close()
return true
}
return false
}, 10*time.Second, 5*time.Millisecond, "failed to wait for the port to be open")

resp, err := http.Get(fmt.Sprintf("http://%s/sampling?service=test", endpoint))
assert.NoError(t, err, "should not have failed to make request")
assert.NotNil(t, resp)
defer resp.Body.Close()
assert.Equal(t, 500, resp.StatusCode, "should have returned 200")
}

func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configuration) {
func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig Protocols) {
// 1. Create the Jaeger receiver aka "server"
sink := new(consumertest.TracesSink)
set := receivertest.NewNopSettings()
Expand All @@ -184,7 +127,7 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configu
require.NoError(t, err, "Start failed")

// 2. Then send spans to the Jaeger receiver.
jexp, err := newClientUDP(agentEndpoint, jr.config.AgentBinaryThrift.Endpoint != "")
jexp, err := newClientUDP(agentEndpoint, jr.config.ThriftBinaryUDP != nil)
require.NoError(t, err, "Failed to create the Jaeger OpenTelemetry exporter for the live application")

// 3. Now finally send some spans
Expand Down
Loading
Loading