diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index db86703b7..d88a8e811 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -48,6 +48,7 @@ UpDownCounter, ) from opentelemetry.sdk.metrics.export import AggregationTemporality, PeriodicExportingMetricReader +from opentelemetry.sdk.metrics.view import Aggregation, ExponentialBucketHistogramAggregation from opentelemetry.sdk.resources import Resource, get_aggregated_resources from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter @@ -277,6 +278,7 @@ def create_exporter(self): ) _logger.debug("AWS Application Signals export protocol: %s", protocol) + # Application Signals depends on Delta metrics for export and use in CloudWatch Metrics temporality_dict: Dict[type, AggregationTemporality] = {} for typ in [ Counter, @@ -289,6 +291,12 @@ def create_exporter(self): ]: temporality_dict[typ] = AggregationTemporality.DELTA + # Histograms must only be exported with no more than 100 buckets per EMF specifications. Per OTEL documentation, + # max total buckets = max_size*2+1; *2 is because of positive and negative buckets, +1 because of the zero + # bucket. Negatives are not a concern for Application Signals use-case, which only measures latency, so max_size + # of 99 gives total buckets of 100. + aggregation_dict: Dict[type, Aggregation] = {Histogram: ExponentialBucketHistogramAggregation(99, 20)} + if protocol == "http/protobuf": application_signals_endpoint = os.environ.get( APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG, @@ -296,7 +304,9 @@ def create_exporter(self): ) _logger.debug("AWS Application Signals export endpoint: %s", application_signals_endpoint) return OTLPHttpOTLPMetricExporter( - endpoint=application_signals_endpoint, preferred_temporality=temporality_dict + endpoint=application_signals_endpoint, + preferred_temporality=temporality_dict, + preferred_aggregation=aggregation_dict, ) if protocol == "grpc": # pylint: disable=import-outside-toplevel @@ -312,7 +322,9 @@ def create_exporter(self): ) _logger.debug("AWS Application Signals export endpoint: %s", application_signals_endpoint) return OTLPGrpcOTLPMetricExporter( - endpoint=application_signals_endpoint, preferred_temporality=temporality_dict + endpoint=application_signals_endpoint, + preferred_temporality=temporality_dict, + preferred_aggregation=aggregation_dict, ) raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ") diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py index 9bca8acd1..3f8838083 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py @@ -7,10 +7,7 @@ from amazon.opentelemetry.distro.patches._instrumentation_patch import apply_instrumentation_patches from opentelemetry.distro import OpenTelemetryDistro from opentelemetry.environment_variables import OTEL_PROPAGATORS, OTEL_PYTHON_ID_GENERATOR -from opentelemetry.sdk.environment_variables import ( - OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION, - OTEL_EXPORTER_OTLP_PROTOCOL, -) +from opentelemetry.sdk.environment_variables import OTEL_EXPORTER_OTLP_PROTOCOL _logger: Logger = getLogger(__name__) @@ -25,20 +22,14 @@ def _configure(self, **kwargs): due to gRPC having a strict dependency on the Python version the artifact was built for (OTEL observed this: https://github.com/open-telemetry/opentelemetry-operator/blob/461ba68e80e8ac6bf2603eb353547cd026119ed2/autoinstrumentation/python/requirements.txt#L2-L3) - Also sets default OTEL_PROPAGATORS, OTEL_PYTHON_ID_GENERATOR, and - OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION to ensure good compatibility with X-Ray and Application - Signals. + Also sets default OTEL_PROPAGATORS and OTEL_PYTHON_ID_GENERATOR to ensure good compatibility with X-Ray and + Application Signals. Also applies patches to upstream instrumentation - usually these are stopgap measures until we can contribute long-term changes to upstream. kwargs: apply_patches: bool - apply patches to upstream instrumentation. Default is True. - - TODO: - 1. OTLPMetricExporterMixin is using hard coded histogram_aggregation_type, which reads - OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION environment variable. Need to work with upstream to - make it to be configurable. """ # Issue: https://github.com/open-telemetry/opentelemetry-python-contrib/issues/2495 @@ -61,9 +52,6 @@ def _configure(self, **kwargs): os.environ.setdefault(OTEL_PROPAGATORS, "xray,tracecontext,b3,b3multi") os.environ.setdefault(OTEL_PYTHON_ID_GENERATOR, "xray") - os.environ.setdefault( - OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION, "base2_exponential_bucket_histogram" - ) if kwargs.get("apply_patches", True): apply_instrumentation_patches() diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py index 11f4c11b6..041568dda 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py @@ -6,6 +6,7 @@ import pkg_resources +from amazon.opentelemetry.distro.patches._otlp_metric_exporter_patches import _apply_otlp_metric_exporter_patches from amazon.opentelemetry.distro.patches._resource_detector_patches import _apply_resource_detector_patches _logger: Logger = getLogger(__name__) @@ -31,6 +32,7 @@ def apply_instrumentation_patches() -> None: # No need to check if library is installed as this patches opentelemetry.sdk, # which must be installed for the distro to work at all. _apply_resource_detector_patches() + _apply_otlp_metric_exporter_patches() def _is_installed(req: str) -> bool: diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_otlp_metric_exporter_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_otlp_metric_exporter_patches.py new file mode 100644 index 000000000..2eb68a82d --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_otlp_metric_exporter_patches.py @@ -0,0 +1,270 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. +from os import environ +from typing import Dict, Optional + +import requests + +from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import OTLPMetricExporterMixin, _logger +from opentelemetry.exporter.otlp.proto.http import Compression as HttpCompression +from opentelemetry.exporter.otlp.proto.http.metric_exporter import DEFAULT_ENDPOINT, DEFAULT_TIMEOUT +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HttpOTLPMetricExporter +from opentelemetry.exporter.otlp.proto.http.metric_exporter import _append_metrics_path, _compression_from_env +from opentelemetry.sdk.environment_variables import ( + OTEL_EXPORTER_OTLP_CERTIFICATE, + OTEL_EXPORTER_OTLP_ENDPOINT, + OTEL_EXPORTER_OTLP_HEADERS, + OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE, + OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION, + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, + OTEL_EXPORTER_OTLP_METRICS_HEADERS, + OTEL_EXPORTER_OTLP_METRICS_PROTOCOL, + OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, + OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, + OTEL_EXPORTER_OTLP_PROTOCOL, + OTEL_EXPORTER_OTLP_TIMEOUT, +) +from opentelemetry.sdk.metrics import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk.metrics._internal.aggregation import Aggregation as InternalAggregation +from opentelemetry.sdk.metrics.export import AggregationTemporality, MetricExporter +from opentelemetry.sdk.metrics.view import Aggregation as ViewAggregation +from opentelemetry.sdk.metrics.view import ExplicitBucketHistogramAggregation, ExponentialBucketHistogramAggregation +from opentelemetry.util.re import parse_env_headers + + +# The OpenTelemetry Authors code +def _apply_otlp_metric_exporter_patches() -> None: # pragma: no cover + """OTLP Metrics Exporter patches for getting the following change in the upstream: + https://github.com/open-telemetry/opentelemetry-python/commit/12f449074e80fa88b59468a48b7a4b99dbcda34d + """ + + def patch_otlp_metric_exporter_mixin_common_configuration( + self, + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[type, ViewAggregation] = None, + ) -> None: + + # pylint: disable=unnecessary-dunder-call + MetricExporter.__init__( + self, + preferred_temporality=self._get_temporality(preferred_temporality), + preferred_aggregation=self._get_aggregation(preferred_aggregation), + ) + + def patch_otlp_metric_exporter_mixin_get_temporality( + self, preferred_temporality: Dict[type, AggregationTemporality] + ) -> Dict[type, AggregationTemporality]: + + otel_exporter_otlp_metrics_temporality_preference = ( + environ.get( + OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, + "CUMULATIVE", + ) + .upper() + .strip() + ) + + if otel_exporter_otlp_metrics_temporality_preference == "DELTA": + instrument_class_temporality = { + Counter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + + elif otel_exporter_otlp_metrics_temporality_preference == "LOWMEMORY": + instrument_class_temporality = { + Counter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.CUMULATIVE, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + + else: + if otel_exporter_otlp_metrics_temporality_preference != ("CUMULATIVE"): + # pylint: disable=logging-fstring-interpolation + _logger.warning( + "Unrecognized OTEL_EXPORTER_METRICS_TEMPORALITY_PREFERENCE" + " value found: " + f"{otel_exporter_otlp_metrics_temporality_preference}, " + "using CUMULATIVE" + ) + instrument_class_temporality = { + Counter: AggregationTemporality.CUMULATIVE, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.CUMULATIVE, + ObservableCounter: AggregationTemporality.CUMULATIVE, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + + instrument_class_temporality.update(preferred_temporality or {}) + + return instrument_class_temporality + + def patch_otlp_metric_exporter_mixin_get_aggregation( + self, + preferred_aggregation: Dict[type, ViewAggregation], + ) -> Dict[type, ViewAggregation]: + + otel_exporter_otlp_metrics_default_histogram_aggregation = environ.get( + OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION, + "explicit_bucket_histogram", + ) + + if otel_exporter_otlp_metrics_default_histogram_aggregation == ("base2_exponential_bucket_histogram"): + + instrument_class_aggregation = { + Histogram: ExponentialBucketHistogramAggregation(), + } + + else: + + if otel_exporter_otlp_metrics_default_histogram_aggregation != ("explicit_bucket_histogram"): + + # pylint: disable=implicit-str-concat + _logger.warning( + ("Invalid value for %s: %s, using explicit bucket " "histogram aggregation"), + OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION, + otel_exporter_otlp_metrics_default_histogram_aggregation, + ) + + instrument_class_aggregation = { + Histogram: ExplicitBucketHistogramAggregation(), + } + + instrument_class_aggregation.update(preferred_aggregation or {}) + + return instrument_class_aggregation + + def patch_http_otlp_metric_exporter_init( + self, + endpoint: Optional[str] = None, + certificate_file: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + timeout: Optional[int] = None, + compression: Optional[HttpCompression] = None, + session: Optional[requests.Session] = None, + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[type, InternalAggregation] = None, + ): + self._endpoint = endpoint or environ.get( + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, + _append_metrics_path(environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT)), + ) + self._certificate_file = certificate_file or environ.get( + OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE, + environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True), + ) + headers_string = environ.get( + OTEL_EXPORTER_OTLP_METRICS_HEADERS, + environ.get(OTEL_EXPORTER_OTLP_HEADERS, ""), + ) + self._headers = headers or parse_env_headers(headers_string) + self._timeout = timeout or int( + environ.get( + OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, + environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), + ) + ) + self._compression = compression or _compression_from_env() + self._session = session or requests.Session() + self._session.headers.update(self._headers) + self._session.headers.update({"Content-Type": "application/x-protobuf"}) + if self._compression is not HttpCompression.NoCompression: + self._session.headers.update({"Content-Encoding": self._compression.value}) + + self._common_configuration(preferred_temporality, preferred_aggregation) + + OTLPMetricExporterMixin._common_configuration = patch_otlp_metric_exporter_mixin_common_configuration + OTLPMetricExporterMixin._get_temporality = patch_otlp_metric_exporter_mixin_get_temporality + OTLPMetricExporterMixin._get_aggregation = patch_otlp_metric_exporter_mixin_get_aggregation + HttpOTLPMetricExporter.__init__ = patch_http_otlp_metric_exporter_init + + protocol = environ.get( + OTEL_EXPORTER_OTLP_METRICS_PROTOCOL, environ.get(OTEL_EXPORTER_OTLP_PROTOCOL, "http/protobuf") + ) + if protocol == "grpc": + _apply_grpc_otlp_metric_exporter_patches() + + +def _apply_grpc_otlp_metric_exporter_patches(): + # pylint: disable=import-outside-toplevel + # Delay import to only occur if gRPC specifically requested. Vended Docker image will not have gRPC bundled, + # so importing it at the class level can cause runtime failures. + from typing import Sequence as TypingSequence + from typing import Tuple, Union + + from grpc import ChannelCredentials + from grpc import Compression as GrpcCompression + + from opentelemetry.exporter.otlp.proto.grpc.exporter import ( + OTLPExporterMixin, + _get_credentials, + environ_to_compression, + ) + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GrpcOTLPMetricExporter + from opentelemetry.sdk.environment_variables import ( + OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, + OTEL_EXPORTER_OTLP_METRICS_INSECURE, + ) + + def patch_http_grpc_metric_exporter_init( + self, + endpoint: Optional[str] = None, + insecure: Optional[bool] = None, + credentials: Optional[ChannelCredentials] = None, + headers: Optional[Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]] = None, + timeout: Optional[int] = None, + compression: Optional[GrpcCompression] = None, + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[type, InternalAggregation] = None, + max_export_batch_size: Optional[int] = None, + ): + + if insecure is None: + insecure = environ.get(OTEL_EXPORTER_OTLP_METRICS_INSECURE) + if insecure is not None: + insecure = insecure.lower() == "true" + + if not insecure and environ.get(OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE) is not None: + credentials = _get_credentials(credentials, OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE) + + environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT) + environ_timeout = int(environ_timeout) if environ_timeout is not None else None + + compression = ( + environ_to_compression(OTEL_EXPORTER_OTLP_METRICS_COMPRESSION) if compression is None else compression + ) + + self._common_configuration(preferred_temporality, preferred_aggregation) + + # pylint: disable=unnecessary-dunder-call + OTLPExporterMixin.__init__( + self, + endpoint=endpoint or environ.get(OTEL_EXPORTER_OTLP_METRICS_ENDPOINT), + insecure=insecure, + credentials=credentials, + headers=headers or environ.get(OTEL_EXPORTER_OTLP_METRICS_HEADERS), + timeout=timeout or environ_timeout, + compression=compression, + ) + + self._max_export_batch_size: Optional[int] = max_export_batch_size + + GrpcOTLPMetricExporter.__init__ = patch_http_grpc_metric_exporter_init + + +# END The OpenTelemetry Authors code diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 62d1cc380..c43ce2f04 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -295,9 +295,6 @@ def validate_distro_environ(): # Set by AwsOpenTelemetryDistro tc.assertEqual("http/protobuf", os.environ.get("OTEL_EXPORTER_OTLP_PROTOCOL")) - tc.assertEqual( - "base2_exponential_bucket_histogram", os.environ.get("OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION") - ) tc.assertEqual("xray,tracecontext,b3,b3multi", os.environ.get("OTEL_PROPAGATORS")) tc.assertEqual("xray", os.environ.get("OTEL_PYTHON_ID_GENERATOR")) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py index bc6e851a9..6695dbea2 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py @@ -1,13 +1,26 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -from typing import Dict +from os import environ +from typing import Dict, Tuple from unittest import TestCase from unittest.mock import MagicMock, patch import pkg_resources from amazon.opentelemetry.distro.patches._instrumentation_patch import apply_instrumentation_patches +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GrpcOTLPMetricExporter +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HttpOTLPMetricExporter from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS +from opentelemetry.sdk.metrics import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk.metrics.export import AggregationTemporality +from opentelemetry.sdk.metrics.view import Aggregation, ExponentialBucketHistogramAggregation from opentelemetry.semconv.trace import SpanAttributes _STREAM_NAME: str = "streamName" @@ -17,50 +30,67 @@ class TestInstrumentationPatch(TestCase): - @classmethod - def setUpClass(cls): - super().setUpClass() - cls.mock_get_distribution = patch( + """ + This test class has exactly one test, test_instrumentation_patch. This is an anti-pattern, but the scenario is + fairly unusual and we feel justifies the code smell. Essentially the _instrumentation_patch module monkey-patches + upstream components, so once it's run, it's challenging to "undo" between tests. To work around this, we have a + monolith test framework that tests two major categories of test scenarios: + 1. Patch behaviour + 2. Patch mechanism + + Patch behaviour tests validate upstream behaviour without patches, apply patches, and validate patched behaviour. + Patch mechanism tests validate the logic that is used to actually apply patches, and can be run regardless of the + pre- or post-patch behaviour. + """ + + mock_get_distribution: patch + mock_metric_exporter_init: patch + + def test_instrumentation_patch(self): + # Set up mocks used by all tests + self.mock_get_distribution = patch( "amazon.opentelemetry.distro.patches._instrumentation_patch.pkg_resources.get_distribution" ).start() + self.mock_metric_exporter_init = patch( + "opentelemetry.sdk.metrics._internal.export.MetricExporter.__init__" + ).start() - @classmethod - def tearDownClass(cls): - super().tearDownClass() - cls.mock_get_distribution.stop() + # Run tests that validate patch behaviour before and after patching + self._run_patch_behaviour_tests() + # Run tests not specifically related to patch behaviour + self._run_patch_mechanism_tests() - def test_botocore_not_installed(self): - # Test scenario 1: Botocore package not installed - self.mock_get_distribution.side_effect = pkg_resources.DistributionNotFound - apply_instrumentation_patches() - with patch( - "amazon.opentelemetry.distro.patches._botocore_patches._apply_botocore_instrumentation_patches" - ) as mock_apply_patches: - mock_apply_patches.assert_not_called() + # Clean up method patches + self.mock_get_distribution.stop() + self.mock_metric_exporter_init.stop() - def test_botocore_installed_wrong_version(self): - # Test scenario 2: Botocore package installed with wrong version - self.mock_get_distribution.side_effect = pkg_resources.VersionConflict("botocore==1.0.0", "botocore==0.0.1") - apply_instrumentation_patches() - with patch( - "amazon.opentelemetry.distro.patches._botocore_patches._apply_botocore_instrumentation_patches" - ) as mock_apply_patches: - mock_apply_patches.assert_not_called() + def _run_patch_behaviour_tests(self): + # Test setup + environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "grpc" + self.mock_get_distribution.return_value = "CorrectDistributionObject" - def test_botocore_installed_correct_version(self): - # Test scenario 3: Botocore package installed with correct version # Validate unpatched upstream behaviour - important to detect upstream changes that may break instrumentation - self._validate_unpatched_botocore_instrumentation() - - self.mock_get_distribution.return_value = "CorrectDistributionObject" + self._test_unpatched_botocore_instrumentation() + self._test_unpatched_otlp_metric_exporters() # Apply patches apply_instrumentation_patches() # Validate patched upstream behaviour - important to detect downstream changes that may break instrumentation - self._validate_patched_botocore_instrumentation() + self._test_patched_botocore_instrumentation() + self._test_patched_otlp_metric_exporters() - def _validate_unpatched_botocore_instrumentation(self): + # Test teardown + environ.pop("OTEL_EXPORTER_OTLP_PROTOCOL") + self._reset_mocks() + + def _run_patch_mechanism_tests(self): + self._test_botocore_installed_flag() + self._reset_mocks() + self._test_otlp_protocol_flag() + self._reset_mocks() + + def _test_unpatched_botocore_instrumentation(self): # Kinesis self.assertFalse("kinesis" in _KNOWN_EXTENSIONS, "Upstream has added a Kinesis extension") @@ -74,7 +104,22 @@ def _validate_unpatched_botocore_instrumentation(self): self.assertFalse("aws.sqs.queue_url" in attributes) self.assertFalse("aws.sqs.queue_name" in attributes) - def _validate_patched_botocore_instrumentation(self): + def _test_unpatched_otlp_metric_exporters(self): + (temporality_dict, aggregation_dict) = _get_metric_exporter_dicts() + + HttpOTLPMetricExporter(preferred_temporality=temporality_dict, preferred_aggregation=aggregation_dict) + self.mock_metric_exporter_init.assert_called_once() + self.assertEqual(temporality_dict, self.mock_metric_exporter_init.call_args[1]["preferred_temporality"]) + self.assertNotEqual(aggregation_dict, self.mock_metric_exporter_init.call_args[1]["preferred_aggregation"]) + self.mock_metric_exporter_init.reset_mock() + + GrpcOTLPMetricExporter(preferred_temporality=temporality_dict, preferred_aggregation=aggregation_dict) + self.mock_metric_exporter_init.assert_called_once() + self.assertEqual(temporality_dict, self.mock_metric_exporter_init.call_args[1]["preferred_temporality"]) + self.assertNotEqual(aggregation_dict, self.mock_metric_exporter_init.call_args[1]["preferred_aggregation"]) + self.mock_metric_exporter_init.reset_mock() + + def _test_patched_botocore_instrumentation(self): # Kinesis self.assertTrue("kinesis" in _KNOWN_EXTENSIONS) kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes() @@ -96,6 +141,80 @@ def _validate_patched_botocore_instrumentation(self): self.assertTrue("aws.sqs.queue_name" in sqs_attributes) self.assertEqual(sqs_attributes["aws.sqs.queue_name"], _QUEUE_NAME) + def _test_patched_otlp_metric_exporters(self): + (temporality_dict, aggregation_dict) = _get_metric_exporter_dicts() + + HttpOTLPMetricExporter(preferred_temporality=temporality_dict, preferred_aggregation=aggregation_dict) + self.mock_metric_exporter_init.assert_called_once() + self.assertEqual(temporality_dict, self.mock_metric_exporter_init.call_args[1]["preferred_temporality"]) + self.assertEqual(aggregation_dict, self.mock_metric_exporter_init.call_args[1]["preferred_aggregation"]) + self.mock_metric_exporter_init.reset_mock() + + GrpcOTLPMetricExporter(preferred_temporality=temporality_dict, preferred_aggregation=aggregation_dict) + self.mock_metric_exporter_init.assert_called_once() + self.assertEqual(temporality_dict, self.mock_metric_exporter_init.call_args[1]["preferred_temporality"]) + self.assertEqual(aggregation_dict, self.mock_metric_exporter_init.call_args[1]["preferred_aggregation"]) + self.mock_metric_exporter_init.reset_mock() + + def _test_botocore_installed_flag(self): + with patch( + "amazon.opentelemetry.distro.patches._botocore_patches._apply_botocore_instrumentation_patches" + ) as mock_apply_patches: + self.mock_get_distribution.side_effect = pkg_resources.DistributionNotFound + apply_instrumentation_patches() + mock_apply_patches.assert_not_called() + + self.mock_get_distribution.side_effect = pkg_resources.VersionConflict("botocore==1.0.0", "botocore==0.0.1") + apply_instrumentation_patches() + mock_apply_patches.assert_not_called() + + self.mock_get_distribution.side_effect = None + self.mock_get_distribution.return_value = "CorrectDistributionObject" + apply_instrumentation_patches() + mock_apply_patches.assert_called() + + # pylint: disable=no-self-use + def _test_otlp_protocol_flag(self): + with patch( + "amazon.opentelemetry.distro.patches._otlp_metric_exporter_patches._apply_grpc_otlp_metric_exporter_patches" + ) as mock_apply_patch: + environ["OTEL_EXPORTER_OTLP_METRICS_PROTOCOL"] = "http/protobuf" + environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf" + apply_instrumentation_patches() + mock_apply_patch.assert_not_called() + + environ.pop("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL") + environ.pop("OTEL_EXPORTER_OTLP_PROTOCOL") + apply_instrumentation_patches() + mock_apply_patch.assert_not_called() + + environ["OTEL_EXPORTER_OTLP_METRICS_PROTOCOL"] = "http/protobuf" + environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "grpc" + apply_instrumentation_patches() + mock_apply_patch.assert_not_called() + + environ["OTEL_EXPORTER_OTLP_METRICS_PROTOCOL"] = "grpc" + environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf" + apply_instrumentation_patches() + mock_apply_patch.assert_called_once() + mock_apply_patch.reset_mock() + + environ["OTEL_EXPORTER_OTLP_METRICS_PROTOCOL"] = "grpc" + environ.pop("OTEL_EXPORTER_OTLP_PROTOCOL") + apply_instrumentation_patches() + mock_apply_patch.assert_called_once() + mock_apply_patch.reset_mock() + + environ.pop("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL") + environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "grpc" + apply_instrumentation_patches() + mock_apply_patch.assert_called_once() + mock_apply_patch.reset_mock() + + def _reset_mocks(self): + self.mock_get_distribution.reset_mock() + self.mock_metric_exporter_init.reset_mock() + def _do_extract_kinesis_attributes() -> Dict[str, str]: service_name: str = "kinesis" @@ -122,3 +241,19 @@ def _do_extract_attributes(service_name: str, params: Dict[str, str]) -> Dict[st sqs_extension = _KNOWN_EXTENSIONS[service_name]()(mock_call_context) sqs_extension.extract_attributes(attributes) return attributes + + +def _get_metric_exporter_dicts() -> Tuple: + temporality_dict: Dict[type, AggregationTemporality] = {} + for typ in [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ]: + temporality_dict[typ] = AggregationTemporality.DELTA + + aggregation_dict: Dict[type, Aggregation] = {Histogram: ExponentialBucketHistogramAggregation(99, 20)} + return temporality_dict, aggregation_dict