Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[otelarrowreceiver] Avoid one unnecessary span error at stream EOF #34175

Merged
merged 11 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-spanerrors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement
jmacd marked this conversation as resolved.
Show resolved Hide resolved

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Eliminate one spurious span error.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34175]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
126 changes: 119 additions & 7 deletions internal/otelarrow/test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand All @@ -27,6 +28,9 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/receiver"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
Expand Down Expand Up @@ -57,6 +61,9 @@ type testConsumer struct {
sink consumertest.TracesSink
recvLogs *observer.ObservedLogs
expLogs *observer.ObservedLogs

recvSpans *tracetest.InMemoryExporter
expSpans *tracetest.InMemoryExporter
}

var _ consumer.Traces = &testConsumer{}
Expand All @@ -66,7 +73,7 @@ type RecvConfig = otelarrowreceiver.Config
type CfgFunc func(*ExpConfig, *RecvConfig)
type GenFunc func(int) ptrace.Traces
type MkGen func() GenFunc
type EndFunc func(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces)
type EndFunc func(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces) (rops, eops map[string]int)
type ConsumerErrFunc func(t *testing.T, err error)

func (*testConsumer) Capabilities() consumer.Capabilities {
Expand All @@ -78,18 +85,21 @@ func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) err
return tc.sink.ConsumeTraces(ctx, td)
}

func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.ObservedLogs) {
func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) {
tset := componenttest.NewNopTelemetrySettings()

core, obslogs := observer.New(zapcore.InfoLevel)

exp := tracetest.NewInMemoryExporter()

// Note: if you want to see these logs in development, use:
// tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core()))
// Also see failureMemoryLimitEnding() for explicit tests based on the
// logs observer.
tset.Logger = zap.New(core)
tset.TracerProvider = trace.NewTracerProvider(trace.WithSyncer(exp))

return tset, obslogs
return tset, obslogs, exp
}

func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces, receiver.Traces) {
Expand Down Expand Up @@ -119,12 +129,15 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
cfgF(exporterCfg, receiverCfg)
}

expTset, expLogs := testLoggerSettings(t)
recvTset, recvLogs := testLoggerSettings(t)
expTset, expLogs, expSpans := testLoggerSettings(t)
recvTset, recvLogs, recvSpans := testLoggerSettings(t)

testCon := &testConsumer{
recvLogs: recvLogs,
expLogs: expLogs,

recvSpans: recvSpans,
expSpans: expSpans,
}

receiver, err := rfact.CreateTracesReceiver(ctx, receiver.Settings{
Expand Down Expand Up @@ -254,7 +267,7 @@ func bulkyGenFunc() MkGen {

}

func standardEnding(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces) {
func standardEnding(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces) (rops, eops map[string]int) {
// Check for matching request count and data
require.Equal(t, tp.requestCount*tp.threadCount, testCon.sink.SpanCount())

Expand All @@ -271,6 +284,28 @@ func standardEnding(t *testing.T, tp testParams, testCon *testConsumer, expect [
}
asserter := assert.NewStdUnitTest(t)
assert.Equiv(asserter, expectJSON, receivedJSON)

rops = map[string]int{}
eops = map[string]int{}

for _, span := range testCon.expSpans.GetSpans() {
eops[fmt.Sprintf("%v/%v", span.Name, span.Status.Code)]++

// This span has a recognized span error which we can't easily fix. See
// https://github.com/open-telemetry/opentelemetry-go-contrib/issues/2644
if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
continue
}

require.NotEqual(t, otelcodes.Error, span.Status.Code,
"Exporter span has error: %v: %v", span.Name, span.Status.Description)
}
for _, span := range testCon.recvSpans.GetSpans() {
rops[fmt.Sprintf("%v/%v", span.Name, span.Status.Code)]++
require.NotEqual(t, otelcodes.Error, span.Status.Code,
"Receiver span has error: %v: %v", span.Name, span.Status.Description)
}
return rops, eops
}

// logSigs computes a signature of a structured log message emitted by
Expand Down Expand Up @@ -311,7 +346,7 @@ func countMemoryLimitErrors(msgs []string) (cnt int) {
return
}

func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer, _ [][]ptrace.Traces) {
func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer, _ [][]ptrace.Traces) (rops, eops map[string]int) {
require.Equal(t, 0, testCon.sink.SpanCount())

eSigs, eMsgs := logSigs(testCon.expLogs)
Expand All @@ -326,6 +361,8 @@ func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer,

require.Less(t, 0, countMemoryLimitErrors(rMsgs), "should have memory limit errors: %v", rMsgs)
require.Less(t, 0, countMemoryLimitErrors(eMsgs), "should have memory limit errors: %v", eMsgs)

return nil, nil
}

func consumerSuccess(t *testing.T, err error) {
Expand Down Expand Up @@ -376,3 +413,78 @@ func TestIntegrationMemoryLimited(t *testing.T) {
ecfg.TimeoutSettings.Timeout = 5 * time.Second
}, bulkyGenFunc(), consumerFailure, failureMemoryLimitEnding)
}

// multiStreamEnding is an EndFunc that first runs standardEnding and
// then checks the per-operation span counts it returns. The counts
// are looked up by "<span name>/<status code>" keys. It returns the
// receiver and exporter count maps unchanged.
func multiStreamEnding(t *testing.T, p testParams, testCon *testConsumer, td [][]ptrace.Traces) (_, _ map[string]int) {
	const streamName = "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces"

	recvOps, expOps := standardEnding(t, p, testCon, td)

	// Every thread issues requestCount requests; per-request spans
	// are compared against this product.
	wantRequests := p.threadCount * p.requestCount

	// Exporter side.
	//
	// Spans named streamName are the Arrow gRPC stream spans. More
	// than one stream span must exist, and exactly one of them must
	// carry the Error status.
	// NOTE(review): presumably the single Error is the known issue
	// https://github.com/open-telemetry/opentelemetry-go-contrib/issues/2644
	// -- confirm that exactly one is the intended expectation.
	expUnset, expErrs := expOps[streamName+"/Unset"], expOps[streamName+"/Error"]
	expStreams := expUnset + expErrs
	require.Less(t, 1, expStreams)
	require.Equal(t, 1, expErrs)

	// One "exporter/otelarrowexporter/traces" span per request.
	require.Equal(t, wantRequests, expOps["exporter/otelarrowexporter/traces/Unset"])

	// One "otel_arrow_stream_send" span per request.
	require.Equal(t, wantRequests, expOps["otel_arrow_stream_send/Unset"])

	// Receiver side.
	//
	// Spans named streamName are again the Arrow gRPC stream spans.
	// None of them may carry the Error status, and more than one
	// stream span must exist.
	recvUnset, recvErrs := recvOps[streamName+"/Unset"], recvOps[streamName+"/Error"]
	recvStreams := recvUnset + recvErrs
	require.Equal(t, 0, recvErrs)
	require.Less(t, 1, recvStreams)

	// The in-flight span count is one per request plus one extra per
	// stream.
	// NOTE(review): the extra span is presumably the final Recv()
	// that ends in cancelation or EOF -- confirm.
	require.Equal(t, wantRequests+recvStreams, recvOps["otel_arrow_stream_inflight/Unset"])

	// One "otel_arrow_stream_recv" span per request.
	require.Equal(t, wantRequests, recvOps["otel_arrow_stream_recv/Unset"])

	// One "receiver/otelarrowreceiver/TraceDataReceived" span per
	// request.
	require.Equal(t, wantRequests, recvOps["receiver/otelarrowreceiver/TraceDataReceived/Unset"])

	// Both sides must have observed the same number of streams.
	require.Equal(t, expStreams, recvStreams)

	return recvOps, expOps
}

// TestIntegrationSelfTracing runs the integration harness with 1000
// requests per thread, short connection and stream lifetimes, and
// multiStreamEnding as the final check, which requires more than one
// stream on both the exporter and the receiver.
func TestIntegrationSelfTracing(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start from a copy of the memory-limit parameters and raise the
	// request count.
	tp := memoryLimitParams
	tp.requestCount = 1000

	// NOTE(review): the 1s max connection age and 2s max stream
	// lifetime are presumably what forces streams to restart during
	// the test -- confirm.
	configure := func(ecfg *ExpConfig, rcfg *RecvConfig) {
		serverParams := &configgrpc.KeepaliveServerParameters{
			MaxConnectionAge:      time.Second,
			MaxConnectionAgeGrace: 5 * time.Second,
		}

		rcfg.Arrow.MemoryLimitMiB = 1
		rcfg.Protocols.GRPC.Keepalive = &configgrpc.KeepaliveServerConfig{
			ServerParameters: serverParams,
		}

		ecfg.Arrow.NumStreams = 1
		ecfg.Arrow.MaxStreamLifetime = 2 * time.Second
		ecfg.TimeoutSettings.Timeout = 1 * time.Second
	}

	mkGen := func() GenFunc {
		return makeTestTraces
	}

	testIntegrationTraces(ctx, t, tp, configure, mkGen, consumerSuccess, multiStreamEnding)
}
20 changes: 16 additions & 4 deletions receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,22 +266,25 @@ func (h *headerReceiver) newContext(ctx context.Context, hdrs map[string][]strin
}

// logStreamError decides how to log an error.
func (r *Receiver) logStreamError(err error, where string) {
func (r *Receiver) logStreamError(err error, where string) (otelcodes.Code, string) {
var code codes.Code
var msg string
// gRPC tends to supply status-wrapped errors, so we always
// unpack them. A wrapped Canceled code indicates intentional
// shutdown, which can be due to normal causes (EOF, e.g.,
// max-stream-lifetime reached) or unusual causes (Canceled,
// e.g., because the other stream direction reached an error).
occode := otelcodes.Unset
if status, ok := status.FromError(err); ok {
code = status.Code()
occode = otelcodes.Error
jmacd marked this conversation as resolved.
Show resolved Hide resolved
msg = status.Message()
} else if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
code = codes.Canceled
msg = err.Error()
} else {
code = codes.Internal
occode = otelcodes.Error
msg = err.Error()
}

Expand All @@ -290,6 +293,8 @@ func (r *Receiver) logStreamError(err error, where string) {
} else {
r.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where))
}

return occode, msg
}

func gRPCName(desc grpc.ServiceDesc) string {
Expand Down Expand Up @@ -458,8 +463,8 @@ func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) {

if retErr != nil {
// logStreamError because this response will break the stream.
id.logStreamError(retErr, "recv")
id.span.SetStatus(otelcodes.Error, retErr.Error())
occode, msg := id.logStreamError(retErr, "recv")
id.span.SetStatus(occode, msg)
}

id.anyDone(ctx)
Expand Down Expand Up @@ -553,6 +558,13 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
} else if errors.Is(err, context.Canceled) {
return status.Error(codes.Canceled, "server stream shutdown")
}
if status, ok := status.FromError(err); ok {
jmacd marked this conversation as resolved.
Show resolved Hide resolved
// This is a special case to avoid introducing a span error
// for a canceled operation.
if status.Code() == codes.Canceled {
return io.EOF
}
}
// Note: err is directly from gRPC, should already have status.
return err
}
Expand Down Expand Up @@ -700,7 +712,7 @@ func (r *receiverStream) sendOne(serverStream anyStreamServer, resp batchResp) e

if err := serverStream.Send(bs); err != nil {
// logStreamError because this response will break the stream.
r.logStreamError(err, "send")
_, _ = r.logStreamError(err, "send")
return err
}

Expand Down
Loading