Addition of internal consumer metrics (#52)
Adds three OTel metric instruments, with instrumentation for (see the sketch after this list):
- records consumed
- schema resets
- memory in use
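
For orientation, here is a condensed sketch of how the instruments are created, based on the consumer.go change in this commit. The helper function and its name are illustrative only, not part of the library:

```
// Sketch only: condensed from the consumer.go diff below, not library code.
// Instrument-creation errors are ignored here for brevity; the real code
// routes them through otel.Handle (see mustWarn in the diff).
package sketch

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

func newConsumerInstruments(mp metric.MeterProvider) (records, resets metric.Int64Counter, inuse metric.Int64UpDownCounter) {
	if mp == nil {
		mp = otel.GetMeterProvider() // default when WithMeterProvider is not configured
	}
	meter := mp.Meter("otel-arrow/pkg/otel/arrow_record")

	records, _ = meter.Int64Counter("arrow_batch_records")    // records consumed per Consume call
	resets, _ = meter.Int64Counter("arrow_schema_resets")     // labeled by payload_type
	inuse, _ = meter.Int64UpDownCounter("arrow_memory_inuse") // labeled by where=caller|library
	return records, resets, inuse
}
```

Callers opt in by passing a MeterProvider, as the otlp.go change below does via arrowRecord.WithMeterProvider(r.settings.TelemetrySettings.MeterProvider); otherwise the global provider is used.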

Note that I have not added tests, which would delay us a lot for little
benefit. There are existing debug-level assertions that cover the memory
accounting. To test this experimentally, I used a recorded data file and
a configuration like:

```
receivers:
  file/input:
    path: "${env:DIR}/data.json.zstd"
    format: json
    compression: zstd
    throttle: 0
  otelarrow/loopback:
    protocols:
      grpc:
        endpoint: 127.0.0.1:8082
        max_recv_msg_size_mib: 24
        
exporters:
  otelarrow/forward:
    endpoint: 127.0.0.1:8082
    wait_for_ready: true
    arrow:
      disabled: false
      num_streams: 1
      disable_downgrade: true
    tls:
      insecure: true
    retry_on_failure:
      enabled: false
    sending_queue:
      enabled: false
      num_consumers: 1
    timeout: 30s
  logging:
  file/output:
    path: "${env:DIR}/output.json.zstd"
    format: json
    compression: zstd

extensions:
  pprof:

processors:
  batch:
    send_batch_size: 1000
    send_batch_max_size: 1100
    timeout: 10s

service:
  extensions: [pprof]
  pipelines:
    traces/first:
      receivers: [file/input]
      processors: [batch]
      exporters: [otelarrow/forward]
    traces/second:
      receivers: [otelarrow/loopback]
      processors: []
      exporters: [logging, file/output]
      
  telemetry:
    resource:
      "service.name": "data-replayer"
    metrics:
      address: 127.0.0.1:8889
      level: detailed
    logs:
      level: debug
```

Then I see metrics (via Prometheus) at :8889/metrics, like:

```
# HELP otelcol_arrow_batch_records_total 
# TYPE otelcol_arrow_batch_records_total counter
otelcol_arrow_batch_records_total{stream_unique="4797068c"} 52445
# HELP otelcol_arrow_memory_inuse 
# TYPE otelcol_arrow_memory_inuse gauge
otelcol_arrow_memory_inuse{stream_unique="4797068c",where="library"} 5.6499008e+07
otelcol_arrow_memory_inuse{stream_unique="4797068c",where="user"} 0
# HELP otelcol_arrow_schema_resets_total 
# TYPE otelcol_arrow_schema_resets_total counter
otelcol_arrow_schema_resets_total{payload_type="RESOURCE_ATTRS",stream_unique="4797068c"} 1
otelcol_arrow_schema_resets_total{payload_type="SPANS",stream_unique="4797068c"} 7
otelcol_arrow_schema_resets_total{payload_type="SPAN_ATTRS",stream_unique="4797068c"} 4
otelcol_arrow_schema_resets_total{payload_type="SPAN_EVENTS",stream_unique="4797068c"} 3
otelcol_arrow_schema_resets_total{payload_type="SPAN_EVENT_ATTRS",stream_unique="4797068c"} 5
otelcol_arrow_schema_resets_total{payload_type="SPAN_LINKS",stream_unique="4797068c"} 3
otelcol_arrow_schema_resets_total{payload_type="SPAN_LINK_ATTRS",stream_unique="4797068c"} 1
```

Note that the memory-in-use metric appears to rise slowly.
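
For context on how that series is produced: each entry point captures allocator.Inuse() before and after the work and adds the difference to the UpDownCounter, attributing the change to either the caller or the library. Below is a simplified, illustrative version of the idiom from inuseChangeObserveWhere in the consumer.go diff; the method name here is hypothetical, and it assumes the Consumer fields shown in the diff:

```
// Simplified sketch of the delta pattern; not the exact library code.
func (c *Consumer) observeInuseDelta(ctx context.Context, where string) {
	inuse := c.allocator.Inuse()
	delta := int64(inuse) - int64(c.lastInuseValue) // negative when memory is released
	c.memoryCounter.Add(ctx, delta,
		metric.WithAttributes(c.uniqueAttr, attribute.String("where", where)))
	c.lastInuseValue = inuse
}
```

Per the comments in the diff, a memory-accounting leak would surface as sustained growth in arrow_memory_inuse{where="library"}.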

---------

Co-authored-by: Laurent Quérel <[email protected]>
jmacd and lquerel authored Sep 27, 2023
1 parent d0a3db0 commit 8df0a8f
Showing 6 changed files with 149 additions and 9 deletions.
2 changes: 1 addition & 1 deletion collector/receiver/filereceiver/receiver.go
@@ -48,7 +48,7 @@ func (r *fileReceiver) Start(ctx context.Context, _ component.Host) error {
}
if err != nil {
if errors.Is(err, io.EOF) {
r.logger.Debug("EOF reached")
r.logger.Info("EOF reached")
} else {
r.logger.Error("failed to read input file", zap.Error(err))
}
3 changes: 3 additions & 0 deletions collector/receiver/otelarrowreceiver/otlp.go
@@ -154,6 +154,9 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
// in which case the default is selected in the arrowRecord package.
opts = append(opts, arrowRecord.WithMemoryLimit(r.cfg.Arrow.MemoryLimit))
}
if r.settings.TelemetrySettings.MeterProvider != nil {
opts = append(opts, arrowRecord.WithMeterProvider(r.settings.TelemetrySettings.MeterProvider))
}
return arrowRecord.NewConsumer(opts...)
})

5 changes: 5 additions & 0 deletions go.mod
@@ -16,6 +16,8 @@ require (
github.com/stretchr/testify v1.8.4
github.com/zeebo/assert v1.3.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0014
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/metric v1.16.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
@@ -26,6 +28,8 @@ require (
github.com/apache/thrift v0.16.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
@@ -43,6 +47,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.14.0 // indirect
11 changes: 11 additions & 0 deletions go.sum
@@ -26,6 +26,11 @@ github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/
github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88=
github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
@@ -99,6 +104,12 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 h1:iT5qH0NLmkGeIdDtnBogYDx7L58t6CaWGL378DEo2QY=
go.opentelemetry.io/collector/pdata v1.0.0-rcv0014/go.mod h1:BRvDrx43kiSoUx3mr7SoA7h9B8+OY99mUK+CZSQFWW4=
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
133 changes: 125 additions & 8 deletions pkg/otel/arrow_record/consumer.go
@@ -16,13 +16,18 @@ package arrow_record

import (
"bytes"
"context"
"fmt"
"math/rand"

"github.com/apache/arrow/go/v12/arrow/ipc"
"github.com/apache/arrow/go/v12/arrow/memory"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

colarspb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
common "github.com/open-telemetry/otel-arrow/pkg/otel/common/arrow"
@@ -57,15 +62,43 @@ var ErrConsumerMemoryLimit = fmt.Errorf(

// Consumer is a BatchArrowRecords consumer.
type Consumer struct {
// streamConsumers is a map of reader state by SchemaID.
streamConsumers map[string]*streamConsumer

config Config
// Config embeds the configurable parameters.
Config

// allocator is the one instrumented in calls to Consume,
// it is reused across multiple IPC readers.
allocator *common.LimitedAllocator
// lastInuseValue is the previously-captured value for
// allocator.Inuse(). This is used to work around a
// limitation in the OTel synchronous instrument API, which we
// are using because we reject the use of atomic operations
// for allocators used here, since there is otherwise no
// concurrency. See inuseChangeByCaller() and
// inuseChangeObserve().
lastInuseValue uint64

// counts of the number of records consumed.
recordsCounter metric.Int64Counter
// counts of the number of schema resets by data type.
schemaResetCounter metric.Int64Counter
// tracks allocator.Inuse()
memoryCounter metric.Int64UpDownCounter

// uniqueAttr is set to an 8-byte hex digit string with
// 32-bits of randomness, applied to all metric events.
uniqueAttr attribute.KeyValue
}

type Config struct {
memLimit uint64

tracesConfig *arrow.Config

// from component.TelemetrySettings
meterProvider metric.MeterProvider
}

// WithMemoryLimit configures the Arrow limited memory allocator.
@@ -82,6 +115,14 @@ func WithTracesConfig(tcfg *arrow.Config) Option {
}
}

// WithMeterProvider configures an OTel metrics provider. If none is
// configured, the global meter provider will be used.
func WithMeterProvider(p metric.MeterProvider) Option {
return func(cfg *Config) {
cfg.meterProvider = p
}
}

type streamConsumer struct {
bufReader *bytes.Reader
ipcReader *ipc.Reader
@@ -94,20 +135,86 @@ type Option func(*Config)
// the corresponding OTLP representation (pmetric,Metrics, plog.Logs, ptrace.Traces).
func NewConsumer(opts ...Option) *Consumer {
cfg := Config{
memLimit: defaultMemoryLimit,
tracesConfig: arrow.DefaultConfig(),
memLimit: defaultMemoryLimit,
tracesConfig: arrow.DefaultConfig(),
meterProvider: otel.GetMeterProvider(),
}
for _, opt := range opts {
opt(&cfg)
}
meter := cfg.meterProvider.Meter("otel-arrow/pkg/otel/arrow_record")
allocator := common.NewLimitedAllocator(memory.NewGoAllocator(), cfg.memLimit)
return &Consumer{
config: cfg,
streamConsumers: make(map[string]*streamConsumer),
Config: cfg,
allocator: allocator,
uniqueAttr: attribute.String("stream_unique", fmt.Sprintf("%08x", rand.Uint32())),
streamConsumers: make(map[string]*streamConsumer),
recordsCounter: mustWarn(meter.Int64Counter("arrow_batch_records")),
schemaResetCounter: mustWarn(meter.Int64Counter("arrow_schema_resets")),
memoryCounter: mustWarn(meter.Int64UpDownCounter("arrow_memory_inuse")),
}
}

func releaseRecords(recs []*record_message.RecordMessage) {
for _, rec := range recs {
rec.Record().Release()
}
}

func mustWarn[T any](t T, err error) T {
if err != nil {
// as it's an otel error, let someone else handle it
otel.Handle(err)
}
return t
}

type placeholder struct{}

// inuseChangeByCaller records the change in allocated memory,
// attributing change to the caller. The `placeholder` returned
// is to encourage this idiom at the top of any function that
// allocates memory in the library.
//
// defer c.inuseChangeObserve(c.inuseChangeByCaller())
//
// Note: This is a sanity check. Currently there are no allocator
// operations in the caller, and the caller is not responsible
// for releasing memory. The count by where=caller should equal 0.
func (c *Consumer) inuseChangeByCaller() placeholder {
c.inuseChangeObserveWhere("caller")
return placeholder{}
}

// inuseChangeObserve records the change in allocated memory,
// attributing change to the library.
//
// Note if we have a memory accounting leak, we expect it will
// show up as a count `arrow_memory_inuse{where=library}`.
func (c *Consumer) inuseChangeObserve(_ placeholder) {
c.inuseChangeObserveWhere("library")
}

// inuseChangeObserveWhere records synchronous UpDownCounter events
// tracking changes in allocator state. If OTel had a synchronous
// cumulative updowncounter option, that would be easier to use.
func (c *Consumer) inuseChangeObserveWhere(where string) {
ctx := context.Background()
last := c.lastInuseValue
inuse := c.allocator.Inuse()
attrs := metric.WithAttributes(c.uniqueAttr, attribute.String("where", where))

c.memoryCounter.Add(ctx, int64(inuse-last), attrs)
c.lastInuseValue = inuse
if inuse != last {
fmt.Println("change by", where, "=", int64(inuse-last), "; current value", inuse)
}
}

// MetricsFrom produces an array of [pmetric.Metrics] from a BatchArrowRecords message.
func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metrics, error) {
defer c.inuseChangeObserve(c.inuseChangeByCaller())

// extracts the records from the BatchArrowRecords message
records, err := c.Consume(bar)
if err != nil {
@@ -139,6 +246,7 @@ func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metri

// LogsFrom produces an array of [plog.Logs] from a BatchArrowRecords message.
func (c *Consumer) LogsFrom(bar *colarspb.BatchArrowRecords) ([]plog.Logs, error) {
defer c.inuseChangeObserve(c.inuseChangeByCaller())
records, err := c.Consume(bar)
if err != nil {
return nil, werror.Wrap(err)
@@ -164,6 +272,7 @@ func (c *Consumer) LogsFrom(bar *colarspb.BatchArrowRecords) ([]plog.Logs, error

// TracesFrom produces an array of [ptrace.Traces] from a BatchArrowRecords message.
func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces, error) {
defer c.inuseChangeObserve(c.inuseChangeByCaller())
records, err := c.Consume(bar)
if err != nil {
return nil, werror.Wrap(err)
@@ -172,7 +281,7 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces,
result := make([]ptrace.Traces, 0, len(records))

// Compute all related records (i.e. Attributes, Events, and Links)
relatedData, tracesRecord, err := tracesotlp.RelatedDataFrom(records, c.config.tracesConfig)
relatedData, tracesRecord, err := tracesotlp.RelatedDataFrom(records, c.tracesConfig)

if tracesRecord != nil {
// Decode OTLP traces from the combination of the main record and the
@@ -190,7 +299,12 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces,
// Consume takes a BatchArrowRecords protobuf message and returns an array of RecordMessage.
// Note: the records wrapped in the RecordMessage must be released after use by the caller.
func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.RecordMessage, error) {
ctx := context.Background()

var ibes []*record_message.RecordMessage
defer func() {
c.recordsCounter.Add(ctx, int64(len(ibes)), metric.WithAttributes(c.uniqueAttr))
}()

// Transform each individual OtlpArrowPayload into RecordMessage
for _, payload := range bar.ArrowPayloads {
@@ -221,13 +335,15 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R

sc.bufReader.Reset(payload.Record)
if sc.ipcReader == nil {
c.schemaResetCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("payload_type", payload.Type.String()), c.uniqueAttr))
ipcReader, err := ipc.NewReader(
sc.bufReader,
ipc.WithAllocator(common.NewLimitedAllocator(memory.NewGoAllocator(), c.config.memLimit)),
ipc.WithAllocator(c.allocator),
ipc.WithDictionaryDeltas(true),
ipc.WithZstd(),
)
if err != nil {
releaseRecords(ibes)
return nil, werror.Wrap(err)
}
sc.ipcReader = ipcReader
@@ -244,7 +360,8 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R
}

if len(ibes) < len(bar.ArrowPayloads) {
return ibes, ErrConsumerMemoryLimit
releaseRecords(ibes)
return nil, ErrConsumerMemoryLimit
}

return ibes, nil
4 changes: 4 additions & 0 deletions pkg/otel/common/arrow/allocator.go
@@ -53,6 +53,10 @@ func (_ LimitError) Is(tgt error) bool {
return ok
}

func (l *LimitedAllocator) Inuse() uint64 {
return l.inuse
}

func (l *LimitedAllocator) Allocate(size int) []byte {
change := uint64(size)
if l.inuse+change > l.limit {
