Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit OTLP metric histogram buckets to 100 #204

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
UpDownCounter,
)
from opentelemetry.sdk.metrics.export import AggregationTemporality, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import Aggregation, ExponentialBucketHistogramAggregation
from opentelemetry.sdk.resources import Resource, get_aggregated_resources
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
Expand Down Expand Up @@ -277,6 +278,7 @@ def create_exporter(self):
)
_logger.debug("AWS Application Signals export protocol: %s", protocol)

# Application Signals depends on Delta metrics for export and use in CloudWatch Metrics
temporality_dict: Dict[type, AggregationTemporality] = {}
for typ in [
Counter,
Expand All @@ -289,14 +291,22 @@ def create_exporter(self):
]:
temporality_dict[typ] = AggregationTemporality.DELTA

# Histograms must only be exported with no more than 100 buckets per EMF specifications. Per OTEL documentation,
# max total buckets = max_size*2+1; *2 is because of positive and negative buckets, +1 because of the zero
# bucket. Negatives are not a concern for Application Signals use-case, which only measures latency, so max_size
# of 99 gives total buckets of 100.
aggregation_dict: Dict[type, Aggregation] = {Histogram: ExponentialBucketHistogramAggregation(99, 20)}

if protocol == "http/protobuf":
application_signals_endpoint = os.environ.get(
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG,
os.environ.get(APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG, "http://localhost:4316/v1/metrics"),
)
_logger.debug("AWS Application Signals export endpoint: %s", application_signals_endpoint)
return OTLPHttpOTLPMetricExporter(
endpoint=application_signals_endpoint, preferred_temporality=temporality_dict
endpoint=application_signals_endpoint,
preferred_temporality=temporality_dict,
preferred_aggregation=aggregation_dict,
)
if protocol == "grpc":
# pylint: disable=import-outside-toplevel
Expand All @@ -312,7 +322,9 @@ def create_exporter(self):
)
_logger.debug("AWS Application Signals export endpoint: %s", application_signals_endpoint)
return OTLPGrpcOTLPMetricExporter(
endpoint=application_signals_endpoint, preferred_temporality=temporality_dict
endpoint=application_signals_endpoint,
preferred_temporality=temporality_dict,
preferred_aggregation=aggregation_dict,
)

raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ")
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
from amazon.opentelemetry.distro.patches._instrumentation_patch import apply_instrumentation_patches
from opentelemetry.distro import OpenTelemetryDistro
from opentelemetry.environment_variables import OTEL_PROPAGATORS, OTEL_PYTHON_ID_GENERATOR
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION,
OTEL_EXPORTER_OTLP_PROTOCOL,
)
from opentelemetry.sdk.environment_variables import OTEL_EXPORTER_OTLP_PROTOCOL

_logger: Logger = getLogger(__name__)

Expand All @@ -25,20 +22,14 @@ def _configure(self, **kwargs):
due to gRPC having a strict dependency on the Python version the artifact was built for (OTEL observed this:
https://github.com/open-telemetry/opentelemetry-operator/blob/461ba68e80e8ac6bf2603eb353547cd026119ed2/autoinstrumentation/python/requirements.txt#L2-L3)

Also sets default OTEL_PROPAGATORS, OTEL_PYTHON_ID_GENERATOR, and
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION to ensure good compatibility with X-Ray and Application
Signals.
Also sets default OTEL_PROPAGATORS and OTEL_PYTHON_ID_GENERATOR to ensure good compatibility with X-Ray and
Application Signals.

Also applies patches to upstream instrumentation - usually these are stopgap measures until we can contribute
long-term changes to upstream.

kwargs:
apply_patches: bool - apply patches to upstream instrumentation. Default is True.

TODO:
1. OTLPMetricExporterMixin is using hard coded histogram_aggregation_type, which reads
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION environment variable. Need to work with upstream to
make it to be configurable.
"""

# Issue: https://github.com/open-telemetry/opentelemetry-python-contrib/issues/2495
Expand All @@ -61,9 +52,6 @@ def _configure(self, **kwargs):

os.environ.setdefault(OTEL_PROPAGATORS, "xray,tracecontext,b3,b3multi")
os.environ.setdefault(OTEL_PYTHON_ID_GENERATOR, "xray")
os.environ.setdefault(
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION, "base2_exponential_bucket_histogram"
)

if kwargs.get("apply_patches", True):
apply_instrumentation_patches()
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pkg_resources

from amazon.opentelemetry.distro.patches._otlp_metric_exporter_patches import _apply_otlp_metric_exporter_patches
from amazon.opentelemetry.distro.patches._resource_detector_patches import _apply_resource_detector_patches

_logger: Logger = getLogger(__name__)
Expand All @@ -31,6 +32,7 @@ def apply_instrumentation_patches() -> None:
# No need to check if library is installed as this patches opentelemetry.sdk,
# which must be installed for the distro to work at all.
_apply_resource_detector_patches()
_apply_otlp_metric_exporter_patches()


def _is_installed(req: str) -> bool:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
from os import environ
from typing import Dict, Optional

import requests

from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import OTLPMetricExporterMixin, _logger
from opentelemetry.exporter.otlp.proto.http import Compression as HttpCompression
from opentelemetry.exporter.otlp.proto.http.metric_exporter import DEFAULT_ENDPOINT, DEFAULT_TIMEOUT
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HttpOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import _append_metrics_path, _compression_from_env
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_HEADERS,
OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_HEADERS,
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL,
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_EXPORTER_OTLP_TIMEOUT,
)
from opentelemetry.sdk.metrics import (
Counter,
Histogram,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk.metrics._internal.aggregation import Aggregation as InternalAggregation
from opentelemetry.sdk.metrics.export import AggregationTemporality, MetricExporter
from opentelemetry.sdk.metrics.view import Aggregation as ViewAggregation
from opentelemetry.sdk.metrics.view import ExplicitBucketHistogramAggregation, ExponentialBucketHistogramAggregation
from opentelemetry.util.re import parse_env_headers


# The OpenTelemetry Authors code
def _apply_otlp_metric_exporter_patches() -> None: # pragma: no cover
"""OTLP Metrics Exporter patches for getting the following change in the upstream:
https://github.com/open-telemetry/opentelemetry-python/commit/12f449074e80fa88b59468a48b7a4b99dbcda34d
"""

def patch_otlp_metric_exporter_mixin_common_configuration(
self,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, ViewAggregation] = None,
) -> None:

# pylint: disable=unnecessary-dunder-call
MetricExporter.__init__(
self,
preferred_temporality=self._get_temporality(preferred_temporality),
preferred_aggregation=self._get_aggregation(preferred_aggregation),
)

def patch_otlp_metric_exporter_mixin_get_temporality(
self, preferred_temporality: Dict[type, AggregationTemporality]
) -> Dict[type, AggregationTemporality]:

otel_exporter_otlp_metrics_temporality_preference = (
environ.get(
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
"CUMULATIVE",
)
.upper()
.strip()
)

if otel_exporter_otlp_metrics_temporality_preference == "DELTA":
instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.DELTA,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

elif otel_exporter_otlp_metrics_temporality_preference == "LOWMEMORY":
instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

else:
if otel_exporter_otlp_metrics_temporality_preference != ("CUMULATIVE"):
# pylint: disable=logging-fstring-interpolation
_logger.warning(
"Unrecognized OTEL_EXPORTER_METRICS_TEMPORALITY_PREFERENCE"
" value found: "
f"{otel_exporter_otlp_metrics_temporality_preference}, "
"using CUMULATIVE"
)
instrument_class_temporality = {
Counter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.CUMULATIVE,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

instrument_class_temporality.update(preferred_temporality or {})

return instrument_class_temporality

def patch_otlp_metric_exporter_mixin_get_aggregation(
self,
preferred_aggregation: Dict[type, ViewAggregation],
) -> Dict[type, ViewAggregation]:

otel_exporter_otlp_metrics_default_histogram_aggregation = environ.get(
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION,
"explicit_bucket_histogram",
)

if otel_exporter_otlp_metrics_default_histogram_aggregation == ("base2_exponential_bucket_histogram"):

instrument_class_aggregation = {
Histogram: ExponentialBucketHistogramAggregation(),
}

else:

if otel_exporter_otlp_metrics_default_histogram_aggregation != ("explicit_bucket_histogram"):

# pylint: disable=implicit-str-concat
_logger.warning(
("Invalid value for %s: %s, using explicit bucket " "histogram aggregation"),
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION,
otel_exporter_otlp_metrics_default_histogram_aggregation,
)

instrument_class_aggregation = {
Histogram: ExplicitBucketHistogramAggregation(),
}

instrument_class_aggregation.update(preferred_aggregation or {})

return instrument_class_aggregation

def patch_http_otlp_metric_exporter_init(
self,
endpoint: Optional[str] = None,
certificate_file: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
compression: Optional[HttpCompression] = None,
session: Optional[requests.Session] = None,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, InternalAggregation] = None,
):
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
_append_metrics_path(environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT)),
)
self._certificate_file = certificate_file or environ.get(
OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True),
)
headers_string = environ.get(
OTEL_EXPORTER_OTLP_METRICS_HEADERS,
environ.get(OTEL_EXPORTER_OTLP_HEADERS, ""),
)
self._headers = headers or parse_env_headers(headers_string)
self._timeout = timeout or int(
environ.get(
OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT),
)
)
self._compression = compression or _compression_from_env()
self._session = session or requests.Session()
self._session.headers.update(self._headers)
self._session.headers.update({"Content-Type": "application/x-protobuf"})
if self._compression is not HttpCompression.NoCompression:
self._session.headers.update({"Content-Encoding": self._compression.value})

self._common_configuration(preferred_temporality, preferred_aggregation)

OTLPMetricExporterMixin._common_configuration = patch_otlp_metric_exporter_mixin_common_configuration
OTLPMetricExporterMixin._get_temporality = patch_otlp_metric_exporter_mixin_get_temporality
OTLPMetricExporterMixin._get_aggregation = patch_otlp_metric_exporter_mixin_get_aggregation
HttpOTLPMetricExporter.__init__ = patch_http_otlp_metric_exporter_init

protocol = environ.get(
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL, environ.get(OTEL_EXPORTER_OTLP_PROTOCOL, "http/protobuf")
)
if protocol == "grpc":
_apply_grpc_otlp_metric_exporter_patches()


def _apply_grpc_otlp_metric_exporter_patches():
# pylint: disable=import-outside-toplevel
# Delay import to only occur if gRPC specifically requested. Vended Docker image will not have gRPC bundled,
# so importing it at the class level can cause runtime failures.
from typing import Sequence as TypingSequence
from typing import Tuple, Union

from grpc import ChannelCredentials
from grpc import Compression as GrpcCompression

from opentelemetry.exporter.otlp.proto.grpc.exporter import (
OTLPExporterMixin,
_get_credentials,
environ_to_compression,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GrpcOTLPMetricExporter
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
OTEL_EXPORTER_OTLP_METRICS_INSECURE,
)

def patch_http_grpc_metric_exporter_init(
self,
endpoint: Optional[str] = None,
insecure: Optional[bool] = None,
credentials: Optional[ChannelCredentials] = None,
headers: Optional[Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]] = None,
timeout: Optional[int] = None,
compression: Optional[GrpcCompression] = None,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, InternalAggregation] = None,
max_export_batch_size: Optional[int] = None,
):

if insecure is None:
insecure = environ.get(OTEL_EXPORTER_OTLP_METRICS_INSECURE)
if insecure is not None:
insecure = insecure.lower() == "true"

if not insecure and environ.get(OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE) is not None:
credentials = _get_credentials(credentials, OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE)

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT)
environ_timeout = int(environ_timeout) if environ_timeout is not None else None

compression = (
environ_to_compression(OTEL_EXPORTER_OTLP_METRICS_COMPRESSION) if compression is None else compression
)

self._common_configuration(preferred_temporality, preferred_aggregation)

# pylint: disable=unnecessary-dunder-call
OTLPExporterMixin.__init__(
self,
endpoint=endpoint or environ.get(OTEL_EXPORTER_OTLP_METRICS_ENDPOINT),
insecure=insecure,
credentials=credentials,
headers=headers or environ.get(OTEL_EXPORTER_OTLP_METRICS_HEADERS),
timeout=timeout or environ_timeout,
compression=compression,
)

self._max_export_batch_size: Optional[int] = max_export_batch_size

GrpcOTLPMetricExporter.__init__ = patch_http_grpc_metric_exporter_init


# END The OpenTelemetry Authors code
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,6 @@ def validate_distro_environ():

# Set by AwsOpenTelemetryDistro
tc.assertEqual("http/protobuf", os.environ.get("OTEL_EXPORTER_OTLP_PROTOCOL"))
tc.assertEqual(
"base2_exponential_bucket_histogram", os.environ.get("OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION")
)
tc.assertEqual("xray,tracecontext,b3,b3multi", os.environ.get("OTEL_PROPAGATORS"))
tc.assertEqual("xray", os.environ.get("OTEL_PYTHON_ID_GENERATOR"))

Expand Down
Loading
Loading