-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Addition of internal consumer metrics #52
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,13 +16,18 @@ package arrow_record | |
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"math/rand" | ||
|
||
"github.com/apache/arrow/go/v12/arrow/ipc" | ||
"github.com/apache/arrow/go/v12/arrow/memory" | ||
"go.opentelemetry.io/collector/pdata/plog" | ||
"go.opentelemetry.io/collector/pdata/pmetric" | ||
"go.opentelemetry.io/collector/pdata/ptrace" | ||
"go.opentelemetry.io/otel" | ||
"go.opentelemetry.io/otel/attribute" | ||
"go.opentelemetry.io/otel/metric" | ||
|
||
colarspb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" | ||
common "github.com/open-telemetry/otel-arrow/pkg/otel/common/arrow" | ||
|
@@ -57,15 +62,43 @@ var ErrConsumerMemoryLimit = fmt.Errorf( | |
|
||
// Consumer is a BatchArrowRecords consumer. | ||
type Consumer struct { | ||
// streamConsumers is a map of reader state by SchemaID. | ||
streamConsumers map[string]*streamConsumer | ||
|
||
config Config | ||
// Config embeds the configurable parameters. | ||
Config | ||
|
||
// allocator is the one instrumented in calls to Consume, | ||
// it is reused across multiple IPC readers. | ||
allocator *common.LimitedAllocator | ||
// lastInuseValue is the previously-captured value for | ||
// allocator.Inuse(). This is used to work around a | ||
// limitation in the OTel synchronous instrument API, which we | ||
// are using because we reject the use of atomic operations | ||
// for allocators used here, since there is otherwise no | ||
// concurrency. See inuseChangeByCaller() and | ||
// inuseChangeObserve(). | ||
lastInuseValue uint64 | ||
|
||
// counts of the number of records consumed. | ||
recordsCounter metric.Int64Counter | ||
// counts of the number of schema resets by data type. | ||
schemaResetCounter metric.Int64Counter | ||
// tracks allocator.Inuse() | ||
memoryCounter metric.Int64UpDownCounter | ||
|
||
// uniqueAttr is set to an 8-byte hex digit string with | ||
// 32-bits of randomness, applied to all metric events. | ||
uniqueAttr attribute.KeyValue | ||
} | ||
|
||
type Config struct { | ||
memLimit uint64 | ||
|
||
tracesConfig *arrow.Config | ||
|
||
// from component.TelemetrySettings | ||
meterProvider metric.MeterProvider | ||
} | ||
|
||
// WithMemoryLimit configures the Arrow limited memory allocator. | ||
|
@@ -82,6 +115,14 @@ func WithTracesConfig(tcfg *arrow.Config) Option { | |
} | ||
} | ||
|
||
// WithMeterProvider configures an OTel metrics provider. If none is | ||
// configured, the global meter provider will be used. | ||
func WithMeterProvider(p metric.MeterProvider) Option { | ||
return func(cfg *Config) { | ||
cfg.meterProvider = p | ||
} | ||
} | ||
|
||
type streamConsumer struct { | ||
bufReader *bytes.Reader | ||
ipcReader *ipc.Reader | ||
|
@@ -94,20 +135,86 @@ type Option func(*Config) | |
// the corresponding OTLP representation (pmetric.Metrics, plog.Logs, ptrace.Traces). | ||
func NewConsumer(opts ...Option) *Consumer { | ||
cfg := Config{ | ||
memLimit: defaultMemoryLimit, | ||
tracesConfig: arrow.DefaultConfig(), | ||
memLimit: defaultMemoryLimit, | ||
tracesConfig: arrow.DefaultConfig(), | ||
meterProvider: otel.GetMeterProvider(), | ||
} | ||
for _, opt := range opts { | ||
opt(&cfg) | ||
} | ||
meter := cfg.meterProvider.Meter("otel-arrow/pkg/otel/arrow_record") | ||
allocator := common.NewLimitedAllocator(memory.NewGoAllocator(), cfg.memLimit) | ||
return &Consumer{ | ||
config: cfg, | ||
streamConsumers: make(map[string]*streamConsumer), | ||
Config: cfg, | ||
allocator: allocator, | ||
uniqueAttr: attribute.String("stream_unique", fmt.Sprintf("%08x", rand.Uint32())), | ||
streamConsumers: make(map[string]*streamConsumer), | ||
recordsCounter: mustWarn(meter.Int64Counter("arrow_batch_records")), | ||
schemaResetCounter: mustWarn(meter.Int64Counter("arrow_schema_resets")), | ||
memoryCounter: mustWarn(meter.Int64UpDownCounter("arrow_memory_inuse")), | ||
} | ||
} | ||
|
||
func releaseRecords(recs []*record_message.RecordMessage) { | ||
for _, rec := range recs { | ||
rec.Record().Release() | ||
} | ||
} | ||
|
||
func mustWarn[T any](t T, err error) T { | ||
if err != nil { | ||
// as it's an otel error, let someone else handle it | ||
otel.Handle(err) | ||
} | ||
return t | ||
} | ||
|
||
type placeholder struct{} | ||
|
||
// inuseChangeByCaller records the change in allocated memory, | ||
// attributing change to the caller. The `placeholder` returned | ||
// is to encourage this idiom at the top of any function that | ||
// allocates memory in the library. | ||
// | ||
// defer c.inuseChangeObserve(c.inuseChangeByCaller()) | ||
// | ||
// Note: This is a sanity check. Currently there are no allocator | ||
// operations in the caller, and the caller is not responsible | ||
// for releasing memory. The count by where=caller should equal 0. | ||
func (c *Consumer) inuseChangeByCaller() placeholder { | ||
c.inuseChangeObserveWhere("caller") | ||
return placeholder{} | ||
} | ||
|
||
// inuseChangeObserve records the change in allocated memory, | ||
// attributing change to the library. | ||
// | ||
// Note if we have a memory accounting leak, we expect it will | ||
// show up as a count `arrow_memory_inuse{where=library}`. | ||
func (c *Consumer) inuseChangeObserve(_ placeholder) { | ||
c.inuseChangeObserveWhere("library") | ||
} | ||
|
||
// inuseChangeObserveWhere records synchronous UpDownCounter events | ||
// tracking changes in allocator state. If OTel had a synchronous | ||
// cumulative updowncounter option, that would be easier to use. | ||
func (c *Consumer) inuseChangeObserveWhere(where string) { | ||
ctx := context.Background() | ||
last := c.lastInuseValue | ||
inuse := c.allocator.Inuse() | ||
attrs := metric.WithAttributes(c.uniqueAttr, attribute.String("where", where)) | ||
|
||
c.memoryCounter.Add(ctx, int64(inuse-last), attrs) | ||
c.lastInuseValue = inuse | ||
if inuse != last { | ||
fmt.Println("change by", where, "=", int64(inuse-last), "; current value", inuse) | ||
} | ||
} | ||
|
||
// MetricsFrom produces an array of [pmetric.Metrics] from a BatchArrowRecords message. | ||
func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metrics, error) { | ||
defer c.inuseChangeObserve(c.inuseChangeByCaller()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. basic question: what is the purpose of nesting these calls? i.e. what is the reason for returning a `placeholder` value here?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See https://go.dev/doc/effective_go#defer for some detail.
This idiom allows me to use 1 line. The equivalent would be:
if after learning about defer's evaluation order you think that's more readable, I'll change it! |
||
|
||
// extracts the records from the BatchArrowRecords message | ||
records, err := c.Consume(bar) | ||
if err != nil { | ||
|
@@ -139,6 +246,7 @@ func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metri | |
|
||
// LogsFrom produces an array of [plog.Logs] from a BatchArrowRecords message. | ||
func (c *Consumer) LogsFrom(bar *colarspb.BatchArrowRecords) ([]plog.Logs, error) { | ||
defer c.inuseChangeObserve(c.inuseChangeByCaller()) | ||
records, err := c.Consume(bar) | ||
if err != nil { | ||
return nil, werror.Wrap(err) | ||
|
@@ -164,6 +272,7 @@ func (c *Consumer) LogsFrom(bar *colarspb.BatchArrowRecords) ([]plog.Logs, error | |
|
||
// TracesFrom produces an array of [ptrace.Traces] from a BatchArrowRecords message. | ||
func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces, error) { | ||
defer c.inuseChangeObserve(c.inuseChangeByCaller()) | ||
records, err := c.Consume(bar) | ||
if err != nil { | ||
return nil, werror.Wrap(err) | ||
|
@@ -172,7 +281,7 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces, | |
result := make([]ptrace.Traces, 0, len(records)) | ||
|
||
// Compute all related records (i.e. Attributes, Events, and Links) | ||
relatedData, tracesRecord, err := tracesotlp.RelatedDataFrom(records, c.config.tracesConfig) | ||
relatedData, tracesRecord, err := tracesotlp.RelatedDataFrom(records, c.tracesConfig) | ||
|
||
if tracesRecord != nil { | ||
// Decode OTLP traces from the combination of the main record and the | ||
|
@@ -190,7 +299,12 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces, | |
// Consume takes a BatchArrowRecords protobuf message and returns an array of RecordMessage. | ||
// Note: the records wrapped in the RecordMessage must be released after use by the caller. | ||
func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.RecordMessage, error) { | ||
ctx := context.Background() | ||
|
||
var ibes []*record_message.RecordMessage | ||
defer func() { | ||
c.recordsCounter.Add(ctx, int64(len(ibes)), metric.WithAttributes(c.uniqueAttr)) | ||
}() | ||
|
||
// Transform each individual OtlpArrowPayload into RecordMessage | ||
for _, payload := range bar.ArrowPayloads { | ||
|
@@ -221,13 +335,15 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R | |
|
||
sc.bufReader.Reset(payload.Record) | ||
if sc.ipcReader == nil { | ||
c.schemaResetCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("payload_type", payload.Type.String()), c.uniqueAttr)) | ||
ipcReader, err := ipc.NewReader( | ||
sc.bufReader, | ||
ipc.WithAllocator(common.NewLimitedAllocator(memory.NewGoAllocator(), c.config.memLimit)), | ||
ipc.WithAllocator(c.allocator), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not strictly equivalent to the previous code. With this modification we have a single allocator for all the payloads instead of one allocator per payload type. It's probably ok but that could explain why we observe a different behavior with the previous approach. |
||
ipc.WithDictionaryDeltas(true), | ||
ipc.WithZstd(), | ||
) | ||
if err != nil { | ||
releaseRecords(ibes) | ||
return nil, werror.Wrap(err) | ||
} | ||
sc.ipcReader = ipcReader | ||
|
@@ -244,7 +360,8 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R | |
} | ||
|
||
if len(ibes) < len(bar.ArrowPayloads) { | ||
return ibes, ErrConsumerMemoryLimit | ||
releaseRecords(ibes) | ||
return nil, ErrConsumerMemoryLimit | ||
} | ||
|
||
return ibes, nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this function thread safe? i.e. will setting c.lastInuseValue here have a risk of overwriting/being overwritten or are all consumers isolated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The consumers are isolated. During an earlier discussion @lquerel asked me to avoid using asynchronous instruments, which would have required additional concurrency, so presently we have no atomic variables and no concurrent use. That's why we're wrangling a synchronous instrument to do what is ordinarily done through an observer.