Commit e877825

Add local storage and retry logic for Azure Metrics Exporter + flush telemetry on exit (#845)

lzchen authored Jan 30, 2020
1 parent 4f4f020

Showing 2 changed files with 90 additions and 452 deletions.
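For orientation before the diff: with this change the metrics exporter persists batches that fail with retryable errors to local disk, drains that storage on later exports, and flushes remaining telemetry at interpreter exit. A minimal usage sketch follows; the option names appear in this diff, while the values are placeholders:

from opencensus.ext.azure import metrics_exporter

# Placeholder instrumentation key; storage_path is where batches that
# fail with retryable errors are persisted between attempts.
exporter = metrics_exporter.new_metrics_exporter(
    instrumentation_key='00000000-0000-0000-0000-000000000000',
    storage_path='/tmp/opencensus-metrics',
    export_interval=15.0,  # seconds between exports (assumed unit)
)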
@@ -12,11 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
+import atexit
 import logging
 
-import requests
-
 from opencensus.common import utils as common_utils
 from opencensus.ext.azure.common import Options, utils
 from opencensus.ext.azure.common.processor import ProcessorMixin
@@ -26,6 +24,8 @@
     Envelope,
     MetricData,
 )
+from opencensus.ext.azure.common.storage import LocalFileStorage
+from opencensus.ext.azure.common.transport import TransportMixin
 from opencensus.ext.azure.metrics_exporter import standard_metrics
 from opencensus.metrics import transport
 from opencensus.metrics.export.metric_descriptor import MetricDescriptorType
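The two added imports carry most of this change: TransportMixin evidently supplies the _transmit and _transmit_from_storage calls used in the next hunk, and LocalFileStorage is the on-disk buffer for failed batches. ProcessorMixin was already mixed in; as a reminder of what apply_telemetry_processors operates on, here is a hedged sketch of attaching a processor (the callback contract, mutate the envelope and return False to drop it, is assumed from the Azure exporters' shared processor API and is not shown in this diff):

def scrub_role_instance(envelope):
    # Runs on each envelope before transmission; returning False is
    # assumed to drop the envelope.
    envelope.tags['ai.cloud.roleInstance'] = 'redacted'
    return True

exporter.add_telemetry_processor(scrub_role_instance)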
@@ -36,52 +36,67 @@
 logger = logging.getLogger(__name__)
 
 
-class MetricsExporter(ProcessorMixin):
+class MetricsExporter(TransportMixin, ProcessorMixin):
     """Metrics exporter for Microsoft Azure Monitor."""
 
-    def __init__(self, options=None):
-        if options is None:
-            options = Options()
-        self.options = options
+    def __init__(self, **options):
+        self.options = Options(**options)
         utils.validate_instrumentation_key(self.options.instrumentation_key)
         if self.options.max_batch_size <= 0:
             raise ValueError('Max batch size must be at least 1.')
         self.export_interval = self.options.export_interval
         self.max_batch_size = self.options.max_batch_size
         self._telemetry_processors = []
+        self.storage = LocalFileStorage(
+            path=self.options.storage_path,
+            max_size=self.options.storage_max_size,
+            maintenance_period=self.options.storage_maintenance_period,
+            retention_period=self.options.storage_retention_period,
+        )
         super(MetricsExporter, self).__init__()
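The storage wiring added to __init__ above is controlled by four new options. A standalone sketch of the same construction; the values are illustrative and the parameter semantics (size cap, cleanup cadence, maximum blob age) are inferred from the names, not documented in this diff:

from opencensus.ext.azure.common.storage import LocalFileStorage

storage = LocalFileStorage(
    path='/tmp/opencensus-metrics',     # where blobs are written
    max_size=50 * 1024 * 1024,          # assumed cap in bytes
    maintenance_period=60,              # assumed seconds between cleanups
    retention_period=7 * 24 * 60 * 60,  # assumed max blob age in seconds
)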

     def export_metrics(self, metrics):
-        if metrics:
-            envelopes = []
-            for metric in metrics:
-                # No support for histogram aggregations
-                type_ = metric.descriptor.type
-                if type_ != MetricDescriptorType.CUMULATIVE_DISTRIBUTION:
-                    md = metric.descriptor
-                    # Each time series will be uniquely identified by its
-                    # label values
-                    for time_series in metric.time_series:
-                        # Using stats, time_series should only have one point
-                        # which contains the aggregated value
-                        data_point = self.create_data_points(
-                            time_series, md)[0]
-                        # The timestamp is when the metric was recorded
-                        time_stamp = time_series.points[0].timestamp
-                        # Get the properties using label keys from metric and
-                        # label values of the time series
-                        properties = self.create_properties(time_series, md)
-                        envelopes.append(self.create_envelope(data_point,
-                                                              time_stamp,
-                                                              properties))
-            # Send data in batches of max_batch_size
-            if envelopes:
-                batched_envelopes = list(common_utils.window(
-                    envelopes, self.max_batch_size))
-                for batch in batched_envelopes:
-                    batch = self.apply_telemetry_processors(batch)
-                    self._transmit_without_retry(batch)
-
-    def create_data_points(self, time_series, metric_descriptor):
+        envelopes = []
+        for metric in metrics:
+            envelopes.extend(self.metric_to_envelopes(metric))
+        # Send data in batches of max_batch_size
+        batched_envelopes = list(common_utils.window(
+            envelopes, self.max_batch_size))
+        for batch in batched_envelopes:
+            batch = self.apply_telemetry_processors(batch)
+            result = self._transmit(batch)
+            if result > 0:
+                self.storage.put(batch, result)
+
+        # If there is still room to transmit envelopes, transmit from storage
+        # if available
+        if len(envelopes) < self.options.max_batch_size:
+            self._transmit_from_storage()
+
+    def metric_to_envelopes(self, metric):
+        envelopes = []
+        # No support for histogram aggregations
+        if (metric.descriptor.type !=
+                MetricDescriptorType.CUMULATIVE_DISTRIBUTION):
+            md = metric.descriptor
+            # Each time series will be uniquely identified by its
+            # label values
+            for time_series in metric.time_series:
+                # Using stats, time_series should only have one
+                # point which contains the aggregated value
+                data_point = self._create_data_points(
+                    time_series, md)[0]
+                # The timestamp is when the metric was recorded
+                timestamp = time_series.points[0].timestamp
+                # Get the properties using label keys from metric
+                # and label values of the time series
+                properties = self._create_properties(time_series, md)
+                envelopes.append(self._create_envelope(data_point,
+                                                       timestamp,
+                                                       properties))
+        return envelopes
+
+    def _create_data_points(self, time_series, metric_descriptor):
         """Convert a metric's OC time series to list of Azure data points."""
         data_points = []
         for point in time_series.points:
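The new export_metrics body is the heart of the commit: envelopes are windowed into batches of max_batch_size, each batch passes through the telemetry processors and _transmit (from TransportMixin), and a positive return value, read here as a retry interval in seconds, routes the batch into local storage instead of dropping it; any leftover capacity is then used to retry previously stored batches. A self-contained sketch of that pattern (the function and the lease reading of put's second argument are assumptions for illustration, not this library's API):

def send_with_local_buffer(envelopes, batch_size, transmit, storage):
    # Window like common_utils.window: consecutive slices of batch_size.
    batches = [envelopes[i:i + batch_size]
               for i in range(0, len(envelopes), batch_size)]
    for batch in batches:
        retry_after = transmit(batch)  # positive means "retry later"
        if retry_after > 0:
            # Persist the batch; the stored blob is assumed to stay
            # leased (untouched) for retry_after seconds before a later
            # export picks it up again.
            storage.put(batch, retry_after)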
@@ -92,7 +107,7 @@ def create_data_points(self, time_series, metric_descriptor):
             data_points.append(data_point)
         return data_points
 
-    def create_properties(self, time_series, metric_descriptor):
+    def _create_properties(self, time_series, metric_descriptor):
         properties = {}
         # We construct a properties map from the label keys and values. We
         # assume the ordering is already correct
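The comment closing the hunk above describes a positional pairing: the i-th label key of the metric descriptor is matched with the i-th label value of the time series. Stripped of the .key/.value attribute access on the real objects, the mapping is just:

label_keys = ['region', 'host']      # from metric_descriptor.label_keys
label_values = ['westus', 'web-01']  # from time_series.label_values
properties = dict(zip(label_keys, label_values))
# {'region': 'westus', 'host': 'web-01'}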
@@ -104,11 +119,11 @@ def create_properties(self, time_series, metric_descriptor):
             properties[metric_descriptor.label_keys[i].key] = value
         return properties
 
-    def create_envelope(self, data_point, time_stamp, properties):
+    def _create_envelope(self, data_point, timestamp, properties):
         envelope = Envelope(
             iKey=self.options.instrumentation_key,
             tags=dict(utils.azure_monitor_context),
-            time=time_stamp.isoformat(),
+            time=timestamp.isoformat(),
         )
         envelope.name = "Microsoft.ApplicationInsights.Metric"
         data = MetricData(
@@ -118,125 +133,14 @@ def create_envelope(self, data_point, time_stamp, properties):
         envelope.data = Data(baseData=data, baseType="MetricData")
         return envelope
 
-    def _transmit_without_retry(self, envelopes):
-        # Contains logic from transport._transmit
-        # TODO: Remove this function from exporter and consolidate with
-        # transport._transmit to cover all exporter use cases. Uses cases
-        # pertain to properly handling failures and implementing a retry
-        # policy for this exporter.
-        # TODO: implement retry policy
-        """
-        Transmit the data envelopes to the ingestion service.
-        Does not perform retry logic. For partial success and
-        non-retryable failure, simply outputs result to logs.
-        This function should never throw exception.
-        """
-        try:
-            response = requests.post(
-                url=self.options.endpoint,
-                data=json.dumps(envelopes),
-                headers={
-                    'Accept': 'application/json',
-                    'Content-Type': 'application/json; charset=utf-8',
-                },
-                timeout=self.options.timeout,
-            )
-        except Exception as ex:
-            # No retry policy, log output
-            logger.warning('Transient client side error %s.', ex)
-            return
-
-        text = 'N/A'
-        data = None
-        # Handle the possible results from the response
-        if response is None:
-            logger.warning('Error: cannot read response.')
-            return
-        try:
-            status_code = response.status_code
-        except Exception as ex:
-            logger.warning('Error while reading response status code %s.', ex)
-            return
-        try:
-            text = response.text
-        except Exception as ex:
-            logger.warning('Error while reading response body %s.', ex)
-            return
-        try:
-            data = json.loads(text)
-        except Exception as ex:
-            logger.warning('Error while loading ' +
-                           'json from response body %s.', ex)
-            return
-        if status_code == 200:
-            logger.info('Transmission succeeded: %s.', text)
-            return
-        # Check for retryable partial content
-        if status_code == 206:
-            if data:
-                try:
-                    retryable_envelopes = []
-                    for error in data['errors']:
-                        if error['statusCode'] in (
-                                429,  # Too Many Requests
-                                500,  # Internal Server Error
-                                503,  # Service Unavailable
-                        ):
-                            retryable_envelopes.append(
-                                envelopes[error['index']])
-                        else:
-                            logger.error(
-                                'Data drop %s: %s %s.',
-                                error['statusCode'],
-                                error['message'],
-                                envelopes[error['index']],
-                            )
-                    # show the envelopes that can be retried manually for
-                    # visibility
-                    if retryable_envelopes:
-                        logger.warning(
-                            'Error while processing data. Data dropped. ' +
-                            'Consider manually retrying for envelopes: %s.',
-                            retryable_envelopes
-                        )
-                    return
-                except Exception:
-                    logger.exception(
-                        'Error while processing %s: %s.',
-                        status_code,
-                        text
-                    )
-                    return
-        # Check for non-retryable result
-        if status_code in (
-                206,  # Partial Content
-                429,  # Too Many Requests
-                500,  # Internal Server Error
-                503,  # Service Unavailable
-        ):
-            # server side error (retryable)
-            logger.warning(
-                'Transient server side error %s: %s. ' +
-                'Consider manually trying.',
-                status_code,
-                text,
-            )
-        else:
-            # server side error (non-retryable)
-            logger.error(
-                'Non-retryable server side error %s: %s.',
-                status_code,
-                text,
-            )
 
 
 def new_metrics_exporter(**options):
-    options_ = Options(**options)
-    exporter = MetricsExporter(options=options_)
+    exporter = MetricsExporter(**options)
     producers = [stats_module.stats]
-    if options_.enable_standard_metrics:
+    if exporter.options.enable_standard_metrics:
         producers.append(standard_metrics.producer)
     transport.get_exporter_thread(producers,
                                   exporter,
-                                  interval=options_.export_interval)
+                                  interval=exporter.options.export_interval)
+    atexit.register(exporter.export_metrics, stats_module.stats.get_metrics())
     return exporter
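The single atexit.register line added to new_metrics_exporter implements the "flush telemetry on exit" half of the commit title: it arranges one final export_metrics call when the interpreter shuts down, so metrics recorded after the last timed export still reach the wire or local storage. The registered call is equivalent to flushing manually:

from opencensus.stats import stats as stats_module

# What the atexit hook runs at interpreter shutdown:
exporter.export_metrics(stats_module.stats.get_metrics())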