[otel-arrow/receiver] Receiver concurrency fixes; readability improvements & restructuring (#205)

Restructure receiver code to improve readability. There are a number of
metrics that are incremented when a batch starts being processed and are
decremented when the batch is finished, but the control flow that
maintained the balance of these updates was convoluted.

The root cause of #204 is that Arrow batches meant to be processed in
order by a consumer were processed out of order. A large function body
served two purposes: consuming Arrow data of the appropriate kind, and
entering data for the pipeline to consume next. It had to be split into
two parts, and this should have been done as part of #181. (I, as
reviewer, missed this and find, in hindsight, that the code is not easy
to follow.)
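
To make the split concrete, here is a minimal sketch of the two-step
shape, using hypothetical names (`receiverSketch`, `decode`, `pipeline`)
rather than the actual receiver code: decoding can fail on its own,
while delivery to the pipeline is a separate, ordered step.

```go
package sketch

import (
	"context"

	arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
)

// receiverSketch is a hypothetical stand-in for the receiver's stream state.
type receiverSketch struct {
	decode   func(*arrowpb.BatchArrowRecords) (any, error) // step 1: consume Arrow data
	pipeline chan any                                      // step 2: ordered hand-off
}

// recvOne handles one batch in two distinct steps, so delivery to the
// pipeline happens in receive order regardless of how decoding behaves.
func (r *receiverSketch) recvOne(ctx context.Context, batch *arrowpb.BatchArrowRecords) error {
	data, err := r.decode(batch) // consume Arrow data of the appropriate kind
	if err != nil {
		return err
	}
	select {
	case r.pipeline <- data: // enter data for the pipeline to consume next
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```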

This improves the code structure by moving all stateful aspects of
starting/finishing a request into a new `inFlightData` object, which has
a deferrable method to finish the request (see the sketch after this
list). Here, we keep:

1. The `inFlightWG` done count
2. The active requests metric
3. The active items metric
4. The active bytes metric
5. The bytes-acquired from the semaphore
6. A per-request span covering Arrow decode
7. Netstat-related instrumentation
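
A minimal sketch of that object, with hypothetical field and method
names (the list above gives the real contents; only `inFlightWG` appears
by name in this change):

```go
package sketch

import (
	"context"
	"sync"

	"go.opentelemetry.io/otel/trace"
	"golang.org/x/sync/semaphore"
)

// inFlightData gathers every stateful effect of starting a request, so
// a single deferred call can undo them all on any return path.
type inFlightData struct {
	inFlightWG  *sync.WaitGroup     // stream-wide wait group (item 1)
	sem         *semaphore.Weighted // admission semaphore (item 5)
	span        trace.Span          // per-request span over Arrow decode (item 6)
	numAcquired int64               // bytes acquired from the semaphore
	numItems    int64               // counted in the active-items metric (item 3)
	numBytes    int64               // counted in the active-bytes metric (item 4)
}

// finish is deferred as soon as the request begins, keeping the
// increment/decrement pairs balanced no matter how the request exits.
func (id *inFlightData) finish(_ context.Context) {
	if id.numAcquired != 0 {
		id.sem.Release(id.numAcquired)
	}
	// Decrements of the active requests/items/bytes metrics and the
	// netstats bookkeeping would go here (elided).
	id.span.End()
	id.inFlightWG.Done()
}
```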

Authorization now happens before acquiring from the semaphore. 
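
A sketch of that ordering, with hypothetical arguments (`authorize`
standing in for the configured server-auth hook, `sem` for the admission
semaphore): a rejected request returns before it can tie up admission
capacity.

```go
package sketch

import (
	"context"

	"golang.org/x/sync/semaphore"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// startRequest authorizes first, then acquires admission bytes, so an
// unauthenticated request never holds semaphore capacity.
func startRequest(
	ctx context.Context,
	authorize func(context.Context, map[string][]string) error,
	sem *semaphore.Weighted,
	hdrs map[string][]string,
	numBytes int64,
) error {
	if err := authorize(ctx, hdrs); err != nil {
		return status.Errorf(codes.Unauthenticated, "%v", err)
	}
	if err := sem.Acquire(ctx, numBytes); err != nil {
		return status.Errorf(codes.ResourceExhausted, "%v", err)
	}
	return nil
}
```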

A number of `fmt.Errorf()` calls are replaced with `status.Errorf(...)`
and a specific error code. The tests are updated to be more specific.
Several Arrow tests were accidentally canceling the test before the
expected error condition was actually tested; they have been audited and
improved.
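
For example, the new `requireStatus` helper in `arrow_test.go` (shown in
the diff below) asserts on the gRPC status code rather than
substring-matching the error text:

```go
// requireStatus asserts that err carries a gRPC status with the given code.
func requireStatus(t *testing.T, code codes.Code, err error) {
	require.Error(t, err)
	status, ok := status.FromError(err)
	require.True(t, ok, "is status-wrapped %v", err)
	require.Equal(t, code, status.Code())
}
```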

One new concurrent-receiver test was added.

Fixes #204.
jmacd authored May 31, 2024
1 parent f31d821 commit c5c9d7e
Showing 9 changed files with 1,105 additions and 307 deletions.
610 changes: 360 additions & 250 deletions collector/receiver/otelarrowreceiver/internal/arrow/arrow.go

Large diffs are not rendered by default.

115 changes: 78 additions & 37 deletions collector/receiver/otelarrowreceiver/internal/arrow/arrow_test.go
@@ -13,6 +13,7 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"
 
 	arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
 	arrowCollectorMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock"
@@ -48,7 +49,9 @@ import (
 	"github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver/internal/arrow/mock"
 )
 
-var defaultBQ = admission.NewBoundedQueue(int64(100000), int64(10))
+func defaultBQ() *admission.BoundedQueue {
+	return admission.NewBoundedQueue(int64(100000), int64(10))
+}
 
 type compareJSONTraces struct{ ptrace.Traces }
 type compareJSONMetrics struct{ pmetric.Metrics }
@@ -106,7 +109,7 @@ func (healthyTestChannel) onConsume() error {
 type unhealthyTestChannel struct{}
 
 func (unhealthyTestChannel) onConsume() error {
-	return fmt.Errorf("consumer unhealthy")
+	return status.Errorf(codes.Unavailable, "consumer unhealthy")
 }
 
 type recvResult struct {
@@ -282,6 +285,14 @@ func statusInvalidFor(batchID int64, msg string) *arrowpb.BatchStatus {
 	}
 }
 
+func statusInternalFor(batchID int64, msg string) *arrowpb.BatchStatus {
+	return &arrowpb.BatchStatus{
+		BatchId:       batchID,
+		StatusCode:    arrowpb.StatusCode_INTERNAL,
+		StatusMessage: msg,
+	}
+}
+
 func statusExhaustedFor(batchID int64, msg string) *arrowpb.BatchStatus {
 	return &arrowpb.BatchStatus{
 		BatchId:       batchID,
@@ -290,6 +301,14 @@ func statusExhaustedFor(batchID int64, msg string) *arrowpb.BatchStatus {
 	}
 }
 
+func statusUnauthenticatedFor(batchID int64, msg string) *arrowpb.BatchStatus {
+	return &arrowpb.BatchStatus{
+		BatchId:       batchID,
+		StatusCode:    arrowpb.StatusCode_INVALID_ARGUMENT,
+		StatusMessage: msg,
+	}
+}
+
 func (ctc *commonTestCase) newRealConsumer() arrowRecord.ConsumerAPI {
 	mock := arrowRecordMock.NewMockConsumerAPI(ctc.ctrl)
 	cons := arrowRecord.NewConsumer()
@@ -358,10 +377,26 @@ func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq
 }
 
 func requireCanceledStatus(t *testing.T, err error) {
+	requireStatus(t, codes.Canceled, err)
+}
+
+func requireUnavailableStatus(t *testing.T, err error) {
+	requireStatus(t, codes.Unavailable, err)
+}
+
+func requireInternalStatus(t *testing.T, err error) {
+	requireStatus(t, codes.Internal, err)
+}
+
+func requireExhaustedStatus(t *testing.T, err error) {
+	requireStatus(t, codes.ResourceExhausted, err)
+}
+
+func requireStatus(t *testing.T, code codes.Code, err error) {
 	require.Error(t, err)
 	status, ok := status.FromError(err)
 	require.True(t, ok, "is status-wrapped %v", err)
-	require.Equal(t, codes.Canceled, status.Code())
+	require.Equal(t, code, status.Code())
 }
 
 func TestBoundedQueueWithPdataHeaders(t *testing.T) {
@@ -437,23 +472,21 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) {
 
 			var bq *admission.BoundedQueue
 			if tt.rejected {
-				ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "rejecting request, request size larger than configured limit")).Times(1).Return(fmt.Errorf("rejecting request, request size larger than configured limit"))
 				// make the boundedqueue limit be slightly less than the uncompressed size
-				bq = admission.NewBoundedQueue(int64(sizer.TracesSize(td)-100), int64(10))
+				ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0)
+				bq = admission.NewBoundedQueue(int64(sizer.TracesSize(td)-100), 10)
 			} else {
 				ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)
-				bq = admission.NewBoundedQueue(defaultBoundedQueueLimit, int64(10))
+				bq = admission.NewBoundedQueue(defaultBoundedQueueLimit, 10)
 			}
 
 			ctc.start(ctc.newRealConsumer, bq)
 			ctc.putBatch(batch, nil)
 
 			if tt.rejected {
-				ctc.cancel()
-			}
-
-			select {
-			case data := <-ctc.consume:
+				err := ctc.wait()
+				requireExhaustedStatus(t, err)
+			} else {
+				data := <-ctc.consume
 				actualTD := data.Data.(ptrace.Traces)
 				otelAssert.Equiv(stdTesting, []json.Marshaler{
 					compareJSONTraces{td},
@@ -462,8 +495,6 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) {
 				})
 				err = ctc.cancelAndWait()
 				requireCanceledStatus(t, err)
-			case err = <-ctc.streamErr:
-				requireCanceledStatus(t, err)
 			}
 		})
 	}
@@ -480,7 +511,7 @@ func TestReceiverTraces(t *testing.T) {
 
 	ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)
 
-	ctc.start(ctc.newRealConsumer, defaultBQ)
+	ctc.start(ctc.newRealConsumer, defaultBQ())
 	ctc.putBatch(batch, nil)
 
 	otelAssert.Equiv(stdTesting, []json.Marshaler{
@@ -503,7 +534,7 @@ func TestReceiverLogs(t *testing.T) {
 
 	ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)
 
-	ctc.start(ctc.newRealConsumer, defaultBQ)
+	ctc.start(ctc.newRealConsumer, defaultBQ())
 	ctc.putBatch(batch, nil)
 
 	assert.EqualValues(t, []json.Marshaler{compareJSONLogs{ld}}, []json.Marshaler{compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}})
@@ -523,7 +554,7 @@ func TestReceiverMetrics(t *testing.T) {
 
 	ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)
 
-	ctc.start(ctc.newRealConsumer, defaultBQ)
+	ctc.start(ctc.newRealConsumer, defaultBQ())
 	ctc.putBatch(batch, nil)
 
 	otelAssert.Equiv(stdTesting, []json.Marshaler{
@@ -540,7 +571,7 @@ func TestReceiverRecvError(t *testing.T) {
 	tc := healthyTestChannel{}
 	ctc := newCommonTestCase(t, tc)
 
-	ctc.start(ctc.newRealConsumer, defaultBQ)
+	ctc.start(ctc.newRealConsumer, defaultBQ())
 
 	ctc.putBatch(nil, fmt.Errorf("test recv error"))
 
@@ -557,16 +588,27 @@ func TestReceiverSendError(t *testing.T) {
 	batch, err := ctc.testProducer.BatchArrowRecordsFromLogs(ld)
 	require.NoError(t, err)
 
-	ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(fmt.Errorf("test send error"))
+	ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(status.Errorf(codes.Unavailable, "test send error"))
 
-	ctc.start(ctc.newRealConsumer, defaultBQ)
+	ctc.start(ctc.newRealConsumer, defaultBQ())
 	ctc.putBatch(batch, nil)
 
 	assert.EqualValues(t, ld, (<-ctc.consume).Data)
 
+	start := time.Now()
+	for time.Since(start) < 10*time.Second {
+		if ctc.ctrl.Satisfied() {
+			break
+		}
+		time.Sleep(time.Second)
+	}
+
+	// Release the receiver -- the sender has seen an error by
+	// now and should return the stream. (Oddly, gRPC has no way
+	// to signal the receive call to fail using context.)
+	close(ctc.receive)
 	err = ctc.wait()
-	require.Error(t, err)
-	require.Contains(t, err.Error(), "test send error")
+	requireUnavailableStatus(t, err)
 }
 
 func TestReceiverConsumeError(t *testing.T) {
@@ -600,7 +642,7 @@ func TestReceiverConsumeError(t *testing.T) {
 
 	ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "consumer unhealthy")).Times(1).Return(nil)
 
-	ctc.start(ctc.newRealConsumer, defaultBQ)
+	ctc.start(ctc.newRealConsumer, defaultBQ())
 
 	ctc.putBatch(batch, nil)
 
@@ -638,7 +680,7 @@ func TestReceiverInvalidData(t *testing.T) {
 	}
 
 	for _, item := range data {
-		tc := unhealthyTestChannel{}
+		tc := healthyTestChannel{}
 		ctc := newCommonTestCase(t, tc)
 
 		var batch *arrowpb.BatchArrowRecords
@@ -657,13 +699,12 @@ func TestReceiverInvalidData(t *testing.T) {
 
 		batch = copyBatch(batch)
 
-		ctc.stream.EXPECT().Send(statusInvalidFor(batch.BatchId, "Permanent error: test invalid error")).Times(1).Return(nil)
-
-		ctc.start(ctc.newErrorConsumer, defaultBQ)
+		// newErrorConsumer determines the internal error in decoding above
+		ctc.start(ctc.newErrorConsumer, defaultBQ())
 		ctc.putBatch(batch, nil)
 
-		err = ctc.cancelAndWait()
-		requireCanceledStatus(t, err)
+		err = ctc.wait()
+		requireInternalStatus(t, err)
 	}
 }
 
@@ -694,13 +735,13 @@ func TestReceiverMemoryLimit(t *testing.T) {
 
 		batch = copyBatch(batch)
 
-		ctc.stream.EXPECT().Send(statusExhaustedFor(batch.BatchId, "Permanent error: test oom error "+arrowRecord.ErrConsumerMemoryLimit.Error())).Times(1).Return(nil)
+		// The Recv() returns an error, there are no Send() calls.
 
-		ctc.start(ctc.newOOMConsumer, defaultBQ)
+		ctc.start(ctc.newOOMConsumer, defaultBQ())
 		ctc.putBatch(batch, nil)
 
-		err = ctc.cancelAndWait()
-		requireCanceledStatus(t, err)
+		err = ctc.wait()
+		requireExhaustedStatus(t, err)
 	}
 }
 
@@ -743,7 +784,7 @@ func TestReceiverEOF(t *testing.T) {
 
 	ctc.stream.EXPECT().Send(gomock.Any()).Times(times).Return(nil)
 
-	ctc.start(ctc.newRealConsumer, defaultBQ)
+	ctc.start(ctc.newRealConsumer, defaultBQ())
 
 	go func() {
 		for i := 0; i < times; i++ {
@@ -808,7 +849,7 @@ func testReceiverHeaders(t *testing.T, includeMeta bool) {
 
 	ctc.stream.EXPECT().Send(gomock.Any()).Times(len(expectData)).Return(nil)
 
-	ctc.start(ctc.newRealConsumer, defaultBQ, func(gsettings *configgrpc.ServerConfig, _ *auth.Server) {
+	ctc.start(ctc.newRealConsumer, defaultBQ(), func(gsettings *configgrpc.ServerConfig, _ *auth.Server) {
 		gsettings.IncludeMetadata = includeMeta
 	})
 
@@ -880,7 +921,7 @@ func TestReceiverCancel(t *testing.T) {
 	ctc := newCommonTestCase(t, tc)
 
 	ctc.cancel()
-	ctc.start(ctc.newRealConsumer, defaultBQ)
+	ctc.start(ctc.newRealConsumer, defaultBQ())
 
 	err := ctc.wait()
 	requireCanceledStatus(t, err)
@@ -1170,7 +1211,7 @@ func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) {
 	})
 
 	var authCall *gomock.Call
-	ctc.start(ctc.newRealConsumer, defaultBQ, func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) {
+	ctc.start(ctc.newRealConsumer, defaultBQ(), func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) {
 		gsettings.IncludeMetadata = includeMeta
 
 		as := mock.NewMockServer(ctc.ctrl)