Addition of internal consumer metrics #52

Merged
merged 3 commits into open-telemetry:main from jmacd/consumermetrics on Sep 27, 2023

Conversation

@jmacd (Contributor) commented Sep 27, 2023

Adds three OTel metric instruments with instrumentation on:

  • records consumed
  • schema resets
  • memory in use.
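
A minimal sketch of how three such instruments might be declared with the OTel Go metric API (the instrument names match the Prometheus output shown below, but the wrapper type and descriptions are illustrative, not the PR's actual code):

package consumerexample

// Sketch only: illustrative instrument setup; the real wiring in the PR may differ.
import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

type consumerMetrics struct {
	records metric.Int64Counter       // records consumed
	resets  metric.Int64Counter       // schema resets, per payload type
	memory  metric.Int64UpDownCounter // memory in use (gauge-like)
}

func newConsumerMetrics() (*consumerMetrics, error) {
	meter := otel.Meter("otel-arrow/consumer")

	records, err := meter.Int64Counter("arrow_batch_records",
		metric.WithDescription("Number of records consumed"))
	if err != nil {
		return nil, err
	}
	resets, err := meter.Int64Counter("arrow_schema_resets",
		metric.WithDescription("Number of schema resets, per payload type"))
	if err != nil {
		return nil, err
	}
	memory, err := meter.Int64UpDownCounter("arrow_memory_inuse",
		metric.WithDescription("Bytes of memory in use by the Arrow consumer"))
	if err != nil {
		return nil, err
	}
	return &consumerMetrics{records: records, resets: resets, memory: memory}, nil
}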

Note that I have not added tests, which would delay us a lot for little benefit. There are existing debug-level assertions that cover the memory accounting. To test this experimentally, I used a recorded data file and a configuration like:

receivers:
  file/input:
    path: "${env:DIR}/data.json.zstd"
    format: json
    compression: zstd
    throttle: 0
  otelarrow/loopback:
    protocols:
      grpc:
        endpoint: 127.0.0.1:8082
        max_recv_msg_size_mib: 24
        
exporters:
  otelarrow/forward:
    endpoint: 127.0.0.1:8082
    wait_for_ready: true
    arrow:
      disabled: false
      num_streams: 1
      disable_downgrade: true
    tls:
      insecure: true
    retry_on_failure:
      enabled: false
    sending_queue:
      enabled: false
      num_consumers: 1
    timeout: 30s
  logging:
  file/output:
    path: "${env:DIR}/output.json.zstd"
    format: json
    compression: zstd

extensions:
  pprof:

processors:
  batch:
    send_batch_size: 1000
    send_batch_max_size: 1100
    timeout: 10s

service:
  extensions: [pprof]
  pipelines:
    traces/first:
      receivers: [file/input]
      processors: [batch]
      exporters: [otelarrow/forward]
    traces/second:
      receivers: [otelarrow/loopback]
      processors: []
      exporters: [logging, file/output]
      
  telemetry:
    resource:
      "service.name": "data-replayer"
    metrics:
      address: 127.0.0.1:8889
      level: detailed
    logs:
      level: debug

Then I see metrics (via Prometheus) at :8889/metrics, like:

# HELP otelcol_arrow_batch_records_total 
# TYPE otelcol_arrow_batch_records_total counter
otelcol_arrow_batch_records_total{stream_unique="4797068c"} 52445
# HELP otelcol_arrow_memory_inuse 
# TYPE otelcol_arrow_memory_inuse gauge
otelcol_arrow_memory_inuse{stream_unique="4797068c",where="library"} 5.6499008e+07
otelcol_arrow_memory_inuse{stream_unique="4797068c",where="user"} 0
# HELP otelcol_arrow_schema_resets_total 
# TYPE otelcol_arrow_schema_resets_total counter
otelcol_arrow_schema_resets_total{payload_type="RESOURCE_ATTRS",stream_unique="4797068c"} 1
otelcol_arrow_schema_resets_total{payload_type="SPANS",stream_unique="4797068c"} 7
otelcol_arrow_schema_resets_total{payload_type="SPAN_ATTRS",stream_unique="4797068c"} 4
otelcol_arrow_schema_resets_total{payload_type="SPAN_EVENTS",stream_unique="4797068c"} 3
otelcol_arrow_schema_resets_total{payload_type="SPAN_EVENT_ATTRS",stream_unique="4797068c"} 5
otelcol_arrow_schema_resets_total{payload_type="SPAN_LINKS",stream_unique="4797068c"} 3
otelcol_arrow_schema_resets_total{payload_type="SPAN_LINK_ATTRS",stream_unique="4797068c"} 1

Note that the memory-in-use metric appears to rise slowly.

@jmacd (Contributor, Author) commented Sep 27, 2023

(By the way, I acknowledge that Prometheus will not be a good way to monitor metrics that have a unique attribute created every few minutes. This instrumentation may be too expensive to run in practice, depending on vendor support for delta temporality and SDK support for "forgetting" stale series, but I would encourage us to over-instrument now and reduce later.)

}
}

// MetricsFrom produces an array of [pmetric.Metrics] from a BatchArrowRecords message.
func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metrics, error) {
	defer c.inuseChangeObserve(c.inuseChangeByCaller())

Contributor

Basic question: what is the purpose of nesting these calls? I.e., what is the reason for returning placeholder{}, and what is the difference between this and function signatures that allow sequential defers like:

defer c.inuseChangeObserve()
defer c.inuseChangeByCaller()

Contributor Author

See https://go.dev/doc/effective_go#defer for some detail.

The arguments to the deferred function (which include the receiver if the function is a method) are evaluated when the defer executes, not when the call executes.

This idiom allows me to use 1 line. The equivalent would be:

c.inuseChangeByCaller()
defer c.inuseChangeObserve()

If, after learning about defer's evaluation order, you think that's more readable, I'll change it!
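
For readers unfamiliar with this behavior, here is a self-contained toy example (the method names mirror the PR, but the bodies are simplified placeholders) showing the evaluation order: the inner call runs immediately at the defer statement, while the outer call runs when the function returns.

package main

import "fmt"

// placeholder exists only so the inner call can appear as the argument
// of the deferred outer call.
type placeholder struct{}

type consumer struct{ inuse int }

// Runs immediately, because it is an argument of the deferred call.
func (c *consumer) inuseChangeByCaller() placeholder {
	fmt.Println("before:", c.inuse)
	return placeholder{}
}

// Runs when the surrounding function returns.
func (c *consumer) inuseChangeObserve(placeholder) {
	fmt.Println("after:", c.inuse)
}

func (c *consumer) work() {
	// Prints "before: 0" now, and "after: 42" when work returns.
	defer c.inuseChangeObserve(c.inuseChangeByCaller())
	c.inuse += 42
}

func main() {
	(&consumer{}).work()
}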

	attrs := metric.WithAttributes(c.uniqueAttr, attribute.String("where", where))

	c.memoryCounter.Add(ctx, int64(inuse-last), attrs)
	c.lastInuseValue = inuse

Contributor

Is this function thread-safe? I.e., does setting c.lastInuseValue here risk overwriting/being overwritten, or are all consumers isolated?

Contributor Author

The consumers are isolated. During an earlier discussion @lquerel asked me to avoid using asynchronous instruments, which would have required additional concurrency, so presently we have no atomic variables and no concurrent use. That's why we're wrangling a synchronous instrument to do what is ordinarily done through an observer.
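
For context, a rough sketch of the asynchronous alternative that was avoided (illustrative code, not from this PR): an ObservableGauge callback runs on an SDK-owned goroutine at collection time, so the in-use value would have to be read atomically or under a lock, which is exactly the concurrency the synchronous-delta approach sidesteps.

package consumerexample

import (
	"context"
	"sync/atomic"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

// lastInuse would be updated by the consumer as allocations change and
// read concurrently by the SDK's collection cycle, hence the atomic.
var lastInuse atomic.Int64

func registerAsyncMemoryGauge() (metric.Int64ObservableGauge, error) {
	meter := otel.Meter("otel-arrow/consumer")
	return meter.Int64ObservableGauge("arrow_memory_inuse",
		metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error {
			o.Observe(lastInuse.Load())
			return nil
		}))
}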

@lquerel (Contributor) left a comment:

LGTM

 	ipcReader, err := ipc.NewReader(
 		sc.bufReader,
-		ipc.WithAllocator(common.NewLimitedAllocator(memory.NewGoAllocator(), c.config.memLimit)),
+		ipc.WithAllocator(c.allocator),

Contributor

Not strictly equivalent to the previous code. With this modification we have a single allocator for all the payloads instead of one allocator per payload type. It's probably OK, but that could explain why we observe different behavior compared with the previous approach.
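
A simplified contrast of the two approaches (placeholder package names and a current Arrow Go module version assumed; the limit-tracking wrapper from the previous code is omitted here):

package consumerexample

import (
	"io"

	"github.com/apache/arrow/go/v14/arrow/ipc"
	"github.com/apache/arrow/go/v14/arrow/memory"
)

// Before: each reader got its own freshly constructed allocator, so
// memory accounting (and any limit) applied per payload type.
func perPayloadReader(r io.Reader) (*ipc.Reader, error) {
	return ipc.NewReader(r, ipc.WithAllocator(memory.NewGoAllocator()))
}

// After: the Consumer holds one allocator and passes it to every reader,
// so accounting is shared across all payload types of the stream.
type sharedConsumer struct{ allocator memory.Allocator }

func (c *sharedConsumer) readerFor(r io.Reader) (*ipc.Reader, error) {
	return ipc.NewReader(r, ipc.WithAllocator(c.allocator))
}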

@lquerel merged commit 8df0a8f into open-telemetry:main on Sep 27, 2023
jmacd added a commit that referenced this pull request Oct 6, 2023
Since we still do not have changelog automation, this is what has been included since v0.3.0 that is worthy of note:

#47
#52
#53 
#54 
#55 
#56
#57 

README improvements,
Obfuscation processor fixes.
@jmacd deleted the jmacd/consumermetrics branch on November 15, 2023 at 19:15