Skip to content

Commit

Permalink
Fix missing maxStreamLifetime timeout (#222)
Browse files Browse the repository at this point in the history
Fix missing maxStreamLifetime by checking
`stream.workState.maxStreamLifetime` instead of
`stream.maxStreamLifetime`, because `stream.maxStreamLifetime` is never
set anywhere.

This PR also fixes missing `otel_arrow_stream_recv` parent spans in
recently observed traces.
  • Loading branch information
moh-osman3 authored Jun 12, 2024
1 parent 327c826 commit 21add9a
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 11 deletions.
10 changes: 2 additions & 8 deletions collector/exporter/otelarrowexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ import (

// Stream is 1:1 with gRPC stream.
type Stream struct {
// maxStreamLifetime is the max timeout before stream
// should be closed on the client side. This ensures a
// graceful shutdown before max_connection_age is reached
// on the server side.
maxStreamLifetime time.Duration

// producer is exclusive to the holder of the stream.
producer arrowRecord.ProducerAPI

Expand Down Expand Up @@ -257,8 +251,8 @@ func (s *Stream) write(ctx context.Context) (retErr error) {
hdrsEnc := hpack.NewEncoder(&hdrsBuf)

var timerCh <-chan time.Time
if s.maxStreamLifetime != 0 {
timer := time.NewTimer(s.maxStreamLifetime)
if s.workState.maxStreamLifetime != 0 {
timer := time.NewTimer(s.workState.maxStreamLifetime)
timerCh = timer.C
defer timer.Stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func newStreamTestCase(t *testing.T, pname PrioritizerName) *streamTestCase {
ctc.requestMetadataCall.AnyTimes().Return(nil, nil)

stream := newStream(producer, prio, ctc.telset, netstats.Noop{}, state[0])
stream.maxStreamLifetime = 10 * time.Second

fromTracesCall := producer.EXPECT().BatchArrowRecordsFromTraces(gomock.Any()).Times(0)
fromMetricsCall := producer.EXPECT().BatchArrowRecordsFromMetrics(gomock.Any()).Times(0)
Expand Down Expand Up @@ -144,7 +143,6 @@ func TestStreamNoMaxLifetime(t *testing.T) {
t.Run(string(pname), func(t *testing.T) {

tc := newStreamTestCase(t, pname)
tc.stream.maxStreamLifetime = 0

tc.fromTracesCall.Times(1).Return(oneBatch, nil)
tc.closeSendCall.Times(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
defer flight.recvDone(inflightCtx, &retErr)

// this span is a child of the inflight, covering the Arrow decode, Auth, etc.
_, span := r.tracer.Start(inflightCtx, "otel_arrow_stream_recv")
inflightCtx, span := r.tracer.Start(inflightCtx, "otel_arrow_stream_recv")
defer span.End()

if err != nil {
Expand Down

0 comments on commit 21add9a

Please sign in to comment.