Skip to content

Commit

Permalink
Added ForceFlush Implementation (#272)
Browse files Browse the repository at this point in the history
### Description:
Passed the force_flush function from MeteringProvider to the
AwsSpanMetricProcessor to forceFlush remaining metrics on shutdown to
the cwAgent/Collector. Compared to java/dotnet, looks like python
already flushes all the metrics and traces on shutdown even without the
force flush implementation. I'm adding it tho for consistency with other
languages.

### Tesing:
Increased the metricExporter interval and the BatchSpanProcessor delay
to 10 minutes using:
```
OTEL_METRIC_EXPORT_INTERVAL=600000 \
OTEL_BSP_SCHEDULE_DELAY=600000 \
```
With and without the force flush implementation, exiting the [sample
app](https://github.com/aws-observability/aws-otel-python-instrumentation/blob/main/sample-applications/simple-client-server/server_automatic_s3client.py)
flushed both the traces and the metrics to the collector.


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
  • Loading branch information
AsakerMohd authored Oct 10, 2024
1 parent 4f36d47 commit 75af98a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Dict, Optional
from typing import Callable, Dict, Optional

from typing_extensions import override

Expand Down Expand Up @@ -45,19 +45,29 @@ class AwsSpanMetricsProcessor(SpanProcessor):
_generator: MetricAttributeGenerator
_resource: Resource

_force_flush_function: Callable

# no op function to act as a default function in case forceFlushFunction was
# not supplied to the the constructor.
# pylint: disable=no-self-use
def _no_op_function(self, timeout_millis: float = None) -> bool:
return True

def __init__(
self,
error_histogram: Histogram,
fault_histogram: Histogram,
latency_histogram: Histogram,
generator: MetricAttributeGenerator,
resource: Resource,
force_flush_function: Callable = _no_op_function,
):
self._error_histogram = error_histogram
self._fault_histogram = fault_histogram
self._latency_histogram = latency_histogram
self._generator = generator
self._resource = resource
self._force_flush_function = force_flush_function

# pylint: disable=no-self-use
@override
Expand All @@ -78,8 +88,8 @@ def shutdown(self) -> None:

# pylint: disable=no-self-use
@override
def force_flush(self, timeout_millis: int = None) -> bool:
return True
def force_flush(self, timeout_millis: float = 10_000) -> bool:
return self._force_flush_function(timeout_millis)

def _record_metrics(self, span: ReadableSpan, attributes: BoundedAttributes) -> None:
# Only record metrics if non-empty attributes are returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,10 @@ def build(self) -> AwsSpanMetricsProcessor:
latency_histogram.name = _LATENCY

return AwsSpanMetricsProcessor(
error_histogram, fault_histogram, latency_histogram, self._generator, self._resource
error_histogram,
fault_histogram,
latency_histogram,
self._generator,
self._resource,
self._meter_provider.force_flush,
)

0 comments on commit 75af98a

Please sign in to comment.