diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py
index 1493c1c2a..93424379c 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py
@@ -29,6 +29,7 @@
     AWS_STEPFUNCTIONS_ACTIVITY_ARN,
     AWS_STEPFUNCTIONS_STATEMACHINE_ARN,
 )
+from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
 from amazon.opentelemetry.distro._aws_span_processing_util import (
     GEN_AI_REQUEST_MODEL,
     LOCAL_ROOT,
@@ -37,7 +38,6 @@
     UNKNOWN_OPERATION,
     UNKNOWN_REMOTE_OPERATION,
     UNKNOWN_REMOTE_SERVICE,
-    UNKNOWN_SERVICE,
     extract_api_path_value,
     get_egress_operation,
     get_ingress_operation,
@@ -54,12 +54,11 @@
     MetricAttributeGenerator,
 )
 from amazon.opentelemetry.distro.sqs_url_parser import SqsUrlParser
-from opentelemetry.sdk.resources import Resource, ResourceAttributes
+from opentelemetry.sdk.resources import Resource
 from opentelemetry.sdk.trace import BoundedAttributes, ReadableSpan
 from opentelemetry.semconv.trace import SpanAttributes
 
 # Pertinent OTEL attribute keys
-_SERVICE_NAME: str = ResourceAttributes.SERVICE_NAME
 _DB_CONNECTION_STRING: str = SpanAttributes.DB_CONNECTION_STRING
 _DB_NAME: str = SpanAttributes.DB_NAME
 _DB_OPERATION: str = SpanAttributes.DB_OPERATION
@@ -103,10 +102,6 @@
 # Special DEPENDENCY attribute value if GRAPHQL_OPERATION_TYPE attribute key is present.
 _GRAPHQL: str = "graphql"
 
-# As per https://opentelemetry.io/docs/specs/semconv/resource/#service, if service name is not specified, SDK defaults
-# the service name to unknown_service: or just unknown_service.
-_OTEL_UNKNOWN_SERVICE_PREFIX: str = "unknown_service"
-
 _logger: Logger = getLogger(__name__)
 
 
@@ -152,15 +147,11 @@ def _generate_dependency_metric_attributes(span: ReadableSpan, resource: Resourc
 
 
 def _set_service(resource: Resource, span: ReadableSpan, attributes: BoundedAttributes) -> None:
-    """Service is always derived from SERVICE_NAME"""
-    service: str = resource.attributes.get(_SERVICE_NAME)
-
-    # In practice the service name is never None, but we can be defensive here.
-    if service is None or service.startswith(_OTEL_UNKNOWN_SERVICE_PREFIX):
+    service_name, is_unknown = get_service_attribute(resource)
+    if is_unknown:
         _log_unknown_attribute(AWS_LOCAL_SERVICE, span)
-        service = UNKNOWN_SERVICE
 
-    attributes[AWS_LOCAL_SERVICE] = service
+    attributes[AWS_LOCAL_SERVICE] = service_name
 
 
 def _set_ingress_operation(span: ReadableSpan, attributes: BoundedAttributes) -> None:
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_resource_attribute_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_resource_attribute_configurator.py
new file mode 100644
index 000000000..e83c56668
--- /dev/null
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_resource_attribute_configurator.py
@@ -0,0 +1,19 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+from amazon.opentelemetry.distro._aws_span_processing_util import UNKNOWN_SERVICE
+from opentelemetry.sdk.resources import SERVICE_NAME, Resource
+
+# As per https://opentelemetry.io/docs/specs/semconv/resource/#service, if service name is not specified, SDK defaults
+# the service name to unknown_service: or just unknown_service.
+_OTEL_UNKNOWN_SERVICE_PREFIX: str = "unknown_service"
+
+
+def get_service_attribute(resource: Resource) -> (str, bool):
+    """Service is always derived from SERVICE_NAME"""
+    service: str = resource.attributes.get(SERVICE_NAME)
+
+    # In practice the service name is never None, but we can be defensive here.
+    if service is None or service.startswith(_OTEL_UNKNOWN_SERVICE_PREFIX):
+        return UNKNOWN_SERVICE, True
+
+    return service, False
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py
index ad0c4c138..c9d3680a5 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py
@@ -3,11 +3,13 @@
 # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
 import os
 from logging import Logger, getLogger
-from typing import ClassVar, Dict, Type
+from typing import ClassVar, Dict, List, Type, Union
 
 from importlib_metadata import version
 from typing_extensions import override
 
+from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
+from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
 from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
 from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
     AttributePropagatingSpanProcessorBuilder,
 )
@@ -19,8 +21,11 @@
 from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
 from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
 from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
+from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
+from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
 from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.metrics import set_meter_provider
 from opentelemetry.sdk._configuration import (
     _get_exporter_names,
     _get_id_generator,
@@ -29,7 +34,6 @@
     _import_id_generator,
     _import_sampler,
     _init_logging,
-    _init_metrics,
     _OTelSDKConfigurator,
 )
 from opentelemetry.sdk.environment_variables import (
@@ -50,7 +54,13 @@
     ObservableUpDownCounter,
     UpDownCounter,
 )
-from opentelemetry.sdk.metrics.export import AggregationTemporality, PeriodicExportingMetricReader
+from opentelemetry.sdk.metrics.export import (
+    AggregationTemporality,
+    MetricExporter,
+    MetricReader,
+    PeriodicExportingMetricReader,
+)
+from opentelemetry.sdk.metrics.view import LastValueAggregation, View
 from opentelemetry.sdk.resources import Resource, get_aggregated_resources
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
@@ -59,15 +69,17 @@
 from opentelemetry.semconv.resource import ResourceAttributes
 from opentelemetry.trace import set_tracer_provider
 
-APP_SIGNALS_ENABLED_CONFIG = "OTEL_AWS_APP_SIGNALS_ENABLED"
+DEPRECATED_APP_SIGNALS_ENABLED_CONFIG = "OTEL_AWS_APP_SIGNALS_ENABLED"
 APPLICATION_SIGNALS_ENABLED_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_ENABLED"
-APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APP_SIGNALS_EXPORTER_ENDPOINT"
"OTEL_AWS_APP_SIGNALS_EXPORTER_ENDPOINT" +APPLICATION_SIGNALS_RUNTIME_ENABLED_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED" +DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APP_SIGNALS_EXPORTER_ENDPOINT" APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT" METRIC_EXPORT_INTERVAL_CONFIG = "OTEL_METRIC_EXPORT_INTERVAL" DEFAULT_METRIC_EXPORT_INTERVAL = 60000.0 AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME" AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS" OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG = "OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED" +SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME = "opentelemetry.instrumentation.system_metrics" OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" # UDP package size is not larger than 64KB LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10 @@ -127,7 +139,7 @@ def _initialize_components(): else [] ) - resource = get_aggregated_resources(resource_detectors).merge(Resource.create(auto_resource)) + resource = _customize_resource(get_aggregated_resources(resource_detectors).merge(Resource.create(auto_resource))) sampler_name = _get_sampler() sampler = _custom_import_sampler(sampler_name, resource) @@ -171,6 +183,27 @@ def _init_tracing( set_tracer_provider(trace_provider) +def _init_metrics( + exporters_or_readers: Dict[str, Union[Type[MetricExporter], Type[MetricReader]]], + resource: Resource = None, +): + metric_readers = [] + views = [] + + for _, exporter_or_reader_class in exporters_or_readers.items(): + exporter_args = {} + + if issubclass(exporter_or_reader_class, MetricReader): + metric_readers.append(exporter_or_reader_class(**exporter_args)) + else: + metric_readers.append(PeriodicExportingMetricReader(exporter_or_reader_class(**exporter_args))) + + _customize_metric_exporters(metric_readers, views) + + provider = MeterProvider(resource=resource, metric_readers=metric_readers, views=views) + set_meter_provider(provider) + + # END The OpenTelemetry Authors code @@ -303,14 +336,9 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) -> # Construct meterProvider _logger.info("AWS Application Signals enabled") otel_metric_exporter = ApplicationSignalsExporterProvider().create_exporter() - export_interval_millis = float(os.environ.get(METRIC_EXPORT_INTERVAL_CONFIG, DEFAULT_METRIC_EXPORT_INTERVAL)) - _logger.debug("Span Metrics export interval: %s", export_interval_millis) - # Cap export interval to 60 seconds. This is currently required for metrics-trace correlation to work correctly. 
-    if export_interval_millis > DEFAULT_METRIC_EXPORT_INTERVAL:
-        export_interval_millis = DEFAULT_METRIC_EXPORT_INTERVAL
-        _logger.info("AWS Application Signals metrics export interval capped to %s", export_interval_millis)
+
     periodic_exporting_metric_reader = PeriodicExportingMetricReader(
-        exporter=otel_metric_exporter, export_interval_millis=export_interval_millis
+        exporter=otel_metric_exporter, export_interval_millis=_get_metric_export_interval()
     )
     meter_provider: MeterProvider = MeterProvider(resource=resource, metric_readers=[periodic_exporting_metric_reader])
     # Construct and set application signals metrics processor
@@ -319,6 +347,61 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
     return
 
 
+def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[View]) -> None:
+    if _is_application_signals_runtime_enabled():
+        _get_runtime_metric_views(views, 0 == len(metric_readers))
+
+        application_signals_metric_exporter = ApplicationSignalsExporterProvider().create_exporter()
+        scope_based_periodic_exporting_metric_reader = ScopeBasedPeriodicExportingMetricReader(
+            exporter=application_signals_metric_exporter,
+            export_interval_millis=_get_metric_export_interval(),
+            registered_scope_names={SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME},
+        )
+        metric_readers.append(scope_based_periodic_exporting_metric_reader)
+
+
+def _get_runtime_metric_views(views: List[View], retain_runtime_only: bool) -> None:
+    runtime_metrics_scope_name = SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME
+    _logger.info("Registered scope %s", runtime_metrics_scope_name)
+    views.append(
+        View(
+            instrument_name="system.network.connections",
+            meter_name=runtime_metrics_scope_name,
+            aggregation=LastValueAggregation(),
+        )
+    )
+    views.append(
+        View(
+            instrument_name="process.open_file_descriptor.count",
+            meter_name=runtime_metrics_scope_name,
+            aggregation=LastValueAggregation(),
+        )
+    )
+    views.append(
+        View(
+            instrument_name="process.runtime.*.memory",
+            meter_name=runtime_metrics_scope_name,
+            aggregation=LastValueAggregation(),
+        )
+    )
+    views.append(
+        View(
+            instrument_name="process.runtime.*.gc_count",
+            meter_name=runtime_metrics_scope_name,
+            aggregation=LastValueAggregation(),
+        )
+    )
+    views.append(
+        View(
+            instrument_name="process.runtime.*.thread_count",
+            meter_name=runtime_metrics_scope_name,
+            aggregation=LastValueAggregation(),
+        )
+    )
+    if retain_runtime_only:
+        views.append(ScopeBasedRetainingView(meter_name=runtime_metrics_scope_name))
+
+
 def _customize_versions(auto_resource: Dict[str, any]) -> Dict[str, any]:
     distro_version = version("aws-opentelemetry-distro")
     auto_resource[ResourceAttributes.TELEMETRY_AUTO_VERSION] = distro_version + "-aws"
@@ -326,18 +409,44 @@
     return auto_resource
 
 
+def _customize_resource(resource: Resource) -> Resource:
+    service_name, is_unknown = get_service_attribute(resource)
+    if is_unknown:
+        _logger.debug("No valid service name found")
+
+    return resource.merge(Resource.create({AWS_LOCAL_SERVICE: service_name}))
+
+
 def _is_application_signals_enabled():
     return (
-        os.environ.get(APPLICATION_SIGNALS_ENABLED_CONFIG, os.environ.get(APP_SIGNALS_ENABLED_CONFIG, "false")).lower()
+        os.environ.get(
+            APPLICATION_SIGNALS_ENABLED_CONFIG, os.environ.get(DEPRECATED_APP_SIGNALS_ENABLED_CONFIG, "false")
+        ).lower()
         == "true"
     )
 
 
+def _is_application_signals_runtime_enabled():
+    return _is_application_signals_enabled() and (
+        os.environ.get(APPLICATION_SIGNALS_RUNTIME_ENABLED_CONFIG, "true").lower() == "true"
+    )
+
+
 def _is_lambda_environment():
     # detect if running in AWS Lambda environment
     return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ
 
 
+def _get_metric_export_interval():
+    export_interval_millis = float(os.environ.get(METRIC_EXPORT_INTERVAL_CONFIG, DEFAULT_METRIC_EXPORT_INTERVAL))
+    _logger.debug("Span Metrics export interval: %s", export_interval_millis)
+    # Cap export interval to 60 seconds. This is currently required for metrics-trace correlation to work correctly.
+    if export_interval_millis > DEFAULT_METRIC_EXPORT_INTERVAL:
+        export_interval_millis = DEFAULT_METRIC_EXPORT_INTERVAL
+        _logger.info("AWS Application Signals metrics export interval capped to %s", export_interval_millis)
+    return export_interval_millis
+
+
 def _span_export_batch_size():
     return LAMBDA_SPAN_EXPORT_BATCH_SIZE if _is_lambda_environment() else None
 
@@ -372,7 +481,7 @@ def create_exporter(self):
         if protocol == "http/protobuf":
             application_signals_endpoint = os.environ.get(
                 APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG,
-                os.environ.get(APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG, "http://localhost:4316/v1/metrics"),
+                os.environ.get(DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG, "http://localhost:4316/v1/metrics"),
             )
             _logger.debug("AWS Application Signals export endpoint: %s", application_signals_endpoint)
             return OTLPHttpOTLPMetricExporter(
@@ -388,7 +497,7 @@ def create_exporter(self):
 
             application_signals_endpoint = os.environ.get(
                 APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG,
-                os.environ.get(APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG, "localhost:4315"),
+                os.environ.get(DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG, "localhost:4315"),
             )
             _logger.debug("AWS Application Signals export endpoint: %s", application_signals_endpoint)
             return OTLPGrpcOTLPMetricExporter(
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/scope_based_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/scope_based_exporter.py
new file mode 100644
index 000000000..916d7b6a3
--- /dev/null
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/scope_based_exporter.py
@@ -0,0 +1,54 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+from logging import Logger, getLogger
+from typing import Optional, Set
+
+from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
+from opentelemetry.sdk.metrics.export import MetricExporter, MetricsData, PeriodicExportingMetricReader, ResourceMetrics
+
+_logger: Logger = getLogger(__name__)
+
+
+class ScopeBasedPeriodicExportingMetricReader(PeriodicExportingMetricReader):
+
+    def __init__(
+        self,
+        exporter: MetricExporter,
+        export_interval_millis: Optional[float] = None,
+        export_timeout_millis: Optional[float] = None,
+        registered_scope_names: Set[str] = None,
+    ):
+        super().__init__(exporter, export_interval_millis, export_timeout_millis)
+        self._registered_scope_names = registered_scope_names
+
+    def _receive_metrics(
+        self,
+        metrics_data: MetricsData,
+        timeout_millis: float = 10_000,
+        **kwargs,
+    ) -> None:
+
+        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+        # pylint: disable=broad-exception-caught,invalid-name
+        try:
+            with self._export_lock:
+                exporting_resource_metrics = []
+                for metric in metrics_data.resource_metrics:
+                    exporting_scope_metrics = []
+                    for scope_metric in metric.scope_metrics:
+                        if scope_metric.scope.name in self._registered_scope_names:
+                            exporting_scope_metrics.append(scope_metric)
+                    if len(exporting_scope_metrics) > 0:
+                        exporting_resource_metrics.append(
+                            ResourceMetrics(
+                                resource=metric.resource,
+                                scope_metrics=exporting_scope_metrics,
+                                schema_url=metric.schema_url,
+                            )
+                        )
+                if len(exporting_resource_metrics) > 0:
+                    new_metrics_data = MetricsData(resource_metrics=exporting_resource_metrics)
+                    self._exporter.export(new_metrics_data, timeout_millis=timeout_millis)
+        except Exception as e:
+            _logger.exception("Exception while exporting metrics %s", str(e))
+        detach(token)
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/scope_based_filtering_view.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/scope_based_filtering_view.py
new file mode 100644
index 000000000..5d7744169
--- /dev/null
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/scope_based_filtering_view.py
@@ -0,0 +1,20 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+from typing import Optional
+
+from opentelemetry.metrics import Instrument
+from opentelemetry.sdk.metrics.view import DropAggregation, View
+
+
+class ScopeBasedRetainingView(View):
+    def __init__(
+        self,
+        meter_name: Optional[str] = None,
+    ) -> None:
+        super().__init__(meter_name=meter_name, aggregation=DropAggregation())
+
+    def _match(self, instrument: Instrument) -> bool:
+        if instrument.instrumentation_scope.name != self._meter_name:
+            return True
+
+        return False
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py
index cecb9d9cf..cb29d0533 100644
--- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py
@@ -15,10 +15,12 @@
     AwsOpenTelemetryConfigurator,
     _custom_import_sampler,
     _customize_exporter,
+    _customize_metric_exporters,
     _customize_sampler,
     _customize_span_processors,
     _export_unsampled_span_for_lambda,
     _is_application_signals_enabled,
+    _is_application_signals_runtime_enabled,
     _is_defer_to_workers_enabled,
     _is_wsgi_master_process,
 )
@@ -27,12 +29,14 @@
 from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
 from amazon.opentelemetry.distro.sampler._aws_xray_sampling_client import _AwsXRaySamplingClient
 from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import _AwsXRayRemoteSampler
+from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
 from opentelemetry.environment_variables import OTEL_LOGS_EXPORTER, OTEL_METRICS_EXPORTER, OTEL_TRACES_EXPORTER
 from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import OTLPMetricExporterMixin
 from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as OTLPGrpcOTLPMetricExporter
 from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
 from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG
+from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader
 from opentelemetry.sdk.resources import Resource
 from opentelemetry.sdk.trace import Span, SpanProcessor, Tracer, TracerProvider
 from opentelemetry.sdk.trace.export import SpanExporter
@@ -57,7 +61,7 @@ def setUpClass(cls):
 
         # Overwrite exporter configs to keep tests clean, set sampler configs for tests
         os.environ[OTEL_TRACES_EXPORTER] = "none"
-        os.environ[OTEL_METRICS_EXPORTER] = "none"
+        os.environ[OTEL_METRICS_EXPORTER] = "console"
        os.environ[OTEL_LOGS_EXPORTER] = "none"
         os.environ[OTEL_TRACES_SAMPLER] = "traceidratio"
         os.environ[OTEL_TRACES_SAMPLER_ARG] = "0.01"
@@ -67,6 +71,10 @@ def setUpClass(cls):
         aws_otel_configurator.configure()
         cls.tracer_provider: TracerProvider = get_tracer_provider()
 
+    def tearDown(self):
+        os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
+        os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", None)
+
     # The probability of this passing once without correct IDs is low, 20 times is inconceivable.
     def test_provide_generate_xray_ids(self):
         for _ in range(20):
@@ -240,17 +248,31 @@ def test_is_application_signals_enabled(self):
         os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
         self.assertFalse(_is_application_signals_enabled())
 
+    def test_is_application_signals_runtime_enabled(self):
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
+        self.assertTrue(_is_application_signals_runtime_enabled())
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False")
+        self.assertFalse(_is_application_signals_runtime_enabled())
+
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "False")
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "True")
+        self.assertFalse(_is_application_signals_runtime_enabled())
+
+        os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
+        os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", None)
+        self.assertFalse(_is_application_signals_enabled())
+
     def test_customize_sampler(self):
         mock_sampler: Sampler = MagicMock()
         customized_sampler: Sampler = _customize_sampler(mock_sampler)
         self.assertEqual(mock_sampler, customized_sampler)
 
         os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False")
         customized_sampler = _customize_sampler(mock_sampler)
         self.assertNotEqual(mock_sampler, customized_sampler)
         self.assertIsInstance(customized_sampler, AlwaysRecordSampler)
         self.assertEqual(mock_sampler, customized_sampler._root_sampler)
-        os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
 
     def test_customize_exporter(self):
         mock_exporter: SpanExporter = MagicMock(spec=OTLPSpanExporter)
@@ -258,20 +280,20 @@ def test_customize_exporter(self):
         self.assertEqual(mock_exporter, customized_exporter)
 
         os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False")
         customized_exporter = _customize_exporter(mock_exporter, Resource.get_empty())
         self.assertNotEqual(mock_exporter, customized_exporter)
         self.assertIsInstance(customized_exporter, AwsMetricAttributesSpanExporter)
         self.assertEqual(mock_exporter, customized_exporter._delegate)
-        os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
 
         # when Application Signals is enabled and running in lambda
         os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False")
         os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myLambdaFunc")
         customized_exporter = _customize_exporter(mock_exporter, Resource.get_empty())
         self.assertNotEqual(mock_exporter, customized_exporter)
         self.assertIsInstance(customized_exporter, AwsMetricAttributesSpanExporter)
         self.assertIsInstance(customized_exporter._delegate, OTLPUdpSpanExporter)
-        os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
         os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)
@@ -280,13 +302,13 @@ def test_customize_span_processors(self):
         mock_tracer_provider: TracerProvider = MagicMock()
         _customize_span_processors(mock_tracer_provider, Resource.get_empty())
         self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)
 
         os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False")
         _customize_span_processors(mock_tracer_provider, Resource.get_empty())
         self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 2)
         first_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
         self.assertIsInstance(first_processor, AttributePropagatingSpanProcessor)
         second_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0]
         self.assertIsInstance(second_processor, AwsSpanMetricsProcessor)
-        os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
 
     def test_customize_span_processors_lambda(self):
         mock_tracer_provider: TracerProvider = MagicMock()
@@ -325,6 +347,7 @@ def test_application_signals_exporter_provider(self):
 
     def test_is_defer_to_workers_enabled(self):
         os.environ.setdefault("OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED", "True")
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "False")
         self.assertTrue(_is_defer_to_workers_enabled())
         os.environ.pop("OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED", None)
 
@@ -381,6 +404,35 @@ def test_export_unsampled_span_for_lambda(self):
         os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
         os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)
 
+    def test_customize_metric_exporter(self):
+        metric_readers = []
+        views = []
+
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
+        os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", "True")
+        os.environ.setdefault("OTEL_METRIC_EXPORT_INTERVAL", "1000")
+
+        _customize_metric_exporters(metric_readers, views)
+        self.assertEqual(1, len(metric_readers))
+        self.assertEqual(6, len(views))
+        self.assertIsInstance(metric_readers[0], ScopeBasedPeriodicExportingMetricReader)
+        pmr: ScopeBasedPeriodicExportingMetricReader = metric_readers[0]
+        self.assertEqual(1000, pmr._export_interval_millis)
+        pmr.shutdown()
+
+        periodic_exporting_metric_reader: PeriodicExportingMetricReader = MagicMock()
+        metric_readers = [periodic_exporting_metric_reader]
+        views = []
+        _customize_metric_exporters(metric_readers, views)
+        self.assertEqual(2, len(metric_readers))
+        self.assertIsInstance(metric_readers[1], ScopeBasedPeriodicExportingMetricReader)
+        pmr: ScopeBasedPeriodicExportingMetricReader = metric_readers[1]
+        self.assertEqual(1000, pmr._export_interval_millis)
+        pmr.shutdown()
+        self.assertEqual(5, len(views))
+
+        os.environ.pop("OTEL_METRIC_EXPORT_INTERVAL", None)
+
 
 def validate_distro_environ():
     tc: TestCase = TestCase()
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_scope_based_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_scope_based_exporter.py
new file mode 100644
index 000000000..9040d6da6
--- /dev/null
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_scope_based_exporter.py
@@ -0,0 +1,100 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+import unittest
+from time import time_ns
+from unittest.mock import MagicMock
+
+from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
+from opentelemetry.sdk.metrics.export import (
+    Metric,
+    MetricExporter,
+    MetricsData,
+    NumberDataPoint,
+    ResourceMetrics,
+    ScopeMetrics,
+    Sum,
+)
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.util.instrumentation import InstrumentationScope
+
+
+class TestScopeBasedPeriodicExportingMetricReader(unittest.TestCase):
+    def setUp(self):
+        self.metric_exporter: MetricExporter = MagicMock()
+
+    def _create_periodic_reader(self, metrics, interval=60000, timeout=30000):
+        pmr = ScopeBasedPeriodicExportingMetricReader(
+            self.metric_exporter,
+            export_interval_millis=interval,
+            export_timeout_millis=timeout,
+            registered_scope_names={"io.test.retained"},
+        )
+
+        def _collect(reader, timeout_millis):
+            pmr._receive_metrics(metrics, timeout_millis)
+
+        pmr._set_collect_callback(_collect)
+        return pmr
+
+    def test_scope_based_metric_filter(self):
+        scope_metrics = _scope_metrics(5, "io.test.retained") + _scope_metrics(3, "io.test.dropped")
+        md = MetricsData(
+            resource_metrics=[
+                ResourceMetrics(
+                    schema_url="",
+                    resource=Resource.create(),
+                    scope_metrics=scope_metrics,
+                )
+            ]
+        )
+        pmr = self._create_periodic_reader(md)
+        pmr.collect()
+        args, _ = self.metric_exporter.export.call_args
+
+        exporting_metric_data: MetricsData = args[0]
+        self.assertEqual(len(exporting_metric_data.resource_metrics[0].scope_metrics), 5)
+
+    def test_empty_metrics(self):
+        md = MetricsData(
+            resource_metrics=[
+                ResourceMetrics(
+                    schema_url="",
+                    resource=Resource.create(),
+                    scope_metrics=[],
+                )
+            ]
+        )
+        pmr = self._create_periodic_reader(md)
+        pmr.collect()
+        self.metric_exporter.export.assert_not_called()
+
+
+def _scope_metrics(num: int, scope_name: str):
+    scope_metrics = []
+    for _ in range(num):
+        scope_metrics.append(
+            ScopeMetrics(
+                schema_url="",
+                scope=InstrumentationScope(name=scope_name),
+                metrics=[
+                    Metric(
+                        name="sum_name",
+                        description="",
+                        unit="",
+                        data=Sum(
+                            data_points=[
+                                NumberDataPoint(
+                                    attributes={},
+                                    start_time_unix_nano=time_ns(),
+                                    time_unix_nano=time_ns(),
+                                    value=2,
+                                )
+                            ],
+                            aggregation_temporality=1,
+                            is_monotonic=True,
+                        ),
+                    )
+                ],
+            ),
+        )
+    return scope_metrics
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_scope_based_filtering_view.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_scope_based_filtering_view.py
new file mode 100644
index 000000000..123cb5651
--- /dev/null
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_scope_based_filtering_view.py
@@ -0,0 +1,21 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+import unittest
+from unittest.mock import MagicMock
+
+from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
+from opentelemetry.metrics import Instrument
+
+
+class TestScopeBasedRetainingView(unittest.TestCase):
+    def test_retained(self):
+        instrument: Instrument = MagicMock()
+        instrument.instrumentation_scope.name = "not_matched"
+        view = ScopeBasedRetainingView(meter_name="test_meter")
+        self.assertTrue(view._match(instrument))
+
+    def test_dropped(self):
+        instrument: Instrument = MagicMock()
+        instrument.instrumentation_scope.name = "test_meter"
+        view = ScopeBasedRetainingView(meter_name="test_meter")
+        self.assertFalse(view._match(instrument))
diff --git a/contract-tests/images/mock-collector/mock_collector_client.py b/contract-tests/images/mock-collector/mock_collector_client.py
index 21389cead..66a35cba1 100644
--- a/contract-tests/images/mock-collector/mock_collector_client.py
+++ b/contract-tests/images/mock-collector/mock_collector_client.py
@@ -87,7 +87,7 @@ def wait_condition(exported: List[ExportTraceServiceRequest], current: List[Expo
                     spans.append(ResourceScopeSpan(resource_span, scope_span, span))
         return spans
 
-    def get_metrics(self, present_metrics: Set[str]) -> List[ResourceScopeMetric]:
+    def get_metrics(self, present_metrics: Set[str], exact_match=True) -> List[ResourceScopeMetric]:
         """Get all metrics that are currently stored in the mock collector.
 
         Returns:
@@ -111,7 +113,9 @@ def wait_condition(
                     for scope_metric in resource_metric.scope_metrics:
                         for metric in scope_metric.metrics:
                             received_metrics.add(metric.name.lower())
-            return 0 < len(exported) == len(current) and present_metrics_lower.issubset(received_metrics)
+            if exact_match:
+                return 0 < len(exported) == len(current) and present_metrics_lower.issubset(received_metrics)
+            return present_metrics_lower.issubset(received_metrics)
 
         exported_metrics: List[ExportMetricsServiceRequest] = _wait_for_content(get_export, wait_condition)
         metrics: List[ResourceScopeMetric] = []
diff --git a/contract-tests/tests/test/amazon/base/contract_test_base.py b/contract-tests/tests/test/amazon/base/contract_test_base.py
index ab3a02fe2..ba96530b0 100644
--- a/contract-tests/tests/test/amazon/base/contract_test_base.py
+++ b/contract-tests/tests/test/amazon/base/contract_test_base.py
@@ -90,6 +90,7 @@ def setUp(self) -> None:
             .with_exposed_ports(self.get_application_port())
             .with_env("OTEL_METRIC_EXPORT_INTERVAL", "50")
             .with_env("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "true")
+            .with_env("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", self.is_runtime_enabled())
             .with_env("OTEL_METRICS_EXPORTER", "none")
             .with_env("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc")
             .with_env("OTEL_BSP_SCHEDULE_DELAY", "1")
@@ -216,6 +217,9 @@ def get_application_otel_service_name(self) -> str:
     def get_application_otel_resource_attributes(self) -> str:
         return "service.name=" + self.get_application_otel_service_name()
 
+    def is_runtime_enabled(self) -> str:
+        return "false"
+
     def _assert_aws_span_attributes(self, resource_scope_spans: List[ResourceScopeSpan], path: str, **kwargs):
         self.fail("Tests must implement this function")
 
diff --git a/contract-tests/tests/test/amazon/runtime/runtime_metrics_test.py b/contract-tests/tests/test/amazon/runtime/runtime_metrics_test.py
new file mode 100644
index 000000000..bd98bafc5
--- /dev/null
+++ b/contract-tests/tests/test/amazon/runtime/runtime_metrics_test.py
@@ -0,0 +1,104 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+from typing import Dict, List
+
+from mock_collector_client import ResourceScopeMetric
+from requests import Response
+from typing_extensions import override
+
+import amazon.utils.application_signals_constants as constants
+from amazon.base.contract_test_base import ContractTestBase
+from opentelemetry.proto.common.v1.common_pb2 import AnyValue
+from opentelemetry.proto.metrics.v1.metrics_pb2 import Metric, NumberDataPoint
+
+
+class RuntimeMetricsTest(ContractTestBase):
+    @override
+    def is_runtime_enabled(self) -> str:
+        return "true"
+
+    @override
+    @staticmethod
+    def get_application_image_name() -> str:
+        return "aws-application-signals-tests-django-app"
+
+    @override
+    def get_application_wait_pattern(self) -> str:
+        return "Quit the server with CONTROL-C."
+
+    @override
+    def get_application_extra_environment_variables(self):
+        return {"DJANGO_SETTINGS_MODULE": "django_server.settings"}
+
+    def test_runtime_succeeds(self) -> None:
+        self.mock_collector_client.clear_signals()
+        response: Response = self.send_request("GET", "success")
+        self.assertEqual(200, response.status_code)
+
+        metrics: List[ResourceScopeMetric] = self.mock_collector_client.get_metrics(
+            {
+                constants.LATENCY_METRIC,
+                constants.ERROR_METRIC,
+                constants.FAULT_METRIC,
+                constants.PYTHON_PROCESS_CPU_TIME,
+                constants.PYTHON_PROCESS_CPU_UTILIZATION,
+                constants.PYTHON_PROCESS_GC_COUNT,
+                constants.PYTHON_PROCESS_MEMORY_USED,
+                constants.PYTHON_PROCESS_THREAD_COUNT,
+            },
+            False,
+        )
+        self._assert_resource_attributes(metrics)
+        self._assert_counter_attribute_exists(metrics, constants.PYTHON_PROCESS_CPU_TIME, "")
+        self._assert_gauge_attribute_exists(metrics, constants.PYTHON_PROCESS_CPU_UTILIZATION, "")
+        self._assert_gauge_attribute_exists(metrics, constants.PYTHON_PROCESS_GC_COUNT, "count")
+        self._assert_gauge_attribute_exists(metrics, constants.PYTHON_PROCESS_MEMORY_USED, "type")
+        self._assert_gauge_attribute_exists(metrics, constants.PYTHON_PROCESS_THREAD_COUNT, "")
+
+    def _assert_resource_attributes(
+        self,
+        resource_scope_metrics: List[ResourceScopeMetric],
+    ) -> None:
+        for metric in resource_scope_metrics:
+            attribute_dict: Dict[str, AnyValue] = self._get_attributes_dict(metric.resource_metrics.resource.attributes)
+            self._assert_str_attribute(
+                attribute_dict, constants.AWS_LOCAL_SERVICE, self.get_application_otel_service_name()
+            )
+
+    def _assert_gauge_attribute_exists(
+        self,
+        resource_scope_metrics: List[ResourceScopeMetric],
+        metric_name: str,
+        attribute_key: str,
+    ) -> None:
+        target_metrics: List[Metric] = []
+        for resource_scope_metric in resource_scope_metrics:
+            if resource_scope_metric.metric.name.lower() == metric_name.lower():
+                target_metrics.append(resource_scope_metric.metric)
+        self.assertTrue(len(target_metrics) > 0)
+
+        for target_metric in target_metrics:
+            dp_list: List[NumberDataPoint] = target_metric.gauge.data_points
+            self.assertTrue(len(dp_list) > 0)
+            if attribute_key != "":
+                attribute_dict: Dict[str, AnyValue] = self._get_attributes_dict(dp_list[0].attributes)
+                self.assertIsNotNone(attribute_dict.get(attribute_key))
+
+    def _assert_counter_attribute_exists(
+        self,
+        resource_scope_metrics: List[ResourceScopeMetric],
+        metric_name: str,
+        attribute_key: str,
+    ) -> None:
+        target_metrics: List[Metric] = []
+        for resource_scope_metric in resource_scope_metrics:
+            if resource_scope_metric.metric.name.lower() == metric_name.lower():
+                target_metrics.append(resource_scope_metric.metric)
+        self.assertTrue(len(target_metrics) > 0)
+
+        for target_metric in target_metrics:
+            dp_list: List[NumberDataPoint] = target_metric.sum.data_points
+            self.assertTrue(len(dp_list) > 0)
+            if attribute_key != "":
+                attribute_dict: Dict[str, AnyValue] = self._get_attributes_dict(dp_list[0].attributes)
+                self.assertIsNotNone(attribute_dict.get(attribute_key))
diff --git a/contract-tests/tests/test/amazon/utils/application_signals_constants.py b/contract-tests/tests/test/amazon/utils/application_signals_constants.py
index 93b6e084c..e77c7c4ce 100644
--- a/contract-tests/tests/test/amazon/utils/application_signals_constants.py
+++ b/contract-tests/tests/test/amazon/utils/application_signals_constants.py
@@ -9,6 +9,12 @@
 ERROR_METRIC: str = "error"
 FAULT_METRIC: str = "fault"
 
+PYTHON_PROCESS_GC_COUNT = "process.runtime.cpython.gc_count"
+PYTHON_PROCESS_MEMORY_USED = "process.runtime.cpython.memory"
+PYTHON_PROCESS_THREAD_COUNT = "process.runtime.cpython.thread_count"
+PYTHON_PROCESS_CPU_TIME = "process.runtime.cpython.cpu_time"
+PYTHON_PROCESS_CPU_UTILIZATION = "process.runtime.cpython.cpu.utilization"
+
 # Attribute names
 AWS_CLOUDFORMATION_PRIMARY_IDENTIFIER: str = "aws.remote.resource.cfn.primary.identifier"
 AWS_LOCAL_SERVICE: str = "aws.local.service"