# Add Application Signals runtime metrics (#244)
## Feature request
Add runtime metrics collection to Application Signals. Runtime metrics are
enabled by default when `OTEL_AWS_APPLICATION_SIGNALS_ENABLED` is `true`, and
can be disabled separately by setting
`OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED` to `false`.
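
The gating added in `aws_opentelemetry_configurator.py` boils down to the two
checks sketched below. This is a simplified, standalone illustration; the real
`_is_application_signals_enabled` helper also honors the deprecated
`OTEL_AWS_APP_SIGNALS_ENABLED` variable, which is omitted here for brevity:

```
import os

APPLICATION_SIGNALS_ENABLED_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_ENABLED"
APPLICATION_SIGNALS_RUNTIME_ENABLED_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED"


def is_application_signals_enabled() -> bool:
    # Application Signals is opt-in: everything stays off unless explicitly enabled.
    return os.environ.get(APPLICATION_SIGNALS_ENABLED_CONFIG, "false").lower() == "true"


def is_runtime_metrics_enabled() -> bool:
    # Runtime metrics default to on once Application Signals is enabled,
    # but can be turned off independently.
    return is_application_signals_enabled() and (
        os.environ.get(APPLICATION_SIGNALS_RUNTIME_ENABLED_CONFIG, "true").lower() == "true"
    )
```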

## Description of changes:
1. Add `ScopeBasedPeriodicExportingMetricReader` to copy metrics from the
`opentelemetry.instrumentation.system_metrics` instrumentation scope to the
Application Signals exporter (a condensed wiring sketch follows this list).
2. Set `aws.local.service` in the resource attributes.
3. Add views to work around
open-telemetry/opentelemetry-python-contrib#2861.
4. Add contract tests for runtime metrics.
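
The wiring roughly follows the sketch below, condensed from the new
`_init_metrics` and `_customize_metric_exporters` code in this change. Only one
of the runtime views is shown, and the `ApplicationSignalsExporterProvider`
import path is an assumption; treat it as an outline rather than the exact
implementation:

```
from amazon.opentelemetry.distro.aws_opentelemetry_configurator import ApplicationSignalsExporterProvider
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.view import LastValueAggregation, View
from opentelemetry.sdk.resources import Resource

SCOPE = "opentelemetry.instrumentation.system_metrics"

# Last-value views for the runtime instruments (the workaround for
# opentelemetry-python-contrib#2861); the real code also registers views for
# gc_count, thread_count, open file descriptors and network connections.
views = [
    View(instrument_name="process.runtime.*.memory", meter_name=SCOPE, aggregation=LastValueAggregation()),
    # Added only when no other metric reader is configured, so that scopes other
    # than the runtime one are dropped instead of being exported by accident.
    ScopeBasedRetainingView(meter_name=SCOPE),
]

# The scope-based reader forwards metrics from the registered scope to the
# Application Signals exporter and ignores every other scope.
reader = ScopeBasedPeriodicExportingMetricReader(
    exporter=ApplicationSignalsExporterProvider().create_exporter(),
    export_interval_millis=60000,  # capped at 60s, matching _get_metric_export_interval
    registered_scope_names={SCOPE},
)

set_meter_provider(MeterProvider(resource=Resource.create(), metric_readers=[reader], views=views))
```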

## Result example
```
{
  "resource_metrics": [
    {
      "resource": {
        "attributes": {
          "telemetry.sdk.language": "python",
          "telemetry.sdk.name": "opentelemetry",
          "telemetry.sdk.version": "1.25.0",
          "service.name": "unknown_service",
          "cloud.provider": "aws",
          "cloud.platform": "aws_ec2",
          "cloud.account.id": "633750930120",
          "cloud.region": "us-east-1",
          "cloud.availability_zone": "us-east-1a",
          "host.id": "i-03ff80a878a803e0e",
          "host.type": "t2.medium",
          "host.name": "ip-172-31-25-215.ec2.internal",
          "telemetry.auto.version": "0.3.0.dev0-aws",
          "aws.local.service": "UnknownService"
        },
        "schema_url": ""
      },
      "scope_metrics": [
        {
          "scope": {
            "name": "opentelemetry.instrumentation.system_metrics",
            "version": "0.46b0",
            "schema_url": "https://opentelemetry.io/schemas/1.11.0"
          },
          "metrics": [
            {
              "name": "process.runtime.cpython.memory",
              "description": "Runtime cpython memory",
              "unit": "bytes",
              "data": {
                "data_points": [
                  {
                    "attributes": {
                      "type": "rss"
                    },
                    "start_time_unix_nano": 1724953385390606423,
                    "time_unix_nano": 1724953385391126083,
                    "value": 75747328
                  },
                  {
                    "attributes": {
                      "type": "vms"
                    },
                    "start_time_unix_nano": 1724953385390606423,
                    "time_unix_nano": 1724953385391126083,
                    "value": 546709504
                  }
                ],
                "aggregation_temporality": 2,
                "is_monotonic": false
              }
            }
          ]
        }
      ]
    }
  ]
}
```
By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
bjrara authored Oct 25, 2024
2 parents 83b87cf + acf7f68 commit 7bf7202
Showing 12 changed files with 519 additions and 37 deletions.
@@ -29,6 +29,7 @@
AWS_STEPFUNCTIONS_ACTIVITY_ARN,
AWS_STEPFUNCTIONS_STATEMACHINE_ARN,
)
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
from amazon.opentelemetry.distro._aws_span_processing_util import (
GEN_AI_REQUEST_MODEL,
LOCAL_ROOT,
@@ -37,7 +38,6 @@
UNKNOWN_OPERATION,
UNKNOWN_REMOTE_OPERATION,
UNKNOWN_REMOTE_SERVICE,
UNKNOWN_SERVICE,
extract_api_path_value,
get_egress_operation,
get_ingress_operation,
@@ -54,12 +54,11 @@
MetricAttributeGenerator,
)
from amazon.opentelemetry.distro.sqs_url_parser import SqsUrlParser
from opentelemetry.sdk.resources import Resource, ResourceAttributes
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import BoundedAttributes, ReadableSpan
from opentelemetry.semconv.trace import SpanAttributes

# Pertinent OTEL attribute keys
_SERVICE_NAME: str = ResourceAttributes.SERVICE_NAME
_DB_CONNECTION_STRING: str = SpanAttributes.DB_CONNECTION_STRING
_DB_NAME: str = SpanAttributes.DB_NAME
_DB_OPERATION: str = SpanAttributes.DB_OPERATION
@@ -103,10 +102,6 @@
# Special DEPENDENCY attribute value if GRAPHQL_OPERATION_TYPE attribute key is present.
_GRAPHQL: str = "graphql"

# As per https://opentelemetry.io/docs/specs/semconv/resource/#service, if service name is not specified, SDK defaults
# the service name to unknown_service:<process name> or just unknown_service.
_OTEL_UNKNOWN_SERVICE_PREFIX: str = "unknown_service"

_logger: Logger = getLogger(__name__)


@@ -152,15 +147,11 @@ def _generate_dependency_metric_attributes(span: ReadableSpan, resource: Resourc


def _set_service(resource: Resource, span: ReadableSpan, attributes: BoundedAttributes) -> None:
"""Service is always derived from SERVICE_NAME"""
service: str = resource.attributes.get(_SERVICE_NAME)

# In practice the service name is never None, but we can be defensive here.
if service is None or service.startswith(_OTEL_UNKNOWN_SERVICE_PREFIX):
service_name, is_unknown = get_service_attribute(resource)
if is_unknown:
_log_unknown_attribute(AWS_LOCAL_SERVICE, span)
service = UNKNOWN_SERVICE

attributes[AWS_LOCAL_SERVICE] = service
attributes[AWS_LOCAL_SERVICE] = service_name


def _set_ingress_operation(span: ReadableSpan, attributes: BoundedAttributes) -> None:
@@ -0,0 +1,19 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from amazon.opentelemetry.distro._aws_span_processing_util import UNKNOWN_SERVICE
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# As per https://opentelemetry.io/docs/specs/semconv/resource/#service, if service name is not specified, SDK defaults
# the service name to unknown_service:<process name> or just unknown_service.
_OTEL_UNKNOWN_SERVICE_PREFIX: str = "unknown_service"


def get_service_attribute(resource: Resource) -> (str, bool):
"""Service is always derived from SERVICE_NAME"""
service: str = resource.attributes.get(SERVICE_NAME)

# In practice the service name is never None, but we can be defensive here.
if service is None or service.startswith(_OTEL_UNKNOWN_SERVICE_PREFIX):
return UNKNOWN_SERVICE, True

return service, False
@@ -3,11 +3,13 @@
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
import os
from logging import Logger, getLogger
from typing import ClassVar, Dict, Type
from typing import ClassVar, Dict, List, Type, Union

from importlib_metadata import version
from typing_extensions import override

from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
AttributePropagatingSpanProcessorBuilder,
@@ -19,8 +21,11 @@
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._configuration import (
_get_exporter_names,
_get_id_generator,
@@ -29,7 +34,6 @@
_import_id_generator,
_import_sampler,
_init_logging,
_init_metrics,
_OTelSDKConfigurator,
)
from opentelemetry.sdk.environment_variables import (
@@ -50,7 +54,13 @@
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk.metrics.export import AggregationTemporality, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
MetricExporter,
MetricReader,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk.metrics.view import LastValueAggregation, View
from opentelemetry.sdk.resources import Resource, get_aggregated_resources
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
@@ -59,15 +69,17 @@
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import set_tracer_provider

APP_SIGNALS_ENABLED_CONFIG = "OTEL_AWS_APP_SIGNALS_ENABLED"
DEPRECATED_APP_SIGNALS_ENABLED_CONFIG = "OTEL_AWS_APP_SIGNALS_ENABLED"
APPLICATION_SIGNALS_ENABLED_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_ENABLED"
APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APP_SIGNALS_EXPORTER_ENDPOINT"
APPLICATION_SIGNALS_RUNTIME_ENABLED_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED"
DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APP_SIGNALS_EXPORTER_ENDPOINT"
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT"
METRIC_EXPORT_INTERVAL_CONFIG = "OTEL_METRIC_EXPORT_INTERVAL"
DEFAULT_METRIC_EXPORT_INTERVAL = 60000.0
AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME"
AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS"
OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG = "OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED"
SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME = "opentelemetry.instrumentation.system_metrics"
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
# UDP package size is not larger than 64KB
LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10
@@ -127,7 +139,7 @@ def _initialize_components():
else []
)

resource = get_aggregated_resources(resource_detectors).merge(Resource.create(auto_resource))
resource = _customize_resource(get_aggregated_resources(resource_detectors).merge(Resource.create(auto_resource)))

sampler_name = _get_sampler()
sampler = _custom_import_sampler(sampler_name, resource)
@@ -171,6 +183,27 @@ def _init_tracing(
set_tracer_provider(trace_provider)


def _init_metrics(
    exporters_or_readers: Dict[str, Union[Type[MetricExporter], Type[MetricReader]]],
    resource: Resource = None,
):
    metric_readers = []
    views = []

    for _, exporter_or_reader_class in exporters_or_readers.items():
        exporter_args = {}

        if issubclass(exporter_or_reader_class, MetricReader):
            metric_readers.append(exporter_or_reader_class(**exporter_args))
        else:
            metric_readers.append(PeriodicExportingMetricReader(exporter_or_reader_class(**exporter_args)))

    _customize_metric_exporters(metric_readers, views)

    provider = MeterProvider(resource=resource, metric_readers=metric_readers, views=views)
    set_meter_provider(provider)


# END The OpenTelemetry Authors code


@@ -303,14 +336,9 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
# Construct meterProvider
_logger.info("AWS Application Signals enabled")
otel_metric_exporter = ApplicationSignalsExporterProvider().create_exporter()
export_interval_millis = float(os.environ.get(METRIC_EXPORT_INTERVAL_CONFIG, DEFAULT_METRIC_EXPORT_INTERVAL))
_logger.debug("Span Metrics export interval: %s", export_interval_millis)
# Cap export interval to 60 seconds. This is currently required for metrics-trace correlation to work correctly.
if export_interval_millis > DEFAULT_METRIC_EXPORT_INTERVAL:
export_interval_millis = DEFAULT_METRIC_EXPORT_INTERVAL
_logger.info("AWS Application Signals metrics export interval capped to %s", export_interval_millis)

periodic_exporting_metric_reader = PeriodicExportingMetricReader(
exporter=otel_metric_exporter, export_interval_millis=export_interval_millis
exporter=otel_metric_exporter, export_interval_millis=_get_metric_export_interval()
)
meter_provider: MeterProvider = MeterProvider(resource=resource, metric_readers=[periodic_exporting_metric_reader])
# Construct and set application signals metrics processor
@@ -319,25 +347,106 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
return


def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[View]) -> None:
    if _is_application_signals_runtime_enabled():
        _get_runtime_metric_views(views, 0 == len(metric_readers))

    application_signals_metric_exporter = ApplicationSignalsExporterProvider().create_exporter()
    scope_based_periodic_exporting_metric_reader = ScopeBasedPeriodicExportingMetricReader(
        exporter=application_signals_metric_exporter,
        export_interval_millis=_get_metric_export_interval(),
        registered_scope_names={SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME},
    )
    metric_readers.append(scope_based_periodic_exporting_metric_reader)


def _get_runtime_metric_views(views: List[View], retain_runtime_only: bool) -> None:
    runtime_metrics_scope_name = SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME
    _logger.info("Registered scope %s", runtime_metrics_scope_name)
    views.append(
        View(
            instrument_name="system.network.connections",
            meter_name=runtime_metrics_scope_name,
            aggregation=LastValueAggregation(),
        )
    )
    views.append(
        View(
            instrument_name="process.open_file_descriptor.count",
            meter_name=runtime_metrics_scope_name,
            aggregation=LastValueAggregation(),
        )
    )
    views.append(
        View(
            instrument_name="process.runtime.*.memory",
            meter_name=runtime_metrics_scope_name,
            aggregation=LastValueAggregation(),
        )
    )
    views.append(
        View(
            instrument_name="process.runtime.*.gc_count",
            meter_name=runtime_metrics_scope_name,
            aggregation=LastValueAggregation(),
        )
    )
    views.append(
        View(
            instrument_name="process.runtime.*.thread_count",
            meter_name=runtime_metrics_scope_name,
            aggregation=LastValueAggregation(),
        )
    )
    if retain_runtime_only:
        views.append(ScopeBasedRetainingView(meter_name=runtime_metrics_scope_name))


def _customize_versions(auto_resource: Dict[str, any]) -> Dict[str, any]:
distro_version = version("aws-opentelemetry-distro")
auto_resource[ResourceAttributes.TELEMETRY_AUTO_VERSION] = distro_version + "-aws"
_logger.debug("aws-opentelementry-distro - version: %s", auto_resource[ResourceAttributes.TELEMETRY_AUTO_VERSION])
return auto_resource


def _customize_resource(resource: Resource) -> Resource:
    service_name, is_unknown = get_service_attribute(resource)
    if is_unknown:
        _logger.debug("No valid service name found")

    return resource.merge(Resource.create({AWS_LOCAL_SERVICE: service_name}))


def _is_application_signals_enabled():
return (
os.environ.get(APPLICATION_SIGNALS_ENABLED_CONFIG, os.environ.get(APP_SIGNALS_ENABLED_CONFIG, "false")).lower()
os.environ.get(
APPLICATION_SIGNALS_ENABLED_CONFIG, os.environ.get(DEPRECATED_APP_SIGNALS_ENABLED_CONFIG, "false")
).lower()
== "true"
)


def _is_application_signals_runtime_enabled():
    return _is_application_signals_enabled() and (
        os.environ.get(APPLICATION_SIGNALS_RUNTIME_ENABLED_CONFIG, "true").lower() == "true"
    )


def _is_lambda_environment():
# detect if running in AWS Lambda environment
return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ


def _get_metric_export_interval():
    export_interval_millis = float(os.environ.get(METRIC_EXPORT_INTERVAL_CONFIG, DEFAULT_METRIC_EXPORT_INTERVAL))
    _logger.debug("Span Metrics export interval: %s", export_interval_millis)
    # Cap export interval to 60 seconds. This is currently required for metrics-trace correlation to work correctly.
    if export_interval_millis > DEFAULT_METRIC_EXPORT_INTERVAL:
        export_interval_millis = DEFAULT_METRIC_EXPORT_INTERVAL
        _logger.info("AWS Application Signals metrics export interval capped to %s", export_interval_millis)
    return export_interval_millis


def _span_export_batch_size():
return LAMBDA_SPAN_EXPORT_BATCH_SIZE if _is_lambda_environment() else None

@@ -372,7 +481,7 @@ def create_exporter(self):
if protocol == "http/protobuf":
application_signals_endpoint = os.environ.get(
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG,
os.environ.get(APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG, "http://localhost:4316/v1/metrics"),
os.environ.get(DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG, "http://localhost:4316/v1/metrics"),
)
_logger.debug("AWS Application Signals export endpoint: %s", application_signals_endpoint)
return OTLPHttpOTLPMetricExporter(
@@ -388,7 +497,7 @@ def create_exporter(self):

application_signals_endpoint = os.environ.get(
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG,
os.environ.get(APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG, "localhost:4315"),
os.environ.get(DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG, "localhost:4315"),
)
_logger.debug("AWS Application Signals export endpoint: %s", application_signals_endpoint)
return OTLPGrpcOTLPMetricExporter(
@@ -0,0 +1,54 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from logging import Logger, getLogger
from typing import Optional, Set

from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
from opentelemetry.sdk.metrics.export import MetricExporter, MetricsData, PeriodicExportingMetricReader, ResourceMetrics

_logger: Logger = getLogger(__name__)


class ScopeBasedPeriodicExportingMetricReader(PeriodicExportingMetricReader):

    def __init__(
        self,
        exporter: MetricExporter,
        export_interval_millis: Optional[float] = None,
        export_timeout_millis: Optional[float] = None,
        registered_scope_names: Set[str] = None,
    ):
        super().__init__(exporter, export_interval_millis, export_timeout_millis)
        self._registered_scope_names = registered_scope_names

    def _receive_metrics(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> None:

        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
        # pylint: disable=broad-exception-caught,invalid-name
        try:
            with self._export_lock:
                exporting_resource_metrics = []
                for metric in metrics_data.resource_metrics:
                    exporting_scope_metrics = []
                    for scope_metric in metric.scope_metrics:
                        if scope_metric.scope.name in self._registered_scope_names:
                            exporting_scope_metrics.append(scope_metric)
                    if len(exporting_scope_metrics) > 0:
                        exporting_resource_metrics.append(
                            ResourceMetrics(
                                resource=metric.resource,
                                scope_metrics=exporting_scope_metrics,
                                schema_url=metric.schema_url,
                            )
                        )
                if len(exporting_resource_metrics) > 0:
                    new_metrics_data = MetricsData(resource_metrics=exporting_resource_metrics)
                    self._exporter.export(new_metrics_data, timeout_millis=timeout_millis)
        except Exception as e:
            _logger.exception("Exception while exporting metrics %s", str(e))
        detach(token)