diff --git a/collector/receiver/filereceiver/receiver.go b/collector/receiver/filereceiver/receiver.go
index d8bfc7c3..03dbf6a5 100644
--- a/collector/receiver/filereceiver/receiver.go
+++ b/collector/receiver/filereceiver/receiver.go
@@ -48,7 +48,7 @@ func (r *fileReceiver) Start(ctx context.Context, _ component.Host) error {
 			}
 			if err != nil {
 				if errors.Is(err, io.EOF) {
-					r.logger.Debug("EOF reached")
+					r.logger.Info("EOF reached")
 				} else {
 					r.logger.Error("failed to read input file", zap.Error(err))
 				}
diff --git a/collector/receiver/otelarrowreceiver/otlp.go b/collector/receiver/otelarrowreceiver/otlp.go
index 336b7631..7a1973ec 100644
--- a/collector/receiver/otelarrowreceiver/otlp.go
+++ b/collector/receiver/otelarrowreceiver/otlp.go
@@ -154,6 +154,9 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
 				// in which case the default is selected in the arrowRecord package.
 				opts = append(opts, arrowRecord.WithMemoryLimit(r.cfg.Arrow.MemoryLimit))
 			}
+			if r.settings.TelemetrySettings.MeterProvider != nil {
+				opts = append(opts, arrowRecord.WithMeterProvider(r.settings.TelemetrySettings.MeterProvider))
+			}
 			return arrowRecord.NewConsumer(opts...)
 		})
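The new option is also usable outside the collector. A minimal sketch, assuming the OTel SDK's go.opentelemetry.io/otel/sdk/metric package supplies the provider (that setup is an illustration, not part of this change):

package main

import (
	"context"
	"fmt"

	arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// In the collector, the provider arrives via component.TelemetrySettings
	// (see otlp.go above); a bare SDK provider stands in for it here.
	mp := sdkmetric.NewMeterProvider()
	defer func() { _ = mp.Shutdown(context.Background()) }()

	consumer := arrowRecord.NewConsumer(
		arrowRecord.WithMeterProvider(mp),
		arrowRecord.WithMemoryLimit(32<<20), // hypothetical 32 MiB limit
	)
	fmt.Printf("consumer ready: %T\n", consumer)
}

Without the option, NewConsumer falls back to otel.GetMeterProvider(), so existing callers keep working unchanged.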
diff --git a/go.mod b/go.mod
index d02af166..a67519cd 100644
--- a/go.mod
+++ b/go.mod
@@ -16,6 +16,8 @@ require (
 	github.com/stretchr/testify v1.8.4
 	github.com/zeebo/assert v1.3.0
 	go.opentelemetry.io/collector/pdata v1.0.0-rcv0014
+	go.opentelemetry.io/otel v1.16.0
+	go.opentelemetry.io/otel/metric v1.16.0
 	golang.org/x/exp v0.0.0-20230321023759-10a507213a29
 	google.golang.org/grpc v1.57.0
 	google.golang.org/protobuf v1.31.0
@@ -26,6 +28,8 @@ require (
 	github.com/apache/thrift v0.16.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
+	github.com/go-logr/logr v1.2.4 // indirect
+	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/goccy/go-json v0.9.11 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
@@ -43,6 +47,7 @@ require (
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/x448/float16 v0.8.4 // indirect
 	github.com/zeebo/xxh3 v1.0.2 // indirect
+	go.opentelemetry.io/otel/trace v1.16.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/mod v0.9.0 // indirect
 	golang.org/x/net v0.14.0 // indirect
diff --git a/go.sum b/go.sum
index f7235a76..24e62283 100644
--- a/go.sum
+++ b/go.sum
@@ -26,6 +26,11 @@ github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/
 github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88=
 github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
 github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
+github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
 github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
 github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
@@ -99,6 +104,12 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
 github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
 go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 h1:iT5qH0NLmkGeIdDtnBogYDx7L58t6CaWGL378DEo2QY=
 go.opentelemetry.io/collector/pdata v1.0.0-rcv0014/go.mod h1:BRvDrx43kiSoUx3mr7SoA7h9B8+OY99mUK+CZSQFWW4=
+go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
+go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
+go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
+go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
+go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
+go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
diff --git a/pkg/otel/arrow_record/consumer.go b/pkg/otel/arrow_record/consumer.go
index ca430a45..1f1057b4 100644
--- a/pkg/otel/arrow_record/consumer.go
+++ b/pkg/otel/arrow_record/consumer.go
@@ -16,13 +16,18 @@ package arrow_record

 import (
 	"bytes"
+	"context"
 	"fmt"
+	"math/rand"

 	"github.com/apache/arrow/go/v12/arrow/ipc"
 	"github.com/apache/arrow/go/v12/arrow/memory"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"

 	colarspb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
 	common "github.com/open-telemetry/otel-arrow/pkg/otel/common/arrow"
@@ -57,15 +62,43 @@ var ErrConsumerMemoryLimit = fmt.Errorf(

 // Consumer is a BatchArrowRecords consumer.
 type Consumer struct {
+	// streamConsumers is a map of reader state by SchemaID.
 	streamConsumers map[string]*streamConsumer

-	config Config
+	// Config embeds the configurable parameters.
+	Config
+
+	// allocator is the one instrumented in calls to Consume;
+	// it is reused across multiple IPC readers.
+	allocator *common.LimitedAllocator
+
+	// lastInuseValue is the previously-captured value of
+	// allocator.Inuse(). This works around a limitation of the
+	// OTel synchronous instrument API, which we use because we
+	// reject the use of atomic operations for the allocators
+	// used here, since there is otherwise no concurrency. See
+	// inuseChangeByCaller() and inuseChangeObserve().
+	lastInuseValue uint64
+
+	// recordsCounter counts the number of records consumed.
+	recordsCounter metric.Int64Counter
+	// schemaResetCounter counts the number of schema resets by data type.
+	schemaResetCounter metric.Int64Counter
+	// memoryCounter tracks allocator.Inuse().
+	memoryCounter metric.Int64UpDownCounter
+
+	// uniqueAttr is set to an 8-hex-digit string carrying 32 bits
+	// of randomness, applied to all metric events.
+	uniqueAttr attribute.KeyValue
 }

 type Config struct {
 	memLimit uint64

 	tracesConfig *arrow.Config
+
+	// meterProvider comes from component.TelemetrySettings.
+	meterProvider metric.MeterProvider
 }

 // WithMemoryLimit configures the Arrow limited memory allocator.
@@ -82,6 +115,14 @@
 	}
 }

+// WithMeterProvider configures an OTel meter provider. If none is
+// configured, the global meter provider will be used.
+func WithMeterProvider(p metric.MeterProvider) Option {
+	return func(cfg *Config) {
+		cfg.meterProvider = p
+	}
+}
+
 type streamConsumer struct {
 	bufReader *bytes.Reader
 	ipcReader *ipc.Reader
@@ -94,20 +135,86 @@ type Option func(*Config)
 // the corresponding OTLP representation (pmetric.Metrics, plog.Logs, ptrace.Traces).
 func NewConsumer(opts ...Option) *Consumer {
 	cfg := Config{
-		memLimit:     defaultMemoryLimit,
-		tracesConfig: arrow.DefaultConfig(),
+		memLimit:      defaultMemoryLimit,
+		tracesConfig:  arrow.DefaultConfig(),
+		meterProvider: otel.GetMeterProvider(),
 	}
 	for _, opt := range opts {
 		opt(&cfg)
 	}
+	meter := cfg.meterProvider.Meter("otel-arrow/pkg/otel/arrow_record")
+	allocator := common.NewLimitedAllocator(memory.NewGoAllocator(), cfg.memLimit)
 	return &Consumer{
-		config:          cfg,
-		streamConsumers: make(map[string]*streamConsumer),
+		Config:             cfg,
+		allocator:          allocator,
+		uniqueAttr:         attribute.String("stream_unique", fmt.Sprintf("%08x", rand.Uint32())),
+		streamConsumers:    make(map[string]*streamConsumer),
+		recordsCounter:     mustWarn(meter.Int64Counter("arrow_batch_records")),
+		schemaResetCounter: mustWarn(meter.Int64Counter("arrow_schema_resets")),
+		memoryCounter:      mustWarn(meter.Int64UpDownCounter("arrow_memory_inuse")),
+	}
+}
+
+func releaseRecords(recs []*record_message.RecordMessage) {
+	for _, rec := range recs {
+		rec.Record().Release()
+	}
+}
+
+func mustWarn[T any](t T, err error) T {
+	if err != nil {
+		// as it's an otel error, let someone else handle it
+		otel.Handle(err)
+	}
+	return t
+}
+
+type placeholder struct{}
+
+// inuseChangeByCaller records the change in allocated memory,
+// attributing the change to the caller. The placeholder it returns
+// is meant to encourage this idiom at the top of any function that
+// allocates memory in this library:
+//
+//	defer c.inuseChangeObserve(c.inuseChangeByCaller())
+//
+// Note: This is a sanity check. Currently there are no allocator
+// operations in the caller, and the caller is not responsible
+// for releasing memory. The count by where=caller should equal 0.
+func (c *Consumer) inuseChangeByCaller() placeholder {
+	c.inuseChangeObserveWhere("caller")
+	return placeholder{}
+}
+
+// inuseChangeObserve records the change in allocated memory,
+// attributing the change to the library.
+//
+// Note: if we have a memory-accounting leak, we expect it to
+// show up as a nonzero count of arrow_memory_inuse{where=library}.
+func (c *Consumer) inuseChangeObserve(_ placeholder) {
+	c.inuseChangeObserveWhere("library")
+}
+
+// inuseChangeObserveWhere records synchronous UpDownCounter events
+// tracking changes in allocator state. If OTel had a synchronous
+// cumulative updowncounter option, that would be easier to use.
+func (c *Consumer) inuseChangeObserveWhere(where string) {
+	ctx := context.Background()
+	last := c.lastInuseValue
+	inuse := c.allocator.Inuse()
+	attrs := metric.WithAttributes(c.uniqueAttr, attribute.String("where", where))
+
+	c.memoryCounter.Add(ctx, int64(inuse-last), attrs)
+	c.lastInuseValue = inuse
+	if inuse != last {
+		fmt.Println("change by", where, "=", int64(inuse-last), "; current value", inuse)
 	}
 }

 // MetricsFrom produces an array of [pmetric.Metrics] from a BatchArrowRecords message.
 func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metrics, error) {
+	defer c.inuseChangeObserve(c.inuseChangeByCaller())
+
 	// extracts the records from the BatchArrowRecords message
 	records, err := c.Consume(bar)
 	if err != nil {
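An aside on the defer idiom introduced above: Go evaluates deferred call arguments immediately, so inuseChangeByCaller() runs at function entry and inuseChangeObserve() at exit. A self-contained toy illustrating the attribution; the names (tracker, observe, byCaller, byLibrary) are hypothetical and not part of this package:

package main

import "fmt"

type placeholder struct{}

type tracker struct{ last, inuse uint64 }

// byCaller runs at function entry: deltas before this point are the caller's.
func (t *tracker) byCaller() placeholder { t.observe("caller"); return placeholder{} }

// byLibrary runs at function exit: deltas since entry belong to the library.
func (t *tracker) byLibrary(_ placeholder) { t.observe("library") }

func (t *tracker) observe(where string) {
	// uint64 subtraction wraps on decrease, but converting to int64
	// recovers the signed delta, as in inuseChangeObserveWhere above.
	fmt.Printf("inuse{where=%s} += %d\n", where, int64(t.inuse-t.last))
	t.last = t.inuse
}

func main() {
	t := &tracker{}
	func() {
		// byCaller() is evaluated now; byLibrary runs at return.
		defer t.byLibrary(t.byCaller())
		t.inuse += 1024 // simulated library allocation
	}()
}

Running it prints a zero delta for where=caller followed by a 1024-byte delta for where=library, which is exactly the invariant the sanity check expects.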
@@ -139,6 +246,7 @@ func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metri

 // LogsFrom produces an array of [plog.Logs] from a BatchArrowRecords message.
 func (c *Consumer) LogsFrom(bar *colarspb.BatchArrowRecords) ([]plog.Logs, error) {
+	defer c.inuseChangeObserve(c.inuseChangeByCaller())
 	records, err := c.Consume(bar)
 	if err != nil {
 		return nil, werror.Wrap(err)
@@ -164,6 +272,7 @@ func (c *Consumer) LogsFrom(bar *colarspb.BatchArrowRecords) ([]plog.Logs, error

 // TracesFrom produces an array of [ptrace.Traces] from a BatchArrowRecords message.
 func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces, error) {
+	defer c.inuseChangeObserve(c.inuseChangeByCaller())
 	records, err := c.Consume(bar)
 	if err != nil {
 		return nil, werror.Wrap(err)
@@ -172,7 +281,7 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces,
 	result := make([]ptrace.Traces, 0, len(records))

 	// Compute all related records (i.e. Attributes, Events, and Links)
-	relatedData, tracesRecord, err := tracesotlp.RelatedDataFrom(records, c.config.tracesConfig)
+	relatedData, tracesRecord, err := tracesotlp.RelatedDataFrom(records, c.tracesConfig)

 	if tracesRecord != nil {
 		// Decode OTLP traces from the combination of the main record and the
@@ -190,7 +299,12 @@
 // Consume takes a BatchArrowRecords protobuf message and returns an array of RecordMessage.
 // Note: the records wrapped in the RecordMessage must be released after use by the caller.
 func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.RecordMessage, error) {
+	ctx := context.Background()
+
 	var ibes []*record_message.RecordMessage
+	defer func() {
+		c.recordsCounter.Add(ctx, int64(len(ibes)), metric.WithAttributes(c.uniqueAttr))
+	}()

 	// Transform each individual OtlpArrowPayload into RecordMessage
 	for _, payload := range bar.ArrowPayloads {
@@ -221,13 +335,15 @@

 		sc.bufReader.Reset(payload.Record)
 		if sc.ipcReader == nil {
+			c.schemaResetCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("payload_type", payload.Type.String()), c.uniqueAttr))
 			ipcReader, err := ipc.NewReader(
 				sc.bufReader,
-				ipc.WithAllocator(common.NewLimitedAllocator(memory.NewGoAllocator(), c.config.memLimit)),
+				ipc.WithAllocator(c.allocator),
 				ipc.WithDictionaryDeltas(true),
 				ipc.WithZstd(),
 			)
 			if err != nil {
+				releaseRecords(ibes)
 				return nil, werror.Wrap(err)
 			}
 			sc.ipcReader = ipcReader
@@ -244,7 +360,8 @@
 	}

 	if len(ibes) < len(bar.ArrowPayloads) {
-		return ibes, ErrConsumerMemoryLimit
+		releaseRecords(ibes)
+		return nil, ErrConsumerMemoryLimit
 	}

 	return ibes, nil
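Consume's contract above (the caller releases the returned records) pairs with the new releaseRecords error paths, which now clean up partial results before returning. A sketch of a compliant caller, with a hypothetical helper name consumeAll and the import paths shown in consumer.go:

package example

import (
	colarspb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
	arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
)

// consumeAll is a hypothetical caller honoring the release contract.
func consumeAll(c *arrowRecord.Consumer, bar *colarspb.BatchArrowRecords) error {
	records, err := c.Consume(bar)
	if err != nil {
		// On error, Consume has already released any partial results,
		// so the caller must not touch them.
		return err
	}
	defer func() {
		for _, rec := range records {
			rec.Record().Release() // ownership is the caller's on success
		}
	}()
	// ... decode each record into pdata here ...
	return nil
}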
diff --git a/pkg/otel/common/arrow/allocator.go b/pkg/otel/common/arrow/allocator.go
index bff3bafd..63994118 100644
--- a/pkg/otel/common/arrow/allocator.go
+++ b/pkg/otel/common/arrow/allocator.go
@@ -53,6 +53,11 @@ func (_ LimitError) Is(tgt error) bool {
 	return ok
 }

+// Inuse returns the number of bytes currently allocated.
+func (l *LimitedAllocator) Inuse() uint64 {
+	return l.inuse
+}
+
 func (l *LimitedAllocator) Allocate(size int) []byte {
 	change := uint64(size)
 	if l.inuse+change > l.limit {
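To make the accounting concrete, here is a toy model of the limit check visible in Allocate, whose running total Inuse() now exposes. The names (limited, allocate) are hypothetical, not the real type:

package main

import "fmt"

// limited mirrors the bookkeeping that LimitedAllocator.Inuse() exposes.
type limited struct{ limit, inuse uint64 }

func (l *limited) allocate(size int) ([]byte, error) {
	change := uint64(size)
	if l.inuse+change > l.limit {
		return nil, fmt.Errorf("memory limit exceeded: %d+%d > %d", l.inuse, change, l.limit)
	}
	l.inuse += change
	return make([]byte, size), nil
}

func main() {
	l := &limited{limit: 1 << 10}
	if _, err := l.allocate(2048); err != nil {
		fmt.Println(err) // memory limit exceeded: 0+2048 > 1024
	}
	fmt.Println("inuse:", l.inuse)
}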