Skip to content

Commit

Permalink
fix logs
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd committed Oct 28, 2024
1 parent 7ea2e32 commit 845b138
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 88 deletions.
25 changes: 11 additions & 14 deletions receiver/otelarrowreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ type Receiver struct {
plogotlp.UnimplementedGRPCServer
nextConsumer consumer.Logs
obsrecv *receiverhelper.ObsReport
boundedQueue *admission.BoundedQueue
boundedQueue admission.Queue
sizer *plog.ProtoMarshaler
logger *zap.Logger
}

// New creates a new Receiver reference.
func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver {
func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq admission.Queue) *Receiver {
return &Receiver{
nextConsumer: nextConsumer,
obsrecv: obsrecv,
Expand All @@ -41,26 +41,23 @@ func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper
// Export implements the service Export logs func.
func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plogotlp.ExportResponse, error) {
ld := req.Logs()
numSpans := ld.LogRecordCount()
if numSpans == 0 {
numRecords := ld.LogRecordCount()
if numRecords == 0 {
return plogotlp.NewExportResponse(), nil
}

ctx = r.obsrecv.StartLogsOp(ctx)

var err error
sizeBytes := int64(r.sizer.LogsSize(req.Logs()))
err := r.boundedQueue.Acquire(ctx, sizeBytes)
if err != nil {
return plogotlp.NewExportResponse(), err
if release, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil {
err = r.nextConsumer.ConsumeLogs(ctx, ld)
release() // immediate release
} else {
err = acqErr
}
defer func() {
if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil {
r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr))
}
}()

err = r.nextConsumer.ConsumeLogs(ctx, ld)
r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err)
r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numRecords, err)

return plogotlp.NewExportResponse(), err
}
Expand Down
180 changes: 117 additions & 63 deletions receiver/otelarrowreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,151 +8,205 @@ import (
"errors"
"net"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/multierr"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer"
)

const (
maxWaiters = 10
maxBytes = int64(250)
maxBytes = 250
)

func TestExport(t *testing.T) {
type testSink struct {
consumertest.LogsSink
context.Context
context.CancelFunc
}

func newTestSink() *testSink {
ctx, cancel := context.WithCancel(context.Background())
return &testSink{
Context: ctx,
CancelFunc: cancel,
}
}

func (ts *testSink) unblock() {
time.Sleep(10 * time.Millisecond)
ts.CancelFunc()
}

func (ts *testSink) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
<-ts.Context.Done()
return ts.LogsSink.ConsumeLogs(ctx, ld)
}

func TestExport_Success(t *testing.T) {
ld := testdata.GenerateLogs(1)
req := plogotlp.NewExportRequestFromLogs(ld)

logSink := new(consumertest.LogsSink)
logClient := makeLogsServiceClient(t, logSink)
resp, err := logClient.Export(context.Background(), req)
logSink := newTestSink()
logsClient, selfExp, selfProv := makeTraceServiceClient(t, logSink)

go logSink.unblock()
resp, err := logsClient.Export(context.Background(), req)
require.NoError(t, err, "Failed to export trace: %v", err)
require.NotNil(t, resp, "The response is missing")

lds := logSink.AllLogs()
require.Len(t, lds, 1)
assert.EqualValues(t, ld, lds[0])
require.Len(t, logSink.AllLogs(), 1)
assert.EqualValues(t, ld, logSink.AllLogs()[0])

// One self-tracing spans is issued.
require.NoError(t, selfProv.ForceFlush(context.Background()))
require.Len(t, selfExp.GetSpans(), 1)
}

func TestExport_EmptyRequest(t *testing.T) {
logSink := new(consumertest.LogsSink)
logSink := newTestSink()
logsClient, selfExp, selfProv := makeTraceServiceClient(t, logSink)
empty := plogotlp.NewExportRequest()

logClient := makeLogsServiceClient(t, logSink)
resp, err := logClient.Export(context.Background(), plogotlp.NewExportRequest())
go logSink.unblock()
resp, err := logsClient.Export(context.Background(), empty)
assert.NoError(t, err, "Failed to export trace: %v", err)
assert.NotNil(t, resp, "The response is missing")

require.Len(t, logSink.AllLogs(), 0)

// No self-tracing spans are issued.
require.NoError(t, selfProv.ForceFlush(context.Background()))
require.Len(t, selfExp.GetSpans(), 0)
}

func TestExport_ErrorConsumer(t *testing.T) {
ld := testdata.GenerateLogs(1)
req := plogotlp.NewExportRequestFromLogs(ld)

logClient := makeLogsServiceClient(t, consumertest.NewErr(errors.New("my error")))
resp, err := logClient.Export(context.Background(), req)
logsClient, selfExp, selfProv := makeTraceServiceClient(t, consumertest.NewErr(errors.New("my error")))
resp, err := logsClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Equal(t, plogotlp.ExportResponse{}, resp)

// One self-tracing spans is issued.
require.NoError(t, selfProv.ForceFlush(context.Background()))
require.Len(t, selfExp.GetSpans(), 1)
}

func TestExport_AdmissionLimitBytesExceeded(t *testing.T) {
func TestExport_AdmissionRequestTooLarge(t *testing.T) {
ld := testdata.GenerateLogs(10)
logSink := new(consumertest.LogsSink)
logSink := newTestSink()
req := plogotlp.NewExportRequestFromLogs(ld)
logsClient, selfExp, selfProv := makeTraceServiceClient(t, logSink)

logClient := makeLogsServiceClient(t, logSink)
resp, err := logClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit")
go logSink.unblock()
resp, err := logsClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = rejecting request, request is too large")
assert.Equal(t, plogotlp.ExportResponse{}, resp)
}

func TestExport_TooManyWaiters(t *testing.T) {
bc := testconsumer.NewBlockingConsumer()
// One self-tracing spans is issued.
require.NoError(t, selfProv.ForceFlush(context.Background()))
require.Len(t, selfExp.GetSpans(), 1)
}

logsClient := makeLogsServiceClient(t, bc)
bg := context.Background()
var errs, err error
func TestExport_AdmissionLimitExceeded(t *testing.T) {
ld := testdata.GenerateLogs(1)
logSink := newTestSink()
req := plogotlp.NewExportRequestFromLogs(ld)
var mtx sync.Mutex
numResponses := 0
// Send request that will acquire all of the semaphores bytes and block.
go func() {
_, err = logsClient.Export(bg, req)
mtx.Lock()
errs = multierr.Append(errs, err)
numResponses++
mtx.Unlock()
}()

for i := 0; i < maxWaiters+1; i++ {
logsClient, selfExp, selfProv := makeTraceServiceClient(t, logSink)

var wait sync.WaitGroup
wait.Add(10)

var expectSuccess atomic.Int32

for i := 0; i < 10; i++ {
go func() {
_, err := logsClient.Export(bg, req)
mtx.Lock()
errs = multierr.Append(errs, err)
numResponses++
mtx.Unlock()
defer wait.Done()
_, err := logsClient.Export(context.Background(), req)
if err == nil {
// some succeed!
expectSuccess.Add(1)
return
}
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = rejecting request, too much pending data")
}()
}

// sleep so all async requests are blocked on semaphore Acquire.
time.Sleep(1 * time.Second)
logSink.unblock()
wait.Wait()

// unblock and wait for errors to be returned and written.
bc.Unblock()
assert.Eventually(t, func() bool {
mtx.Lock()
defer mtx.Unlock()
errSlice := multierr.Errors(errs)
return numResponses == maxWaiters+2 && len(errSlice) == 1
}, 3*time.Second, 10*time.Millisecond)
// 10 self-tracing spans are issued
require.NoError(t, selfProv.ForceFlush(context.Background()))
require.Len(t, selfExp.GetSpans(), 10)

assert.ErrorContains(t, errs, "too many waiters")
// Expect the correct number of success and failure.
testSuccess := 0
for _, span := range selfExp.GetSpans() {
switch span.Status.Code {
case codes.Ok, codes.Unset:
testSuccess++
}
}
require.Equal(t, int(expectSuccess.Load()), testSuccess)
}

func makeLogsServiceClient(t *testing.T, lc consumer.Logs) plogotlp.GRPCClient {
addr := otlpReceiverOnGRPCServer(t, lc)
func makeTraceServiceClient(t *testing.T, lc consumer.Logs) (plogotlp.GRPCClient, *tracetest.InMemoryExporter, *trace.TracerProvider) {
addr, exp, tp := otlpReceiverOnGRPCServer(t, lc)
cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err)
t.Cleanup(func() {
require.NoError(t, cc.Close())
})

return plogotlp.NewGRPCClient(cc)
return plogotlp.NewGRPCClient(cc), exp, tp
}

func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr {
func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) (net.Addr, *tracetest.InMemoryExporter, *trace.TracerProvider) {
ln, err := net.Listen("tcp", "localhost:")
require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err)

t.Cleanup(func() {
require.NoError(t, ln.Close())
})

exp := tracetest.NewInMemoryExporter()

tp := trace.NewTracerProvider(trace.WithSyncer(exp))
telset := componenttest.NewNopTelemetrySettings()
telset.TracerProvider = tp

set := receivertest.NewNopSettings()
set.ID = component.NewIDWithName(component.MustNewType("otlp"), "log")
set.TelemetrySettings = telset

set.ID = component.NewIDWithName(component.MustNewType("otlp"), "logs")
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
Transport: "grpc",
ReceiverCreateSettings: set,
})
require.NoError(t, err)

bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters)
bq := admission.NewBoundedQueue(telset, maxBytes, 0)
r := New(zap.NewNop(), lc, obsrecv, bq)
// Now run it as a gRPC server
srv := grpc.NewServer()
Expand All @@ -161,5 +215,5 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr {
_ = srv.Serve(ln)
}()

return ln.Addr()
return ln.Addr(), exp, tp
}
Loading

0 comments on commit 845b138

Please sign in to comment.