Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addition of internal consumer metrics #52

Merged
merged 3 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion collector/receiver/filereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *fileReceiver) Start(ctx context.Context, _ component.Host) error {
}
if err != nil {
if errors.Is(err, io.EOF) {
r.logger.Debug("EOF reached")
r.logger.Info("EOF reached")
} else {
r.logger.Error("failed to read input file", zap.Error(err))
}
Expand Down
3 changes: 3 additions & 0 deletions collector/receiver/otelarrowreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
// in which case the default is selected in the arrowRecord package.
opts = append(opts, arrowRecord.WithMemoryLimit(r.cfg.Arrow.MemoryLimit))
}
if r.settings.TelemetrySettings.MeterProvider != nil {
opts = append(opts, arrowRecord.WithMeterProvider(r.settings.TelemetrySettings.MeterProvider))
}
return arrowRecord.NewConsumer(opts...)
})

Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ require (
github.com/stretchr/testify v1.8.4
github.com/zeebo/assert v1.3.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0014
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/metric v1.16.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
Expand All @@ -26,6 +28,8 @@ require (
github.com/apache/thrift v0.16.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand All @@ -43,6 +47,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.14.0 // indirect
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/
github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88=
github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
Expand Down Expand Up @@ -99,6 +104,12 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 h1:iT5qH0NLmkGeIdDtnBogYDx7L58t6CaWGL378DEo2QY=
go.opentelemetry.io/collector/pdata v1.0.0-rcv0014/go.mod h1:BRvDrx43kiSoUx3mr7SoA7h9B8+OY99mUK+CZSQFWW4=
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
133 changes: 125 additions & 8 deletions pkg/otel/arrow_record/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ package arrow_record

import (
"bytes"
"context"
"fmt"
"math/rand"

"github.com/apache/arrow/go/v12/arrow/ipc"
"github.com/apache/arrow/go/v12/arrow/memory"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

colarspb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
common "github.com/open-telemetry/otel-arrow/pkg/otel/common/arrow"
Expand Down Expand Up @@ -57,15 +62,43 @@ var ErrConsumerMemoryLimit = fmt.Errorf(

// Consumer is a BatchArrowRecords consumer.
type Consumer struct {
// streamConsumers is a map of reader state by SchemaID.
streamConsumers map[string]*streamConsumer

config Config
// Config embeds the configurable parameters.
Config

// allocator is the one instrumented in calls to Consume,
// it is reused across multiple IPC readers.
allocator *common.LimitedAllocator
// lastInuseValue is the previously-captured value for
// allocator.Inuse(). This is used to work around a
// limitation in the OTel synchronous instrument API, which we
// are using because we reject the use of atomic operations
// for allocators used here, since there is otherwise no
// concurrency. See inuseChangeByCaller() and
// inuseChangeObserve().
lastInuseValue uint64

// counts of the number of records consumed.
recordsCounter metric.Int64Counter
// counts of the number of schema resets by data type.
schemaResetCounter metric.Int64Counter
// tracks allocator.Inuse()
memoryCounter metric.Int64UpDownCounter

// uniqueAttr is set to an 8-byte hex digit string with
// 32-bits of randomness, applied to all metric events.
uniqueAttr attribute.KeyValue
}

type Config struct {
memLimit uint64

tracesConfig *arrow.Config

// from component.TelemetrySettings
meterProvider metric.MeterProvider
}

// WithMemoryLimit configures the Arrow limited memory allocator.
Expand All @@ -82,6 +115,14 @@ func WithTracesConfig(tcfg *arrow.Config) Option {
}
}

// WithMeterProvider configures an OTel metrics provider. If none is
// configured, the global meter provider will be used.
func WithMeterProvider(p metric.MeterProvider) Option {
return func(cfg *Config) {
cfg.meterProvider = p
}
}

type streamConsumer struct {
bufReader *bytes.Reader
ipcReader *ipc.Reader
Expand All @@ -94,20 +135,86 @@ type Option func(*Config)
// the corresponding OTLP representation (pmetric,Metrics, plog.Logs, ptrace.Traces).
func NewConsumer(opts ...Option) *Consumer {
cfg := Config{
memLimit: defaultMemoryLimit,
tracesConfig: arrow.DefaultConfig(),
memLimit: defaultMemoryLimit,
tracesConfig: arrow.DefaultConfig(),
meterProvider: otel.GetMeterProvider(),
}
for _, opt := range opts {
opt(&cfg)
}
meter := cfg.meterProvider.Meter("otel-arrow/pkg/otel/arrow_record")
allocator := common.NewLimitedAllocator(memory.NewGoAllocator(), cfg.memLimit)
return &Consumer{
config: cfg,
streamConsumers: make(map[string]*streamConsumer),
Config: cfg,
allocator: allocator,
uniqueAttr: attribute.String("stream_unique", fmt.Sprintf("%08x", rand.Uint32())),
streamConsumers: make(map[string]*streamConsumer),
recordsCounter: mustWarn(meter.Int64Counter("arrow_batch_records")),
schemaResetCounter: mustWarn(meter.Int64Counter("arrow_schema_resets")),
memoryCounter: mustWarn(meter.Int64UpDownCounter("arrow_memory_inuse")),
}
}

func releaseRecords(recs []*record_message.RecordMessage) {
for _, rec := range recs {
rec.Record().Release()
}
}

func mustWarn[T any](t T, err error) T {
if err != nil {
// as it's an otel error, let someone else handle it
otel.Handle(err)
}
return t
}

type placeholder struct{}

// inuseChangeByCaller records the change in allocated memory,
// attributing change to the caller. The `placeholder` returned
// is to encourage this idiom at the top of any function that
// allocates memory in the library.
//
// defer c.inuseChangeObserve(c.inuseChangeByCaller())
//
// Note: This is a sanity check. Currently there are no allocator
// operations in the caller, and the caller is not responsible
// for releasing memory. The count by where=caller should equal 0.
func (c *Consumer) inuseChangeByCaller() placeholder {
c.inuseChangeObserveWhere("caller")
return placeholder{}
}

// inuseChangeObserve records the change in allocated memory,
// attributing change to the library.
//
// Note if we have a memory accounting leak, we expect it will
// show up as a count `arrow_memory_inuse{where=library}`.
func (c *Consumer) inuseChangeObserve(_ placeholder) {
c.inuseChangeObserveWhere("library")
}

// inuseChangeObserveWhere records synchronous UpDownCounter events
// tracking changes in allocator state. If OTel had a synchronous
// cumulative updowncounter option, that would be easier to use.
func (c *Consumer) inuseChangeObserveWhere(where string) {
ctx := context.Background()
last := c.lastInuseValue
inuse := c.allocator.Inuse()
attrs := metric.WithAttributes(c.uniqueAttr, attribute.String("where", where))

c.memoryCounter.Add(ctx, int64(inuse-last), attrs)
c.lastInuseValue = inuse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this function thread safe? i.e. will setting c.lastInuseValue here have a risk of overwriting/being overwritten or are all consumers isolated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumers are isolated. During an earlier discussion @lquerel asked me to avoid using asynchronous instruments, which would have required additional concurrency, so presently we have no atomic variables and no concurrent use. That's why we're wrangling a synchronous instrument to do what is ordinarily done through an observer.

if inuse != last {
fmt.Println("change by", where, "=", int64(inuse-last), "; current value", inuse)
}
}

// MetricsFrom produces an array of [pmetric.Metrics] from a BatchArrowRecords message.
func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metrics, error) {
defer c.inuseChangeObserve(c.inuseChangeByCaller())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basic question: what is the purpose of nesting these calls? i.e. what is the reason for returning placeholder{} and what is the difference between this and function signatures that allow sequential defers like

defer c.unuseChangeObserve()
defer c.inuseChangeByCaller()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://go.dev/doc/effective_go#defer for some detail.

The arguments to the deferred function (which include the receiver if the function is a method) are evaluated when the defer executes, not when the call executes.

This idiom allows me to use 1 line. The equivalent would be:

c.inuseChangeByCaller()
defer c.inuseChangeObserve()

if after learning about defer's evaluation order you think that's more readable, I'll change it!


// extracts the records from the BatchArrowRecords message
records, err := c.Consume(bar)
if err != nil {
Expand Down Expand Up @@ -139,6 +246,7 @@ func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metri

// LogsFrom produces an array of [plog.Logs] from a BatchArrowRecords message.
func (c *Consumer) LogsFrom(bar *colarspb.BatchArrowRecords) ([]plog.Logs, error) {
defer c.inuseChangeObserve(c.inuseChangeByCaller())
records, err := c.Consume(bar)
if err != nil {
return nil, werror.Wrap(err)
Expand All @@ -164,6 +272,7 @@ func (c *Consumer) LogsFrom(bar *colarspb.BatchArrowRecords) ([]plog.Logs, error

// TracesFrom produces an array of [ptrace.Traces] from a BatchArrowRecords message.
func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces, error) {
defer c.inuseChangeObserve(c.inuseChangeByCaller())
records, err := c.Consume(bar)
if err != nil {
return nil, werror.Wrap(err)
Expand All @@ -172,7 +281,7 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces,
result := make([]ptrace.Traces, 0, len(records))

// Compute all related records (i.e. Attributes, Events, and Links)
relatedData, tracesRecord, err := tracesotlp.RelatedDataFrom(records, c.config.tracesConfig)
relatedData, tracesRecord, err := tracesotlp.RelatedDataFrom(records, c.tracesConfig)

if tracesRecord != nil {
// Decode OTLP traces from the combination of the main record and the
Expand All @@ -190,7 +299,12 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces,
// Consume takes a BatchArrowRecords protobuf message and returns an array of RecordMessage.
// Note: the records wrapped in the RecordMessage must be released after use by the caller.
func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.RecordMessage, error) {
ctx := context.Background()

var ibes []*record_message.RecordMessage
defer func() {
c.recordsCounter.Add(ctx, int64(len(ibes)), metric.WithAttributes(c.uniqueAttr))
}()

// Transform each individual OtlpArrowPayload into RecordMessage
for _, payload := range bar.ArrowPayloads {
Expand Down Expand Up @@ -221,13 +335,15 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R

sc.bufReader.Reset(payload.Record)
if sc.ipcReader == nil {
c.schemaResetCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("payload_type", payload.Type.String()), c.uniqueAttr))
ipcReader, err := ipc.NewReader(
sc.bufReader,
ipc.WithAllocator(common.NewLimitedAllocator(memory.NewGoAllocator(), c.config.memLimit)),
ipc.WithAllocator(c.allocator),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly equivalent to the previous code. With this modification we have a single allocator for all the payloads instead of one allocator per payload type. It's probably ok but that could explain why we observe a different behavior with the previous approach.

ipc.WithDictionaryDeltas(true),
ipc.WithZstd(),
)
if err != nil {
releaseRecords(ibes)
return nil, werror.Wrap(err)
}
sc.ipcReader = ipcReader
Expand All @@ -244,7 +360,8 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R
}

if len(ibes) < len(bar.ArrowPayloads) {
return ibes, ErrConsumerMemoryLimit
releaseRecords(ibes)
return nil, ErrConsumerMemoryLimit
}

return ibes, nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/otel/common/arrow/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (_ LimitError) Is(tgt error) bool {
return ok
}

func (l *LimitedAllocator) Inuse() uint64 {
return l.inuse
}

func (l *LimitedAllocator) Allocate(size int) []byte {
change := uint64(size)
if l.inuse+change > l.limit {
Expand Down