diff --git a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go index a1d51a7b..4159822e 100644 --- a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -13,6 +13,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" "github.com/open-telemetry/otel-arrow/collector/netstats" @@ -55,7 +56,7 @@ var ( ErrNoMetricsConsumer = fmt.Errorf("no metrics consumer") ErrNoLogsConsumer = fmt.Errorf("no logs consumer") ErrNoTracesConsumer = fmt.Errorf("no traces consumer") - ErrUnrecognizedPayload = fmt.Errorf("unrecognized OTel-Arrow payload") + ErrUnrecognizedPayload = consumererror.NewPermanent(fmt.Errorf("unrecognized OTel-Arrow payload")) ) type Consumers interface { @@ -285,7 +286,7 @@ func (h *headerReceiver) newContext(ctx context.Context, hdrs map[string][]strin } // logStreamError decides how to log an error. -func (r *Receiver) logStreamError(err error) { +func (r *Receiver) logStreamError(err error, where string) { var code codes.Code var msg string // gRPC tends to supply status-wrapped errors, so we always @@ -307,7 +308,7 @@ func (r *Receiver) logStreamError(err error) { if code == codes.Canceled { r.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg)) } else { - r.telemetry.Logger.Error("arrow stream error", zap.String("message", msg), zap.Int("code", int(code))) + r.telemetry.Logger.Error("arrow stream error", zap.String("message", msg), zap.Int("code", int(code)), zap.String("where", where)) } } @@ -340,10 +341,8 @@ type anyStreamServer interface { } type batchResp struct { - addr net.Addr - id int64 - err error - bytesToRelease int64 + id int64 + err error } func (r *Receiver) recoverErr(retErr *error) { @@ -355,7 +354,7 @@ func (r *Receiver) recoverErr(retErr *error) { zap.Reflect("recovered", err), zap.Stack("stacktrace"), ) - *retErr = fmt.Errorf("panic in otel-arrow-adapter: %v", err) + *retErr = status.Errorf(codes.Internal, "panic in otel-arrow-adapter: %v", err) } } @@ -370,116 +369,287 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr }() defer r.recoverErr(&retErr) + // doneCancel allows an error in the sender/receiver to + // interrupt the corresponding thread. doneCtx, doneCancel := context.WithCancel(streamCtx) defer doneCancel() + + // streamErrCh returns up to two errors from the sender and + // receiver threads started below. streamErrCh := make(chan error, 2) pendingCh := make(chan batchResp, runtime.NumCPU()) + // wg is used to ensure this thread returns after both + // sender and recevier threads return. + var wg sync.WaitGroup + wg.Add(2) + + // The inflightWG is used to wait for all data to send. The + // 1-count here is removed after srvReceiveLoop() returns, + // having this ensures that concurrent calls to Add() in the + // receiver do not race with Wait() in the sender. + r.inFlightWG.Add(1) + go func() { var err error + defer wg.Done() defer r.recoverErr(&err) + defer r.inFlightWG.Done() err = r.srvReceiveLoop(doneCtx, serverStream, pendingCh, method, ac) streamErrCh <- err }() - // WG is used to ensure main thread only returns once sender is finished sending responses for all requests. - var senderWG sync.WaitGroup - senderWG.Add(1) go func() { var err error + defer wg.Done() defer r.recoverErr(&err) err = r.srvSendLoop(doneCtx, serverStream, pendingCh) streamErrCh <- err - senderWG.Done() }() + // Wait for sender/receiver threads to return before returning. + defer wg.Wait() + select { case <-doneCtx.Done(): - senderWG.Wait() return status.Error(codes.Canceled, "server stream shutdown") case retErr = <-streamErrCh: doneCancel() - senderWG.Wait() return } } -func (r *Receiver) recvOne(ctx context.Context, serverStream anyStreamServer, hrcv *headerReceiver, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { +func (r *Receiver) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) (context.Context, *inFlightData) { + ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight") + r.inFlightWG.Add(1) - defer func() { - if retErr != nil { - r.inFlightWG.Done() // not processed - r.logStreamError(retErr) + r.recvInFlightRequests.Add(ctx, 1) + id := &inFlightData{ + Receiver: r, + method: method, + batchID: batchID, + pendingCh: pendingCh, + span: span, + } + id.refs.Add(1) + return ctx, id +} + +// inFlightData is responsible for storing the resources held by one request. +type inFlightData struct { + // Receiver is the owner of the resources held by this object. + *Receiver + + method string + batchID int64 + pendingCh chan<- batchResp + span trace.Span + + // refs counts the number of goroutines holding this object. + // initially the recvOne() body, on success the + // consumeAndRespond() function. + refs atomic.Int32 + + numAcquired int64 // how many bytes held in the semaphore + numItems int // how many items + uncompSize int64 // uncompressed data size +} + +func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) { + retErr := *recvErrPtr + + if retErr != nil { + // logStreamError because this response will break the stream. + id.logStreamError(retErr, "recv") + id.span.SetStatus(otelcodes.Error, retErr.Error()) + } + + id.anyDone(ctx) +} + +func (id *inFlightData) consumeDone(ctx context.Context, consumeErrPtr *error) { + retErr := *consumeErrPtr + + if retErr != nil { + // debug-level because the error was external from the pipeline. + id.telemetry.Logger.Debug("otel-arrow consume", zap.Error(retErr)) + id.span.SetStatus(otelcodes.Error, retErr.Error()) + } + + id.replyToCaller(retErr) + id.anyDone(ctx) +} + +func (id *inFlightData) replyToCaller(callerErr error) { + id.pendingCh <- batchResp{ + id: id.batchID, + err: callerErr, + } +} + +func (id *inFlightData) anyDone(ctx context.Context) { + // check if there are still refs, in which case leave the in-flight + // counts where they are. + if id.refs.Add(-1) != 0 { + return + } + + id.span.End() + + if id.numAcquired != 0 { + if err := id.boundedQueue.Release(id.numAcquired); err != nil { + id.telemetry.Logger.Error("release error", zap.Error(err)) } - }() + } + + if id.uncompSize != 0 { + id.recvInFlightBytes.Add(ctx, -id.uncompSize) + } + if id.numItems != 0 { + id.recvInFlightItems.Add(ctx, int64(-id.numItems)) + } + + // The netstats code knows that uncompressed size is + // unreliable for arrow transport, so we instrument it + // directly here. Only the primary direction of transport + // is instrumented this way. + var sized netstats.SizesStruct + sized.Method = id.method + sized.Length = id.uncompSize + id.netReporter.CountReceive(ctx, sized) + + id.recvInFlightRequests.Add(ctx, -1) + id.inFlightWG.Done() +} + +// recvOne begins processing a single Arrow batch. +// +// If an error is encountered before Arrow data is successfully consumed, +// the stream will break and the error will be returned immediately. +// +// If the error is due to authorization, the stream remains unbroken +// and the request fails. +// +// If not enough resources are available, the stream will block (if +// waiting permitted) or break (insufficient waiters). +// +// Assuming success, a new goroutine is created to handle consuming the +// data. +// +// This handles constructing an inFlightData object, which itself +// tracks everything that needs to be used by instrumention when the +// batch finishes. +func (r *Receiver) recvOne(streamCtx context.Context, serverStream anyStreamServer, hrcv *headerReceiver, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { // Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics, // or plog.Logs item. req, err := serverStream.Recv() + // inflightCtx is carried through into consumeAndProcess on the success path. + inflightCtx, flight := r.newInFlightData(streamCtx, method, req.GetBatchId(), pendingCh) + defer flight.recvDone(inflightCtx, &retErr) + + // this span is a child of the inflight, covering the Arrow decode, Auth, etc. + _, span := r.tracer.Start(inflightCtx, "otel_arrow_stream_recv") + defer span.End() + if err != nil { if errors.Is(err, io.EOF) { return status.Error(codes.Canceled, "client stream shutdown") } else if errors.Is(err, context.Canceled) { return status.Error(codes.Canceled, "server stream shutdown") } + // Note: err is directly from gRPC, should already have status. return err } // Check for optional headers and set the incoming context. - thisCtx, authHdrs, err := hrcv.combineHeaders(ctx, req.GetHeaders()) + inflightCtx, authHdrs, err := hrcv.combineHeaders(inflightCtx, req.GetHeaders()) if err != nil { // Failing to parse the incoming headers breaks the stream. - return fmt.Errorf("arrow metadata error: %w", err) + return status.Errorf(codes.Internal, "arrow metadata error: %v", err) + } + + // Authorize the request, if configured, prior to acquiring resources. + if r.authServer != nil { + var authErr error + inflightCtx, authErr = r.authServer.Authenticate(inflightCtx, authHdrs) + if authErr != nil { + flight.replyToCaller(status.Error(codes.Unauthenticated, authErr.Error())) + return nil + } } - var prevAcquiredBytes int - uncompSizeStr, sizeHeaderFound := authHdrs["otlp-pdata-size"] - if !sizeHeaderFound || len(uncompSizeStr) == 0 { + + var prevAcquiredBytes int64 + uncompSizeHeaderStr, uncompSizeHeaderFound := authHdrs["otlp-pdata-size"] + if !uncompSizeHeaderFound || len(uncompSizeHeaderStr) == 0 { // This is a compressed size so make sure to acquire the difference when request is decompressed. - prevAcquiredBytes = proto.Size(req) + prevAcquiredBytes = int64(proto.Size(req)) } else { - prevAcquiredBytes, err = strconv.Atoi(uncompSizeStr[0]) + prevAcquiredBytes, err = strconv.ParseInt(uncompSizeHeaderStr[0], 10, 64) if err != nil { - return fmt.Errorf("failed to convert string to request size: %w", err) + return status.Errorf(codes.Internal, "failed to convert string to request size: %v", err) } } - // bounded queue to memory limit based on incoming uncompressed request size and waiters. - // Acquire will fail immediately if there are too many waiters, - // or will otherwise block until timeout or enough memory becomes available. - err = r.boundedQueue.Acquire(ctx, int64(prevAcquiredBytes)) + + // Use the bounded queue to memory limit based on incoming + // uncompressed request size and waiters. Acquire will fail + // immediately if there are too many waiters, or will + // otherwise block until timeout or enough memory becomes + // available. + err = r.boundedQueue.Acquire(inflightCtx, prevAcquiredBytes) if err != nil { - return fmt.Errorf("breaking stream: %w", err) + return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue: %v", err) } + flight.numAcquired = prevAcquiredBytes - resp := batchResp{ - addr: hrcv.connInfo.Addr, - id: req.GetBatchId(), - bytesToRelease: int64(prevAcquiredBytes), - } + err, data, numItems, uncompSize := r.consumeBatch(ac, req) - var authErr error - if r.authServer != nil { - var newCtx context.Context - if newCtx, err = r.authServer.Authenticate(thisCtx, authHdrs); err != nil { - authErr = err + if err != nil { + if errors.Is(err, arrowRecord.ErrConsumerMemoryLimit) { + return status.Errorf(codes.ResourceExhausted, "otel-arrow decode: %v", err) } else { - thisCtx = newCtx + return status.Errorf(codes.Internal, "otel-arrow decode: %v", err) } } - // processAndConsume will process and send an error to the sender loop - go func() { - var err error - defer r.inFlightWG.Done() // done processing - defer r.recoverErr(&err) - err = r.processAndConsume(thisCtx, method, ac, req, authErr, resp, sizeHeaderFound) - resp.err = err - pendingCh <- resp - }() + flight.uncompSize = uncompSize + flight.numItems = numItems + + r.recvInFlightBytes.Add(inflightCtx, uncompSize) + r.recvInFlightItems.Add(inflightCtx, int64(numItems)) + + numAcquired, err := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound) + + flight.numAcquired = numAcquired + if err != nil { + return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue re-acquire: %v", err) + } + + // Recognize that the request is still in-flight via consumeAndRespond() + flight.refs.Add(1) + + // consumeAndRespond consumes the data and returns control to the sender loop. + go r.consumeAndRespond(inflightCtx, data, flight) return nil } +// consumeAndRespond finishes the span started in recvOne and logs the +// result after invoking the pipeline to consume the data. +func (r *Receiver) consumeAndRespond(ctx context.Context, data any, flight *inFlightData) { + var err error + defer flight.consumeDone(ctx, &err) + + // recoverErr is a special function because it recovers panics, so we + // keep it in a separate defer than the processing above, which will + // run after the panic is recovered into an ordinary error. + defer r.recoverErr(&err) + + err = r.consumeData(ctx, data, flight) +} + +// srvReceiveLoop repeatedly receives one batch of data. func (r *Receiver) srvReceiveLoop(ctx context.Context, serverStream anyStreamServer, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { hrcv := newHeaderReceiver(ctx, r.authServer, r.gsettings.IncludeMetadata) for { @@ -487,46 +657,52 @@ func (r *Receiver) srvReceiveLoop(ctx context.Context, serverStream anyStreamSer case <-ctx.Done(): return status.Error(codes.Canceled, "server stream shutdown") default: - err := r.recvOne(ctx, serverStream, hrcv, pendingCh, method, ac) - if err != nil { + if err := r.recvOne(ctx, serverStream, hrcv, pendingCh, method, ac); err != nil { return err } } } } +// srvReceiveLoop repeatedly sends one batch data response. func (r *Receiver) sendOne(serverStream anyStreamServer, resp batchResp) error { // Note: Statuses can be batched, but we do not take // advantage of this feature. - status := &arrowpb.BatchStatus{ + bs := &arrowpb.BatchStatus{ BatchId: resp.id, } if resp.err == nil { - status.StatusCode = arrowpb.StatusCode_OK + bs.StatusCode = arrowpb.StatusCode_OK } else { - status.StatusMessage = resp.err.Error() - switch { - case errors.Is(resp.err, arrowRecord.ErrConsumerMemoryLimit): - r.telemetry.Logger.Error("arrow resource exhausted", zap.Error(resp.err)) - status.StatusCode = arrowpb.StatusCode_RESOURCE_EXHAUSTED - case consumererror.IsPermanent(resp.err): - r.telemetry.Logger.Error("arrow data error", zap.Error(resp.err)) - status.StatusCode = arrowpb.StatusCode_INVALID_ARGUMENT - default: - r.telemetry.Logger.Debug("arrow consumer error", zap.Error(resp.err)) - status.StatusCode = arrowpb.StatusCode_UNAVAILABLE + // Generally, code in the receiver should use + // status.Errorf(codes.XXX, ...) so that we take the + // first branch. + if gsc, ok := status.FromError(resp.err); ok { + bs.StatusCode = arrowpb.StatusCode(gsc.Code()) + bs.StatusMessage = gsc.Message() + } else { + // Ideally, we don't take this branch because all code uses + // gRPC status constructors and we've taken the branch above. + // + // This is a fallback for several broad categories of error. + bs.StatusMessage = resp.err.Error() + + switch { + case consumererror.IsPermanent(resp.err): + // Some kind of pipeline error, somewhere downstream. + r.telemetry.Logger.Error("arrow data error", zap.Error(resp.err)) + bs.StatusCode = arrowpb.StatusCode_INVALID_ARGUMENT + default: + // Probably a pipeline error, retryable. + r.telemetry.Logger.Debug("arrow consumer error", zap.Error(resp.err)) + bs.StatusCode = arrowpb.StatusCode_UNAVAILABLE + } } } - err := serverStream.Send(status) - if err != nil { - r.logStreamError(err) - return err - } - - err = r.boundedQueue.Release(resp.bytesToRelease) - if err != nil { - r.logStreamError(err) + if err := serverStream.Send(bs); err != nil { + // logStreamError because this response will break the stream. + r.logStreamError(err, "send") return err } @@ -535,8 +711,12 @@ func (r *Receiver) sendOne(serverStream anyStreamServer, resp batchResp) error { func (r *Receiver) flushSender(serverStream anyStreamServer, pendingCh <-chan batchResp) error { var err error - // wait for all in flight requests to successfully be processed or fail. + // wait for all in flight requests to be successfully + // processed or fail. this implies waiting for the receiver + // loop to exit, as it holds one additional wait count to + // avoid a race with Add() here. r.inFlightWG.Wait() + for { select { case resp := <-pendingCh: @@ -552,235 +732,165 @@ func (r *Receiver) flushSender(serverStream anyStreamServer, pendingCh <-chan ba } func (r *Receiver) srvSendLoop(ctx context.Context, serverStream anyStreamServer, pendingCh <-chan batchResp) error { - var err error for { select { case <-ctx.Done(): - err = r.flushSender(serverStream, pendingCh) - // err := multierr.Append(err, ctx.Err()) - return err + return r.flushSender(serverStream, pendingCh) case resp := <-pendingCh: - err = r.sendOne(serverStream, resp) - if err != nil { + if err := r.sendOne(serverStream, resp); err != nil { return err } } } } -func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, authErr error, response batchResp, sizeHeaderFound bool) error { - var err error - - ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_recv") - defer span.End() - - r.recvInFlightRequests.Add(ctx, 1) - defer func() { - r.recvInFlightRequests.Add(ctx, -1) - // Set span status if an error is returned. - if err != nil { - span := trace.SpanFromContext(ctx) - span.SetStatus(otelcodes.Error, err.Error()) - } - }() - - // Process records: an error in this code path does - // not necessarily break the stream. - if authErr != nil { - err = authErr - } else { - err = r.processRecords(ctx, method, arrowConsumer, req, response, sizeHeaderFound) - } - - return err -} +// consumeBatch applies the batch to the Arrow Consumer, returns a +// slice of pdata objects of the corresponding data type as `any`. +// along with the number of items and true uncompressed size. +func (r *Receiver) consumeBatch(arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords) (retErr error, retData any, numItems int, uncompSize int64) { -// processRecords returns an error and a boolean indicating whether -// the error (true) was from processing the data (i.e., invalid -// argument) or (false) from the consuming pipeline. The boolean is -// not used when success (nil error) is returned. -func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords, response batchResp, sizeHeaderFound bool) error { payloads := records.GetArrowPayloads() if len(payloads) == 0 { - return nil + return nil, nil, 0, 0 } - var uncompSize int64 - defer func() { - // The netstats code knows that uncompressed size is - // unreliable for arrow transport, so we instrument it - // directly here. Only the primary direction of transport - // is instrumented this way. - var sized netstats.SizesStruct - sized.Method = method - sized.Length = uncompSize - r.netReporter.CountReceive(ctx, sized) - }() switch payloads[0].Type { case arrowpb.ArrowPayloadType_UNIVARIATE_METRICS: if r.Metrics() == nil { - return status.Error(codes.Unimplemented, "metrics service not available") + return status.Error(codes.Unimplemented, "metrics service not available"), nil, 0, 0 } var sizer pmetric.ProtoMarshaler - var numPts int - - ctx = r.obsrecv.StartMetricsOp(ctx) data, err := arrowConsumer.MetricsFrom(records) - if err != nil { - err = consumererror.NewPermanent(err) - } else { + if err == nil { for _, metrics := range data { - items := metrics.DataPointCount() - sz := int64(sizer.MetricsSize(metrics)) - - r.recvInFlightBytes.Add(ctx, sz) - r.recvInFlightItems.Add(ctx, int64(items)) - - numPts += items - uncompSize += sz - err = multierr.Append(err, - r.Metrics().ConsumeMetrics(ctx, metrics), - ) - } - - acquireErr := r.acquireAdditionalBytes(ctx, uncompSize, response, sizeHeaderFound) - - if acquireErr != nil { - err = multierr.Append(err, acquireErr) - // if acquireAdditionalBytes() failed then the previously acquired bytes were already released i.e. nothing to releaes. - return err + numItems += metrics.DataPointCount() + uncompSize += int64(sizer.MetricsSize(metrics)) } - - // entire request has been processed, decrement counter. - r.recvInFlightBytes.Add(ctx, -uncompSize) - r.recvInFlightItems.Add(ctx, int64(-numPts)) } - r.obsrecv.EndMetricsOp(ctx, streamFormat, numPts, err) - return err + retData = data + retErr = err case arrowpb.ArrowPayloadType_LOGS: if r.Logs() == nil { - return status.Error(codes.Unimplemented, "logs service not available") + return status.Error(codes.Unimplemented, "logs service not available"), nil, 0, 0 } var sizer plog.ProtoMarshaler - var numLogs int - ctx = r.obsrecv.StartLogsOp(ctx) data, err := arrowConsumer.LogsFrom(records) - if err != nil { - err = consumererror.NewPermanent(err) - } else { + if err == nil { for _, logs := range data { - items := logs.LogRecordCount() - sz := int64(sizer.LogsSize(logs)) - - r.recvInFlightBytes.Add(ctx, sz) - r.recvInFlightItems.Add(ctx, int64(items)) - numLogs += items - uncompSize += sz - err = multierr.Append(err, - r.Logs().ConsumeLogs(ctx, logs), - ) + numItems += logs.LogRecordCount() + uncompSize += int64(sizer.LogsSize(logs)) } - - acquireErr := r.acquireAdditionalBytes(ctx, uncompSize, response, sizeHeaderFound) - - if acquireErr != nil { - err = multierr.Append(err, acquireErr) - // if acquireAdditionalBytes() failed then the previously acquired bytes were already released i.e. nothing to releaes. - return err - } - - // entire request has been processed, decrement counter. - r.recvInFlightBytes.Add(ctx, -uncompSize) - r.recvInFlightItems.Add(ctx, int64(-numLogs)) } - r.obsrecv.EndLogsOp(ctx, streamFormat, numLogs, err) - return err + retData = data + retErr = err case arrowpb.ArrowPayloadType_SPANS: if r.Traces() == nil { - return status.Error(codes.Unimplemented, "traces service not available") + return status.Error(codes.Unimplemented, "traces service not available"), nil, 0, 0 } var sizer ptrace.ProtoMarshaler - var numSpans int - ctx = r.obsrecv.StartTracesOp(ctx) data, err := arrowConsumer.TracesFrom(records) - if err != nil { - err = consumererror.NewPermanent(err) - } else { + if err == nil { for _, traces := range data { - items := traces.SpanCount() - sz := int64(sizer.TracesSize(traces)) + numItems += traces.SpanCount() + uncompSize += int64(sizer.TracesSize(traces)) + } + } + retData = data + retErr = err - r.recvInFlightBytes.Add(ctx, sz) - r.recvInFlightItems.Add(ctx, int64(items)) + default: + retErr = ErrUnrecognizedPayload + } - numSpans += items - uncompSize += sz - err = multierr.Append(err, - r.Traces().ConsumeTraces(ctx, traces), - ) - } + return retErr, retData, numItems, uncompSize +} - acquireErr := r.acquireAdditionalBytes(ctx, uncompSize, response, sizeHeaderFound) +// consumeData invokes the next pipeline consumer for a received batch of data. +// it uses the standard OTel collector instrumentation (receiverhelper.ObsReport). +// +// if any errors are permanent, returns a permanent error. +func (r *Receiver) consumeData(ctx context.Context, data any, flight *inFlightData) (retErr error) { + oneOp := func(err error) { + retErr = multierr.Append(retErr, err) + } + var final func(context.Context, string, int, error) - if acquireErr != nil { - err = multierr.Append(err, acquireErr) - // if acquireAdditionalBytes() failed then the previously acquired bytes were already released i.e. nothing to releaes. - return err - } + switch items := data.(type) { + case []pmetric.Metrics: + ctx = r.obsrecv.StartMetricsOp(ctx) + for _, metrics := range items { + oneOp(r.Metrics().ConsumeMetrics(ctx, metrics)) + } + final = r.obsrecv.EndMetricsOp - // entire request has been processed, decrement counter. - r.recvInFlightBytes.Add(ctx, -uncompSize) - r.recvInFlightItems.Add(ctx, int64(-numSpans)) + case []plog.Logs: + ctx = r.obsrecv.StartLogsOp(ctx) + for _, logs := range items { + oneOp(r.Logs().ConsumeLogs(ctx, logs)) } - r.obsrecv.EndTracesOp(ctx, streamFormat, numSpans, err) - return err + final = r.obsrecv.EndLogsOp + + case []ptrace.Traces: + ctx = r.obsrecv.StartTracesOp(ctx) + for _, traces := range items { + oneOp(r.Traces().ConsumeTraces(ctx, traces)) + } + final = r.obsrecv.EndTracesOp default: - return ErrUnrecognizedPayload + retErr = ErrUnrecognizedPayload } + if final != nil { + final(ctx, streamFormat, flight.numItems, retErr) + } + return retErr } -func (r *Receiver) acquireAdditionalBytes(ctx context.Context, uncompSize int64, response batchResp, sizeHeaderFound bool) error { - diff := uncompSize - response.bytesToRelease - - var err error - if diff != 0 { - if sizeHeaderFound { - var clientAddr string - if response.addr != nil { - clientAddr = response.addr.String() - } - // a mismatch between header set by exporter and the uncompSize just calculated. - r.telemetry.Logger.Debug("mismatch between uncompressed size in receiver and otlp-pdata-size header", zap.String("client-address", clientAddr), zap.Int("uncompsize", int(uncompSize)), zap.Int("otlp-pdata-size", int(response.bytesToRelease))) - } else if diff < 0 { - // proto.Size() on compressed request was greater than pdata uncompressed size. - r.telemetry.Logger.Debug("uncompressed size is less than compressed size", zap.Int("uncompressed", int(uncompSize)), zap.Int("compressed", int(response.bytesToRelease))) - } +func (r *Receiver) acquireAdditionalBytes(ctx context.Context, prevAcquired, uncompSize int64, addr net.Addr, uncompSizeHeaderFound bool) (int64, error) { + diff := uncompSize - prevAcquired - if diff > 0 { - // diff > 0 means we previously acquired too few bytes initially and we need to correct this. Release previously - // acquired bytes to prevent deadlock and reacquire the uncompressed size we just calculated. - // Note: No need to release and reacquire bytes if diff < 0 because this has less impact and no reason to potentially block - // a request that is in flight by reacquiring the correct size. - err = r.boundedQueue.Release(response.bytesToRelease) - if err != nil { - return err - } + if diff == 0 { + return uncompSize, nil + } - err = r.boundedQueue.Acquire(ctx, uncompSize) - if err != nil { - response.bytesToRelease = int64(0) - } else { - response.bytesToRelease = uncompSize - } + if uncompSizeHeaderFound { + var clientAddr string + if addr != nil { + clientAddr = addr.String() } - + // a mismatch between header set by exporter and the uncompSize just calculated. + r.telemetry.Logger.Debug("mismatch between uncompressed size in receiver and otlp-pdata-size header", + zap.String("client-address", clientAddr), + zap.Int("uncompsize", int(uncompSize)), + zap.Int("otlp-pdata-size", int(prevAcquired)), + ) + } else if diff < 0 { + // proto.Size() on compressed request was greater than pdata uncompressed size. + r.telemetry.Logger.Debug("uncompressed size is less than compressed size", + zap.Int("uncompressed", int(uncompSize)), + zap.Int("compressed", int(prevAcquired)), + ) } - return err + if diff < 0 { + // If the difference is negative, release the overage. + if err := r.boundedQueue.Release(-diff); err != nil { + return 0, err + } + } else { + // Release previously acquired bytes to prevent deadlock and + // reacquire the uncompressed size we just calculated. + if err := r.boundedQueue.Release(prevAcquired); err != nil { + return 0, err + } + if err := r.boundedQueue.Acquire(ctx, uncompSize); err != nil { + return 0, err + } + } + return uncompSize, nil } diff --git a/collector/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/collector/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index f6f1e1f9..113391c5 100644 --- a/collector/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/collector/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -13,6 +13,7 @@ import ( "strings" "sync" "testing" + "time" arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" arrowCollectorMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock" @@ -48,7 +49,9 @@ import ( "github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver/internal/arrow/mock" ) -var defaultBQ = admission.NewBoundedQueue(int64(100000), int64(10)) +func defaultBQ() *admission.BoundedQueue { + return admission.NewBoundedQueue(int64(100000), int64(10)) +} type compareJSONTraces struct{ ptrace.Traces } type compareJSONMetrics struct{ pmetric.Metrics } @@ -106,7 +109,7 @@ func (healthyTestChannel) onConsume() error { type unhealthyTestChannel struct{} func (unhealthyTestChannel) onConsume() error { - return fmt.Errorf("consumer unhealthy") + return status.Errorf(codes.Unavailable, "consumer unhealthy") } type recvResult struct { @@ -282,6 +285,14 @@ func statusInvalidFor(batchID int64, msg string) *arrowpb.BatchStatus { } } +func statusInternalFor(batchID int64, msg string) *arrowpb.BatchStatus { + return &arrowpb.BatchStatus{ + BatchId: batchID, + StatusCode: arrowpb.StatusCode_INTERNAL, + StatusMessage: msg, + } +} + func statusExhaustedFor(batchID int64, msg string) *arrowpb.BatchStatus { return &arrowpb.BatchStatus{ BatchId: batchID, @@ -290,6 +301,14 @@ func statusExhaustedFor(batchID int64, msg string) *arrowpb.BatchStatus { } } +func statusUnauthenticatedFor(batchID int64, msg string) *arrowpb.BatchStatus { + return &arrowpb.BatchStatus{ + BatchId: batchID, + StatusCode: arrowpb.StatusCode_INVALID_ARGUMENT, + StatusMessage: msg, + } +} + func (ctc *commonTestCase) newRealConsumer() arrowRecord.ConsumerAPI { mock := arrowRecordMock.NewMockConsumerAPI(ctc.ctrl) cons := arrowRecord.NewConsumer() @@ -358,10 +377,26 @@ func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq } func requireCanceledStatus(t *testing.T, err error) { + requireStatus(t, codes.Canceled, err) +} + +func requireUnavailableStatus(t *testing.T, err error) { + requireStatus(t, codes.Unavailable, err) +} + +func requireInternalStatus(t *testing.T, err error) { + requireStatus(t, codes.Internal, err) +} + +func requireExhaustedStatus(t *testing.T, err error) { + requireStatus(t, codes.ResourceExhausted, err) +} + +func requireStatus(t *testing.T, code codes.Code, err error) { require.Error(t, err) status, ok := status.FromError(err) require.True(t, ok, "is status-wrapped %v", err) - require.Equal(t, codes.Canceled, status.Code()) + require.Equal(t, code, status.Code()) } func TestBoundedQueueWithPdataHeaders(t *testing.T) { @@ -437,23 +472,21 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) { var bq *admission.BoundedQueue if tt.rejected { - ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "rejecting request, request size larger than configured limit")).Times(1).Return(fmt.Errorf("rejecting request, request size larger than configured limit")) - // make the boundedqueue limit be slightly less than the uncompressed size - bq = admission.NewBoundedQueue(int64(sizer.TracesSize(td)-100), int64(10)) + ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0) + bq = admission.NewBoundedQueue(int64(sizer.TracesSize(td)-100), 10) } else { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - bq = admission.NewBoundedQueue(defaultBoundedQueueLimit, int64(10)) + bq = admission.NewBoundedQueue(defaultBoundedQueueLimit, 10) } ctc.start(ctc.newRealConsumer, bq) ctc.putBatch(batch, nil) if tt.rejected { - ctc.cancel() - } - - select { - case data := <-ctc.consume: + err := ctc.wait() + requireExhaustedStatus(t, err) + } else { + data := <-ctc.consume actualTD := data.Data.(ptrace.Traces) otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONTraces{td}, @@ -462,8 +495,6 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) { }) err = ctc.cancelAndWait() requireCanceledStatus(t, err) - case err = <-ctc.streamErr: - requireCanceledStatus(t, err) } }) } @@ -480,7 +511,7 @@ func TestReceiverTraces(t *testing.T) { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer, defaultBQ) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(batch, nil) otelAssert.Equiv(stdTesting, []json.Marshaler{ @@ -503,7 +534,7 @@ func TestReceiverLogs(t *testing.T) { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer, defaultBQ) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(batch, nil) assert.EqualValues(t, []json.Marshaler{compareJSONLogs{ld}}, []json.Marshaler{compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}}) @@ -523,7 +554,7 @@ func TestReceiverMetrics(t *testing.T) { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer, defaultBQ) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(batch, nil) otelAssert.Equiv(stdTesting, []json.Marshaler{ @@ -540,7 +571,7 @@ func TestReceiverRecvError(t *testing.T) { tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) - ctc.start(ctc.newRealConsumer, defaultBQ) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(nil, fmt.Errorf("test recv error")) @@ -557,16 +588,27 @@ func TestReceiverSendError(t *testing.T) { batch, err := ctc.testProducer.BatchArrowRecordsFromLogs(ld) require.NoError(t, err) - ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(fmt.Errorf("test send error")) + ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(status.Errorf(codes.Unavailable, "test send error")) - ctc.start(ctc.newRealConsumer, defaultBQ) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(batch, nil) assert.EqualValues(t, ld, (<-ctc.consume).Data) + start := time.Now() + for time.Since(start) < 10*time.Second { + if ctc.ctrl.Satisfied() { + break + } + time.Sleep(time.Second) + } + + // Release the receiver -- the sender has seen an error by + // now and should return the stream. (Oddly, gRPC has no way + // to signal the receive call to fail using context.) + close(ctc.receive) err = ctc.wait() - require.Error(t, err) - require.Contains(t, err.Error(), "test send error") + requireUnavailableStatus(t, err) } func TestReceiverConsumeError(t *testing.T) { @@ -600,7 +642,7 @@ func TestReceiverConsumeError(t *testing.T) { ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "consumer unhealthy")).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer, defaultBQ) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(batch, nil) @@ -638,7 +680,7 @@ func TestReceiverInvalidData(t *testing.T) { } for _, item := range data { - tc := unhealthyTestChannel{} + tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) var batch *arrowpb.BatchArrowRecords @@ -657,13 +699,12 @@ func TestReceiverInvalidData(t *testing.T) { batch = copyBatch(batch) - ctc.stream.EXPECT().Send(statusInvalidFor(batch.BatchId, "Permanent error: test invalid error")).Times(1).Return(nil) - - ctc.start(ctc.newErrorConsumer, defaultBQ) + // newErrorConsumer determines the internal error in decoding above + ctc.start(ctc.newErrorConsumer, defaultBQ()) ctc.putBatch(batch, nil) - err = ctc.cancelAndWait() - requireCanceledStatus(t, err) + err = ctc.wait() + requireInternalStatus(t, err) } } @@ -694,13 +735,13 @@ func TestReceiverMemoryLimit(t *testing.T) { batch = copyBatch(batch) - ctc.stream.EXPECT().Send(statusExhaustedFor(batch.BatchId, "Permanent error: test oom error "+arrowRecord.ErrConsumerMemoryLimit.Error())).Times(1).Return(nil) + // The Recv() returns an error, there are no Send() calls. - ctc.start(ctc.newOOMConsumer, defaultBQ) + ctc.start(ctc.newOOMConsumer, defaultBQ()) ctc.putBatch(batch, nil) - err = ctc.cancelAndWait() - requireCanceledStatus(t, err) + err = ctc.wait() + requireExhaustedStatus(t, err) } } @@ -743,7 +784,7 @@ func TestReceiverEOF(t *testing.T) { ctc.stream.EXPECT().Send(gomock.Any()).Times(times).Return(nil) - ctc.start(ctc.newRealConsumer, defaultBQ) + ctc.start(ctc.newRealConsumer, defaultBQ()) go func() { for i := 0; i < times; i++ { @@ -808,7 +849,7 @@ func testReceiverHeaders(t *testing.T, includeMeta bool) { ctc.stream.EXPECT().Send(gomock.Any()).Times(len(expectData)).Return(nil) - ctc.start(ctc.newRealConsumer, defaultBQ, func(gsettings *configgrpc.ServerConfig, _ *auth.Server) { + ctc.start(ctc.newRealConsumer, defaultBQ(), func(gsettings *configgrpc.ServerConfig, _ *auth.Server) { gsettings.IncludeMetadata = includeMeta }) @@ -880,7 +921,7 @@ func TestReceiverCancel(t *testing.T) { ctc := newCommonTestCase(t, tc) ctc.cancel() - ctc.start(ctc.newRealConsumer, defaultBQ) + ctc.start(ctc.newRealConsumer, defaultBQ()) err := ctc.wait() requireCanceledStatus(t, err) @@ -1170,7 +1211,7 @@ func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) { }) var authCall *gomock.Call - ctc.start(ctc.newRealConsumer, defaultBQ, func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) { + ctc.start(ctc.newRealConsumer, defaultBQ(), func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) { gsettings.IncludeMetadata = includeMeta as := mock.NewMockServer(ctc.ctrl) diff --git a/collector/receiver/otelarrowreceiver/otelarrow_test.go b/collector/receiver/otelarrowreceiver/otelarrow_test.go index 43627341..c89be597 100644 --- a/collector/receiver/otelarrowreceiver/otelarrow_test.go +++ b/collector/receiver/otelarrowreceiver/otelarrow_test.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "net" + "strconv" "sync" "testing" "time" @@ -380,15 +381,25 @@ func (esc *errOrSinkConsumer) Reset() { type tracesSinkWithMetadata struct { consumertest.TracesSink - MDs []client.Metadata + + lock sync.Mutex + mds []client.Metadata } func (ts *tracesSinkWithMetadata) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { info := client.FromContext(ctx) - ts.MDs = append(ts.MDs, info.Metadata) + ts.lock.Lock() + defer ts.lock.Unlock() + ts.mds = append(ts.mds, info.Metadata) return ts.TracesSink.ConsumeTraces(ctx, td) } +func (ts *tracesSinkWithMetadata) Metadatas() []client.Metadata { + ts.lock.Lock() + defer ts.lock.Unlock() + return ts.mds +} + type anyStreamClient interface { Send(*arrowpb.BatchArrowRecords) error Recv() (*arrowpb.BatchStatus, error) @@ -470,12 +481,12 @@ func TestGRPCArrowReceiver(t *testing.T) { assert.Equal(t, expectTraces, sink.AllTraces()) - assert.Equal(t, len(expectMDs), len(sink.MDs)) + assert.Equal(t, len(expectMDs), len(sink.Metadatas())) // gRPC adds its own metadata keys, so we check for only the // expected ones below: for idx := range expectMDs { for key, vals := range expectMDs[idx] { - require.Equal(t, vals, sink.MDs[idx].Get(key), "for key %s", key) + require.Equal(t, vals, sink.Metadatas()[idx].Get(key), "for key %s", key) } } } @@ -565,8 +576,8 @@ func TestGRPCArrowReceiverAuth(t *testing.T) { // The stream has to be successful to get this far. The // authenticator fails every data item: require.Equal(t, batch.BatchId, resp.BatchId) - require.Equal(t, arrowpb.StatusCode_UNAVAILABLE, resp.StatusCode) - require.Equal(t, errorString, resp.StatusMessage) + require.Equal(t, arrowpb.StatusCode_UNAUTHENTICATED, resp.StatusCode) + require.Contains(t, resp.StatusMessage, errorString) } assert.NoError(t, cc.Close()) @@ -574,3 +585,92 @@ func TestGRPCArrowReceiverAuth(t *testing.T) { assert.Equal(t, 0, len(sink.AllTraces())) } + +func TestConcurrentArrowReceiver(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + sink := new(tracesSinkWithMetadata) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPC.NetAddr.Endpoint = addr + cfg.GRPC.IncludeMetadata = true + id := component.NewID(component.MustNewType("arrow")) + tt := componenttest.NewNopTelemetrySettings() + ocr := newReceiver(t, factory, tt, cfg, id, sink, nil) + + require.NotNil(t, ocr) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost())) + + cc, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const itemsPerStream = 10 + const numStreams = 5 + + var wg sync.WaitGroup + wg.Add(numStreams) + + for j := 0; j < numStreams; j++ { + go func() { + defer wg.Done() + + client := arrowpb.NewArrowTracesServiceClient(cc) + stream, err := client.ArrowTraces(ctx, grpc.WaitForReady(true)) + require.NoError(t, err) + producer := arrowRecord.NewProducer() + + var headerBuf bytes.Buffer + hpd := hpack.NewEncoder(&headerBuf) + + // Repeatedly send traces via arrow. Set the expected traces + // metadata to receive. + for i := 0; i < itemsPerStream; i++ { + td := testdata.GenerateTraces(2) + + headerBuf.Reset() + err := hpd.WriteField(hpack.HeaderField{ + Name: "seq", + Value: fmt.Sprint(i), + }) + require.NoError(t, err) + + batch, err := producer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + + batch.Headers = headerBuf.Bytes() + + err = stream.Send(batch) + + require.NoError(t, err) + + resp, err := stream.Recv() + require.NoError(t, err) + require.Equal(t, batch.BatchId, resp.BatchId) + require.Equal(t, arrowpb.StatusCode_OK, resp.StatusCode) + } + }() + } + wg.Wait() + + assert.NoError(t, cc.Close()) + require.NoError(t, ocr.Shutdown(context.Background())) + + counts := make([]int, itemsPerStream) + + // Two spans per stream/item. + require.Equal(t, itemsPerStream*numStreams*2, sink.SpanCount()) + require.Equal(t, itemsPerStream*numStreams, len(sink.Metadatas())) + + for _, md := range sink.Metadatas() { + val, err := strconv.Atoi(md.Get("seq")[0]) + require.NoError(t, err) + counts[val]++ + } + + for i := 0; i < itemsPerStream; i++ { + require.Equal(t, numStreams, counts[i]) + } +} diff --git a/collector/test/e2e_test.go b/collector/test/e2e_test.go new file mode 100644 index 00000000..7492b00b --- /dev/null +++ b/collector/test/e2e_test.go @@ -0,0 +1,191 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package test + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/open-telemetry/otel-arrow/collector/exporter/otelarrowexporter" + "github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver" + "github.com/open-telemetry/otel-arrow/collector/testutil" + "github.com/open-telemetry/otel-arrow/pkg/otel/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" +) + +type testConsumer struct { + sink consumertest.TracesSink +} + +var _ consumer.Traces = &testConsumer{} + +func (*testConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} + +func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + time.Sleep(time.Duration(float64(time.Millisecond) * (1 + rand.Float64()))) + return tc.sink.ConsumeTraces(ctx, td) +} + +func TestIntegrationSimpleTraces(t *testing.T) { + const ( + threadCount = 10 + requestCount = 100 + ) + + efact := otelarrowexporter.NewFactory() + rfact := otelarrowreceiver.NewFactory() + + ecfg := efact.CreateDefaultConfig() + rcfg := rfact.CreateDefaultConfig() + + receiverCfg := rcfg.(*otelarrowreceiver.Config) + exporterCfg := ecfg.(*otelarrowexporter.Config) + + addr := testutil.GetAvailableLocalAddress(t) + + receiverCfg.Protocols.GRPC.NetAddr.Endpoint = addr + exporterCfg.ClientConfig.Endpoint = addr + exporterCfg.ClientConfig.WaitForReady = true + exporterCfg.ClientConfig.TLSSetting.Insecure = true + exporterCfg.TimeoutSettings.Timeout = time.Minute + exporterCfg.QueueSettings.Enabled = false + exporterCfg.RetryConfig.Enabled = false + exporterCfg.Arrow.NumStreams = 1 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tset := componenttest.NewNopTelemetrySettings() + tset.Logger, _ = zap.NewDevelopment() + + host := componenttest.NewNopHost() + + testCon := &testConsumer{} + + receiver, err := rfact.CreateTracesReceiver(ctx, receiver.CreateSettings{ + ID: component.MustNewID("otelarrowreceiver"), + TelemetrySettings: tset, + }, receiverCfg, testCon) + require.NoError(t, err) + + exporter, err := efact.CreateTracesExporter(ctx, exporter.CreateSettings{ + ID: component.MustNewID("otelarrowexporter"), + TelemetrySettings: tset, + }, exporterCfg) + require.NoError(t, err) + + var startWG sync.WaitGroup + var exporterShutdownWG sync.WaitGroup + var startExporterShutdownWG sync.WaitGroup + var receiverShutdownWG sync.WaitGroup // wait for receiver shutdown + + receiverShutdownWG.Add(1) + exporterShutdownWG.Add(1) + startExporterShutdownWG.Add(1) + startWG.Add(1) + + // Run the receiver, shutdown after exporter does. + go func() { + defer receiverShutdownWG.Done() + require.NoError(t, receiver.Start(ctx, host)) + exporterShutdownWG.Wait() + require.NoError(t, receiver.Shutdown(ctx)) + }() + + // Run the exporter and wait for clients to finish + go func() { + defer exporterShutdownWG.Done() + require.NoError(t, exporter.Start(ctx, host)) + startWG.Done() + startExporterShutdownWG.Wait() + require.NoError(t, exporter.Shutdown(ctx)) + }() + + // wait for the exporter to start + startWG.Wait() + var clientDoneWG sync.WaitGroup // wait for client to finish + + var expect [threadCount][]ptrace.Traces + + for num := 0; num < threadCount; num++ { + clientDoneWG.Add(1) + go func() { + defer clientDoneWG.Done() + for i := 0; i < requestCount; i++ { + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty().Resource().Attributes().PutStr("resource-attr", fmt.Sprint("resource-attr-val-", i)) + + ss := td.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans() + span := ss.AppendEmpty() + + span.SetName("operationA") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) + + span.SetTraceID(testutil.UInt64ToTraceID(rand.Uint64(), rand.Uint64())) + span.SetSpanID(testutil.UInt64ToSpanID(rand.Uint64())) + evs := span.Events() + ev0 := evs.AppendEmpty() + ev0.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + ev0.SetName("event-with-attr") + ev0.Attributes().PutStr("span-event-attr", "span-event-attr-val") + ev0.SetDroppedAttributesCount(2) + ev1 := evs.AppendEmpty() + ev1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + ev1.SetName("event") + ev1.SetDroppedAttributesCount(2) + span.SetDroppedEventsCount(1) + status := span.Status() + status.SetCode(ptrace.StatusCodeError) + status.SetMessage("status-cancelled") + + require.NoError(t, exporter.ConsumeTraces(ctx, td)) + expect[num] = append(expect[num], td) + } + }() + } + + // wait til senders finish + clientDoneWG.Wait() + + // shut down exporter; it triggers receiver to shut down + startExporterShutdownWG.Done() + + // wait for receiver to shut down + receiverShutdownWG.Wait() + + // Check for matching request count and data + require.Equal(t, requestCount*threadCount, testCon.sink.SpanCount()) + + var expectJSON []json.Marshaler + for _, tdn := range expect { + for _, td := range tdn { + expectJSON = append(expectJSON, ptraceotlp.NewExportRequestFromTraces(td)) + } + } + var receivedJSON []json.Marshaler + + for _, td := range testCon.sink.AllTraces() { + receivedJSON = append(receivedJSON, ptraceotlp.NewExportRequestFromTraces(td)) + } + asserter := assert.NewStdUnitTest(t) + assert.Equiv(asserter, expectJSON, receivedJSON) +} diff --git a/collector/test/go.mod b/collector/test/go.mod new file mode 100644 index 00000000..b3d71ba3 --- /dev/null +++ b/collector/test/go.mod @@ -0,0 +1,82 @@ +module github.com/open-telemetry/otel-arrow/collector/test + +go 1.22.2 + +require ( + github.com/open-telemetry/otel-arrow/collector/exporter/otelarrowexporter v0.23.0 + github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver v0.23.0 +) + +require ( + github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect + github.com/apache/arrow/go/v14 v14.0.2 // indirect + github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/fxamacker/cbor/v2 v2.4.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect + github.com/google/flatbuffers v23.5.26+incompatible // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.6.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.8 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.1.1 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/mostynb/go-grpc-compression v1.2.2 // indirect + github.com/open-telemetry/otel-arrow v0.23.0 // indirect + github.com/open-telemetry/otel-arrow/collector v0.23.0 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect + github.com/x448/float16 v0.8.4 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + go.opentelemetry.io/collector v0.98.0 // indirect + go.opentelemetry.io/collector/component v0.98.0 // indirect + go.opentelemetry.io/collector/config/configauth v0.98.0 // indirect + go.opentelemetry.io/collector/config/configcompression v1.5.0 // indirect + go.opentelemetry.io/collector/config/configgrpc v0.98.0 // indirect + go.opentelemetry.io/collector/config/confignet v0.98.0 // indirect + go.opentelemetry.io/collector/config/configopaque v1.5.0 // indirect + go.opentelemetry.io/collector/config/configretry v0.98.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.98.0 // indirect + go.opentelemetry.io/collector/config/configtls v0.98.0 // indirect + go.opentelemetry.io/collector/config/internal v0.98.0 // indirect + go.opentelemetry.io/collector/confmap v0.98.0 // indirect + go.opentelemetry.io/collector/consumer v0.98.0 // indirect + go.opentelemetry.io/collector/exporter v0.98.0 // indirect + go.opentelemetry.io/collector/extension v0.98.0 // indirect + go.opentelemetry.io/collector/extension/auth v0.98.0 // indirect + go.opentelemetry.io/collector/featuregate v1.5.0 // indirect + go.opentelemetry.io/collector/pdata v1.5.0 // indirect + go.opentelemetry.io/collector/receiver v0.98.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect + go.opentelemetry.io/otel v1.25.0 // indirect + go.opentelemetry.io/otel/metric v1.25.0 // indirect + go.opentelemetry.io/otel/trace v1.25.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/mod v0.14.0 // indirect + golang.org/x/net v0.24.0 // indirect + golang.org/x/sys v0.19.0 // indirect + golang.org/x/text v0.14.0 // indirect + golang.org/x/tools v0.15.0 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/grpc v1.63.2 // indirect + google.golang.org/protobuf v1.33.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/collector/test/go.sum b/collector/test/go.sum new file mode 100644 index 00000000..94a66c24 --- /dev/null +++ b/collector/test/go.sum @@ -0,0 +1,269 @@ +dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= +github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= +github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= +github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= +github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= +github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo= +github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4= +github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= +github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88= +github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo= +github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsMBaUOKXq6HSv655U1c= +github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= +github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= +github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= +github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM= +github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/mostynb/go-grpc-compression v1.2.2 h1:XaDbnRvt2+1vgr0b/l0qh4mJAfIxE0bKXtz2Znl3GGI= +github.com/mostynb/go-grpc-compression v1.2.2/go.mod h1:GOCr2KBxXcblCuczg3YdLQlcin1/NfyDA348ckuCH6w= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/open-telemetry/otel-arrow v0.23.0 h1:Vx4q3GR36l9O+S7ZOOITNL1TPp+X1WxkXbeXQA146k0= +github.com/open-telemetry/otel-arrow v0.23.0/go.mod h1:F50XFaiNfkfB0MYftZIUKFULm6pxfGqjbgQzevi+65M= +github.com/open-telemetry/otel-arrow/collector v0.23.0 h1:ztmq1ipJBhm4xWjHDbmKOtgP3Nl/ZDoLX+3ThhzFs6k= +github.com/open-telemetry/otel-arrow/collector v0.23.0/go.mod h1:SLgLEhhcfR9MjG1taK8RPuwiuIoAPW7IpCjFBobwIUM= +github.com/open-telemetry/otel-arrow/collector/exporter/otelarrowexporter v0.23.0 h1:uAWR1ZIiXXHIxUnPQhbWJvs2eHvZMEhcEnZvyQKsLq4= +github.com/open-telemetry/otel-arrow/collector/exporter/otelarrowexporter v0.23.0/go.mod h1:69lDwg/ju5fsX0K+XYMg6w0EPgZmp28riVwhmpxfiY8= +github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver v0.23.0 h1:HTjZetc7n+PHVZ+ih6BZV4omd9rbufmdSQdzjwuPxuU= +github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver v0.23.0/go.mod h1:x7vy8cEz76eZirkkPe2dbtKAwZdokmq2wq3c/u9+k8A= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= +github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.opentelemetry.io/collector v0.98.0 h1:O7bpARGWzNfFQEYevLl4iigDrpGTJY3vV/kKqNZzMOk= +go.opentelemetry.io/collector v0.98.0/go.mod h1:fvPM+tBML07uvAP1MV2msYPSYJ9U/lgE1jDb3AFBaMM= +go.opentelemetry.io/collector/component v0.98.0 h1:0TMaBOyCdABiVLFdGOgG8zd/1IeGldCinYonbY08xWk= +go.opentelemetry.io/collector/component v0.98.0/go.mod h1:F6zyQLsoExl6r2q6WWZm8rmSSALbwG2zwIHLrMzZVio= +go.opentelemetry.io/collector/config/configauth v0.98.0 h1:FPffZ1dRL6emStrDUEGpL0rCChbUZNAQgpArXD0SESI= +go.opentelemetry.io/collector/config/configauth v0.98.0/go.mod h1:5pMzf2zgFwS7tujNq0AtOOli5vxIvnrNi7JlZwrBOFo= +go.opentelemetry.io/collector/config/configcompression v1.5.0 h1:FTxKbFPN4LznRCH/GQ+b+0tAWmg80Y2eEka79S2sLZ0= +go.opentelemetry.io/collector/config/configcompression v1.5.0/go.mod h1:O0fOPCADyGwGLLIf5lf7N3960NsnIfxsm6dr/mIpL+M= +go.opentelemetry.io/collector/config/configgrpc v0.98.0 h1:4yP/TphwQnbgLpJ72NymXaERVjLjuDAQp4iDKCTcv5g= +go.opentelemetry.io/collector/config/configgrpc v0.98.0/go.mod h1:tIng0xx1XlVr4I0YG5bNpts0hZDjwzN3Jkz6cKaSH/s= +go.opentelemetry.io/collector/config/confignet v0.98.0 h1:pXDBb2hFe10T/NMHlL/oMgk1aFfe4NmmJFdFoioyC9o= +go.opentelemetry.io/collector/config/confignet v0.98.0/go.mod h1:3naWoPss70RhDHhYjGACi7xh4NcVRvs9itzIRVWyu1k= +go.opentelemetry.io/collector/config/configopaque v1.5.0 h1:WJzgmsFU2v63BypPBNGL31ACwWn6PwumPJNpLZplcdE= +go.opentelemetry.io/collector/config/configopaque v1.5.0/go.mod h1:/otnfj2E8r5EfaAdNV4qHkTclmiBCZXaahV5EcLwT7k= +go.opentelemetry.io/collector/config/configretry v0.98.0 h1:gZRenX9oMLJmQ/CD8YwFNl9YYl68RtcD0RYSCJhrMAk= +go.opentelemetry.io/collector/config/configretry v0.98.0/go.mod h1:uRdmPeCkrW9Zsadh2WEbQ1AGXGYJ02vCfmmT+0g69nY= +go.opentelemetry.io/collector/config/configtelemetry v0.98.0 h1:f8RNZ1l/kYPPoxFmKKvTUli8iON7CMsm85KM38PVNts= +go.opentelemetry.io/collector/config/configtelemetry v0.98.0/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= +go.opentelemetry.io/collector/config/configtls v0.98.0 h1:g+MADy01ge8iGC6v2tbJ5G27CWNG1BaJtmYdmpvm8e4= +go.opentelemetry.io/collector/config/configtls v0.98.0/go.mod h1:9RHArziz0mNEEkti0kz5LIdvbQGT7/Unu/0whKKazHQ= +go.opentelemetry.io/collector/config/internal v0.98.0 h1:wz/6ncawMX5cfIiXJEYSUm1g1U6iE/VxFRm4/WhVBPI= +go.opentelemetry.io/collector/config/internal v0.98.0/go.mod h1:xPnEE6QaTSXr+ctYMSTBxI2qwTntTUM4cYk7OTm6Ugc= +go.opentelemetry.io/collector/confmap v0.98.0 h1:qQreBlrqio1y7uhrAvr+W86YbQ6fw7StgkbYpvJ2vVc= +go.opentelemetry.io/collector/confmap v0.98.0/go.mod h1:BWKPIpYeUzSG6ZgCJMjF7xsLvyrvJCfYURl57E5vhiQ= +go.opentelemetry.io/collector/consumer v0.98.0 h1:47zJ5HFKXVA0RciuwkZnPU5W8j0TYUxToB1/zzzgEhs= +go.opentelemetry.io/collector/consumer v0.98.0/go.mod h1:c2edTq38uVJET/NE6VV7/Qpyznnlz8b6VE7J6TXD57c= +go.opentelemetry.io/collector/exporter v0.98.0 h1:eN2qtkiwpeX9gBu9JZw1k/CZ3N9wZE1aGJ1A0EvwJ7w= +go.opentelemetry.io/collector/exporter v0.98.0/go.mod h1:GCW46a0VAuW7nljlW//GgFXI+8mSrJjrdEKVO9icExE= +go.opentelemetry.io/collector/extension v0.98.0 h1:08B5ipEsoNmPHY96j5EUsUrFre01GOZ4zgttUDtPUkY= +go.opentelemetry.io/collector/extension v0.98.0/go.mod h1:fZ1Hnnahszl5j3xcW2sMRJ0FLWDOFkFMQeVDP0Se7i8= +go.opentelemetry.io/collector/extension/auth v0.98.0 h1:7b1jioijJbTMqaOCrz5Hoqf+zJn2iPlGmtN7pXLNWbA= +go.opentelemetry.io/collector/extension/auth v0.98.0/go.mod h1:gssWC4AxAwAEKI2CqS93lhjWffsVdzD8q7UGL6LaRr0= +go.opentelemetry.io/collector/featuregate v1.5.0 h1:uK8qnYQKz1TMkK+FDTFsywg/EybW/gbnOUaPNUkRznM= +go.opentelemetry.io/collector/featuregate v1.5.0/go.mod h1:w7nUODKxEi3FLf1HslCiE6YWtMtOOrMnSwsDam8Mg9w= +go.opentelemetry.io/collector/pdata v1.5.0 h1:1fKTmUpr0xCOhP/B0VEvtz7bYPQ45luQ8XFyA07j8LE= +go.opentelemetry.io/collector/pdata v1.5.0/go.mod h1:TYj8aKRWZyT/KuKQXKyqSEvK/GV+slFaDMEI+Ke64Yw= +go.opentelemetry.io/collector/pdata/testdata v0.98.0 h1:8gohV+LFXqMzuDwfOOQy9GcZBOX0C9xGoQkoeXFTzmI= +go.opentelemetry.io/collector/pdata/testdata v0.98.0/go.mod h1:B/IaHcf6+RtxI292CZu9TjfYQdi1n4+v6b8rHEonpKs= +go.opentelemetry.io/collector/receiver v0.98.0 h1:qw6JYwm+sHcZvM1DByo3QlGe6yGHuwd0yW4hEPVqYKU= +go.opentelemetry.io/collector/receiver v0.98.0/go.mod h1:AwIWn+KnquTR+kbhXQrMH+i2PvTCFldSIJznBWFYs0s= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= +go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= +go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= +go.opentelemetry.io/otel/exporters/prometheus v0.47.0 h1:OL6yk1Z/pEGdDnrBbxSsH+t4FY1zXfBRGd7bjwhlMLU= +go.opentelemetry.io/otel/exporters/prometheus v0.47.0/go.mod h1:xF3N4OSICZDVbbYZydz9MHFro1RjmkPUKEvar2utG+Q= +go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA= +go.opentelemetry.io/otel/metric v1.25.0/go.mod h1:rkDLUSd2lC5lq2dFNrX9LGAbINP5B7WBkC78RXCpH5s= +go.opentelemetry.io/otel/sdk v1.25.0 h1:PDryEJPC8YJZQSyLY5eqLeafHtG+X7FWnf3aXMtxbqo= +go.opentelemetry.io/otel/sdk v1.25.0/go.mod h1:oFgzCM2zdsxKzz6zwpTZYLLQsFwc+K0daArPdIhuxkw= +go.opentelemetry.io/otel/sdk/metric v1.25.0 h1:7CiHOy08LbrxMAp4vWpbiPcklunUshVpAvGBrdDRlGw= +go.opentelemetry.io/otel/sdk/metric v1.25.0/go.mod h1:LzwoKptdbBBdYfvtGCzGwk6GWMA3aUzBOwtQpR6Nz7o= +go.opentelemetry.io/otel/trace v1.25.0 h1:tqukZGLwQYRIFtSQM2u2+yfMVTgGVeqRLPUYx1Dq6RM= +go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= +golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= +golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= +golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= +golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= +golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8= +golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= +gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= +gonum.org/v1/gonum v0.15.0 h1:2lYxjRbTYyxkJxlhC+LvJIx3SsANPdRybu1tGj9/OrQ= +gonum.org/v1/gonum v0.15.0/go.mod h1:xzZVBJBtS+Mz4q0Yl2LJTk+OxOg4jiXZ7qBoM0uISGo= +gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= +gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/collector/testutil/testutil.go b/collector/testutil/testutil.go index 44fdcb68..a0e4202e 100644 --- a/collector/testutil/testutil.go +++ b/collector/testutil/testutil.go @@ -4,6 +4,7 @@ package testutil // import "github.com/open-telemetry/otel-arrow/collector/testutil" import ( + "encoding/binary" "net" "os/exec" "runtime" @@ -12,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" ) type portpair struct { @@ -98,3 +100,18 @@ func createExclusionsList(exclusionsText string, t testing.TB) []portpair { } return exclusions } + +// UInt64ToTraceID is from collector-contrib/internal/idutils +func UInt64ToTraceID(high, low uint64) pcommon.TraceID { + traceID := [16]byte{} + binary.BigEndian.PutUint64(traceID[:8], high) + binary.BigEndian.PutUint64(traceID[8:], low) + return traceID +} + +// UInt64ToSpanID is from collector-contrib/internal/idutils +func UInt64ToSpanID(id uint64) pcommon.SpanID { + spanID := [8]byte{} + binary.BigEndian.PutUint64(spanID[:8], id) + return spanID +} diff --git a/go.mod b/go.mod index 01de7d7e..fa80a1f8 100644 --- a/go.mod +++ b/go.mod @@ -56,14 +56,3 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - - - - - - - - - - - diff --git a/go.work b/go.work index f010cb72..4e8294d4 100644 --- a/go.work +++ b/go.work @@ -1,6 +1,4 @@ -go 1.21 - -toolchain go1.21.4 +go 1.22.2 use ( . @@ -15,4 +13,5 @@ use ( ./collector/processor/obfuscationprocessor ./collector/receiver/filereceiver ./collector/receiver/otelarrowreceiver + ./collector/test )