Skip to content

Commit

Permalink
Backport test & lint fixes from opentelemetry-collector-contrib/31996 (
Browse files Browse the repository at this point in the history
…#190)

This addresses the test flake occasionally seen in this repository.
There was a channel being set to `nil` inappropriately, and uses of a
direct call to channel close() when the more appropriate pattern is to
cancel a context. This fixes the test flakes and lint problems from the
upstream repository in
open-telemetry/opentelemetry-collector-contrib#31996.
  • Loading branch information
jmacd authored May 10, 2024
1 parent 45e56f8 commit 0910113
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 46 deletions.
23 changes: 5 additions & 18 deletions collector/exporter/otelarrowexporter/internal/arrow/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"io"
"sync"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
arrowCollectorMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock"
Expand Down Expand Up @@ -160,7 +159,7 @@ func (ctc *commonTestCase) repeatedNewStream(nc func() testChannel) func(context
arrowpb.ArrowTracesService_ArrowTracesClient,
error,
) {
return func(ctx context.Context, opts ...grpc.CallOption) (
return func(ctx context.Context, _ ...grpc.CallOption) (
arrowpb.ArrowTracesService_ArrowTracesClient,
error,
) {
Expand All @@ -178,7 +177,6 @@ func (ctc *commonTestCase) repeatedNewStream(nc func() testChannel) func(context

// healthyTestChannel accepts the connection and returns an OK status immediately.
type healthyTestChannel struct {
lock sync.Mutex
sent chan *arrowpb.BatchArrowRecords
recv chan *arrowpb.BatchStatus
}
Expand All @@ -190,18 +188,7 @@ func newHealthyTestChannel() *healthyTestChannel {
}
}

func (tc *healthyTestChannel) doClose() {
tc.lock.Lock()
defer tc.lock.Unlock()
if tc.sent != nil {
close(tc.sent)
tc.sent = nil
}
}

func (tc *healthyTestChannel) sendChannel() chan *arrowpb.BatchArrowRecords {
tc.lock.Lock()
defer tc.lock.Unlock()
return tc.sent
}

Expand All @@ -211,7 +198,7 @@ func (tc *healthyTestChannel) onConnect(_ context.Context) error {

func (tc *healthyTestChannel) onCloseSend() func() error {
return func() error {
tc.doClose()
close(tc.sent)
return nil
}
}
Expand Down Expand Up @@ -265,7 +252,7 @@ func (tc *unresponsiveTestChannel) onCloseSend() func() error {
}

func (tc *unresponsiveTestChannel) onSend(ctx context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
return func(_ *arrowpb.BatchArrowRecords) error {
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -313,7 +300,7 @@ func (tc *arrowUnsupportedTestChannel) onCloseSend() func() error {
}

func (tc *arrowUnsupportedTestChannel) onSend(ctx context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
return func(_ *arrowpb.BatchArrowRecords) error {
<-ctx.Done()
return ctx.Err()
}
Expand Down Expand Up @@ -346,7 +333,7 @@ func (tc *disconnectedTestChannel) onCloseSend() func() error {
}

func (tc *disconnectedTestChannel) onSend(_ context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
return func(_ *arrowpb.BatchArrowRecords) error {
panic("unreachable")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newSingleStreamDowngradeDisabledTestCase(t *testing.T, pname PrioritizerNam

func newSingleStreamMetadataTestCase(t *testing.T) *exporterTestCase {
var count int
return newExporterTestCaseCommon(t, DefaultPrioritizer, NotNoisy, defaultMaxStreamLifetime, 1, false, func(ctx context.Context) (map[string]string, error) {
return newExporterTestCaseCommon(t, DefaultPrioritizer, NotNoisy, defaultMaxStreamLifetime, 1, false, func(_ context.Context) (map[string]string, error) {
defer func() { count++ }()
if count%2 == 0 {
return nil, nil
Expand Down Expand Up @@ -512,8 +512,10 @@ func TestArrowExporterStreaming(t *testing.T) {

tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel))

bg := context.Background()
require.NoError(t, tc.exporter.Start(bg))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

require.NoError(t, tc.exporter.Start(ctx))

var expectOutput []ptrace.Traces
var actualOutput []ptrace.Traces
Expand All @@ -534,23 +536,21 @@ func TestArrowExporterStreaming(t *testing.T) {

for times := 0; times < 10; times++ {
input := testdata.GenerateTraces(2)
ctx := context.Background()

sent, err := tc.exporter.SendAndWait(ctx, input)
sent, err := tc.exporter.SendAndWait(context.Background(), input)
require.NoError(t, err)
require.True(t, sent)

expectOutput = append(expectOutput, input)
}
// Stop the test conduit started above. If the sender were
// still sending, the test would panic on a closed channel.
channel.doClose()
// Stop the test conduit started above.
cancel()
wg.Wait()

// As this equality check doesn't support out of order slices,
// we sort the slices directly in the GenerateTraces function.
require.Equal(t, expectOutput, actualOutput)
require.NoError(t, tc.exporter.Shutdown(bg))
require.NoError(t, tc.exporter.Shutdown(ctx))
})
}
}
Expand All @@ -562,8 +562,9 @@ func TestArrowExporterHeaders(t *testing.T) {

tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel))

bg := context.Background()
require.NoError(t, tc.exporter.Start(bg))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, tc.exporter.Start(ctx))

var expectOutput []metadata.MD
var actualOutput []metadata.MD
Expand Down Expand Up @@ -591,7 +592,6 @@ func TestArrowExporterHeaders(t *testing.T) {

for times := 0; times < 10; times++ {
input := testdata.GenerateTraces(2)
ctx := context.Background()

if times%2 == 1 {
md := metadata.MD{
Expand All @@ -606,17 +606,16 @@ func TestArrowExporterHeaders(t *testing.T) {
})
}

sent, err := tc.exporter.SendAndWait(ctx, input)
sent, err := tc.exporter.SendAndWait(context.Background(), input)
require.NoError(t, err)
require.True(t, sent)
}
// Stop the test conduit started above. If the sender were
// still sending, the test would panic on a closed channel.
channel.doClose()
// Stop the test conduit started above.
cancel()
wg.Wait()

require.Equal(t, expectOutput, actualOutput)
require.NoError(t, tc.exporter.Shutdown(bg))
require.NoError(t, tc.exporter.Shutdown(ctx))
}

// TestArrowExporterIsTraced tests whether trace and span ID are
Expand All @@ -631,8 +630,8 @@ func TestArrowExporterIsTraced(t *testing.T) {

tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel))

bg := context.Background()
require.NoError(t, tc.exporter.Start(bg))
ctx, cancel := context.WithCancel(context.Background())
require.NoError(t, tc.exporter.Start(ctx))

var expectOutput []metadata.MD
var actualOutput []metadata.MD
Expand Down Expand Up @@ -660,17 +659,17 @@ func TestArrowExporterIsTraced(t *testing.T) {

for times := 0; times < 10; times++ {
input := testdata.GenerateTraces(2)
ctx := context.Background()
callCtx := context.Background()

if times%2 == 1 {
ctx = trace.ContextWithSpanContext(ctx,
callCtx = trace.ContextWithSpanContext(callCtx,
trace.NewSpanContext(trace.SpanContextConfig{
TraceID: [16]byte{byte(times), 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf},
SpanID: [8]byte{byte(times), 1, 2, 3, 4, 5, 6, 7},
}),
)
expectMap := map[string]string{}
propagation.TraceContext{}.Inject(ctx, propagation.MapCarrier(expectMap))
propagation.TraceContext{}.Inject(callCtx, propagation.MapCarrier(expectMap))

md := metadata.MD{
"traceparent": []string{expectMap["traceparent"]},
Expand All @@ -683,17 +682,16 @@ func TestArrowExporterIsTraced(t *testing.T) {
})
}

sent, err := tc.exporter.SendAndWait(ctx, input)
sent, err := tc.exporter.SendAndWait(callCtx, input)
require.NoError(t, err)
require.True(t, sent)
}
// Stop the test conduit started above. If the sender were
// still sending, the test would panic on a closed channel.
channel.doClose()
// Stop the test conduit started above.
cancel()
wg.Wait()

require.Equal(t, expectOutput, actualOutput)
require.NoError(t, tc.exporter.Shutdown(bg))
require.NoError(t, tc.exporter.Shutdown(ctx))
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (s *Stream) run(ctx context.Context, dc doneCancel, streamClient StreamClie
// performs a blocking send(). This returns when the data is in the write buffer,
// the caller waiting on its error channel.
func (s *Stream) write(ctx context.Context) (retErr error) {
// always close send()
// always close the send channel when this function returns.
defer func() { _ = s.client.CloseSend() }()

// headers are encoding using hpack, reusing a buffer on each call.
Expand Down

0 comments on commit 0910113

Please sign in to comment.