Skip to content

Commit

Permalink
Merge branch 'main' into inference_span_update
Browse files Browse the repository at this point in the history
  • Loading branch information
Hansrajr committed Nov 25, 2024
2 parents 6ed43e3 + 388209f commit f0ea598
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 37 deletions.
28 changes: 14 additions & 14 deletions src/monocle_apptrace/exporters/aws/s3_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
import asyncio
import boto3
from botocore.exceptions import ClientError
from botocore.exceptions import (
BotoCoreError,
ConnectionClosedError,
ConnectTimeoutError,
EndpointConnectionError,
ReadTimeoutError,
)
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from monocle_apptrace.exporters.base_exporter import SpanExporterBase
Expand All @@ -14,7 +21,7 @@
logger = logging.getLogger(__name__)

class S3SpanExporter(SpanExporterBase):
def __init__(self, bucket_name=None, region_name="us-east-1"):
def __init__(self, bucket_name=None, region_name=None):
super().__init__()
# Use environment variables if credentials are not provided
DEFAULT_FILE_PREFIX = "monocle_trace__"
Expand All @@ -36,13 +43,10 @@ def __init__(self, bucket_name=None, region_name="us-east-1"):
# Check if bucket exists or create it
if not self.__bucket_exists(self.bucket_name):
try:
if region_name == "us-east-1":
self.s3_client.create_bucket(Bucket=self.bucket_name)
else:
self.s3_client.create_bucket(
Bucket=self.bucket_name,
CreateBucketConfiguration={'LocationConstraint': region_name}
)
self.s3_client.create_bucket(
Bucket=self.bucket_name,
CreateBucketConfiguration={'LocationConstraint': region_name}
)
logger.info(f"Bucket {self.bucket_name} created successfully.")
except ClientError as e:
logger.error(f"Error creating bucket {self.bucket_name}: {e}")
Expand Down Expand Up @@ -131,15 +135,11 @@ async def __export_spans(self):
serialized_data = self.__serialize_spans(batch_to_export)
self.export_queue = self.export_queue[self.max_batch_size:]
try:
if asyncio.get_event_loop().is_running():
task = asyncio.create_task(self._retry_with_backoff(self.__upload_to_s3, serialized_data))
await task
else:
await self._retry_with_backoff(self.__upload_to_s3, serialized_data)

self.__upload_to_s3(serialized_data)
except Exception as e:
logger.error(f"Failed to upload span batch: {e}")

@SpanExporterBase.retry_with_backoff(exceptions=(EndpointConnectionError, ConnectionClosedError, ReadTimeoutError, ConnectTimeoutError))
def __upload_to_s3(self, span_data_batch: str):
current_time = datetime.datetime.now().strftime(self.time_format)
file_name = f"{self.file_prefix}{current_time}.ndjson"
Expand Down
7 changes: 2 additions & 5 deletions src/monocle_apptrace/exporters/azure/blob_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,11 @@ async def __export_spans(self):
serialized_data = self.__serialize_spans(batch_to_export)
self.export_queue = self.export_queue[self.max_batch_size:]
try:
if asyncio.get_event_loop().is_running():
task = asyncio.create_task(self._retry_with_backoff(self.__upload_to_blob, serialized_data))
await task
else:
await self._retry_with_backoff(self.__upload_to_blob, serialized_data)
self.__upload_to_blob(serialized_data)
except Exception as e:
logger.error(f"Failed to upload span batch: {e}")

@SpanExporterBase.retry_with_backoff(exceptions=(ResourceNotFoundError, ClientAuthenticationError, ServiceRequestError))
def __upload_to_blob(self, span_data_batch: str):
current_time = datetime.datetime.now().strftime(self.time_format)
file_name = f"{self.file_prefix}{current_time}.ndjson"
Expand Down
37 changes: 19 additions & 18 deletions src/monocle_apptrace/exporters/base_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import random
import logging
from abc import ABC, abstractmethod
from azure.core.exceptions import ServiceRequestError, ClientAuthenticationError
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from typing import Sequence
Expand All @@ -28,20 +27,22 @@ async def force_flush(self, timeout_millis: int = 30000) -> bool:
def shutdown(self) -> None:
pass

async def _retry_with_backoff(self, func, *args, **kwargs):
"""Handle retries with exponential backoff."""
attempt = 0
while attempt < self.max_retries:
try:
return func(*args, **kwargs)
except ServiceRequestError as e:
logger.warning(f"Network connectivity error: {e}. Retrying in {self.backoff_factor ** attempt} seconds...")
sleep_time = self.backoff_factor * (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(sleep_time)
attempt += 1
except ClientAuthenticationError as e:
logger.error(f"Failed to authenticate: {str(e)}")
break

logger.error("Max retries exceeded.")
raise ServiceRequestError(message="Max retries exceeded.")
@staticmethod
def retry_with_backoff(retries=3, backoff_in_seconds=1, max_backoff_in_seconds=32, exceptions=(Exception,)):
def decorator(func):
def wrapper(*args, **kwargs):
attempt = 0
while attempt < retries:
try:
return func(*args, **kwargs)
except exceptions as e:
attempt += 1
sleep_time = min(max_backoff_in_seconds, backoff_in_seconds * (2 ** (attempt - 1)))
sleep_time = sleep_time * (1 + random.uniform(-0.1, 0.1)) # Add jitter
logger.warning(f"Network connectivity error, Attempt {attempt} failed: {e}. Retrying in {sleep_time:.2f} seconds...")
time.sleep(sleep_time)
raise Exception(f"Failed after {retries} attempts")

return wrapper

return decorator

0 comments on commit f0ea598

Please sign in to comment.