diff --git a/src/monocle_apptrace/exporters/aws/s3_exporter.py b/src/monocle_apptrace/exporters/aws/s3_exporter.py
index f33fb3d..d9445b1 100644
--- a/src/monocle_apptrace/exporters/aws/s3_exporter.py
+++ b/src/monocle_apptrace/exporters/aws/s3_exporter.py
@@ -6,6 +6,13 @@
 import asyncio
 import boto3
 from botocore.exceptions import ClientError
+from botocore.exceptions import (
+    BotoCoreError,
+    ConnectionClosedError,
+    ConnectTimeoutError,
+    EndpointConnectionError,
+    ReadTimeoutError,
+)
 from opentelemetry.sdk.trace import ReadableSpan
 from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
 from monocle_apptrace.exporters.base_exporter import SpanExporterBase
@@ -14,7 +21,7 @@
 logger = logging.getLogger(__name__)
 
 class S3SpanExporter(SpanExporterBase):
-    def __init__(self, bucket_name=None, region_name="us-east-1"):
+    def __init__(self, bucket_name=None, region_name=None):
         super().__init__()
         # Use environment variables if credentials are not provided
         DEFAULT_FILE_PREFIX = "monocle_trace__"
@@ -36,13 +43,10 @@ def __init__(self, bucket_name=None, region_name="us-east-1"):
         # Check if bucket exists or create it
         if not self.__bucket_exists(self.bucket_name):
             try:
-                if region_name == "us-east-1":
-                    self.s3_client.create_bucket(Bucket=self.bucket_name)
-                else:
-                    self.s3_client.create_bucket(
-                        Bucket=self.bucket_name,
-                        CreateBucketConfiguration={'LocationConstraint': region_name}
-                    )
+                self.s3_client.create_bucket(
+                    Bucket=self.bucket_name,
+                    CreateBucketConfiguration={'LocationConstraint': region_name}
+                )
                 logger.info(f"Bucket {self.bucket_name} created successfully.")
             except ClientError as e:
                 logger.error(f"Error creating bucket {self.bucket_name}: {e}")
@@ -131,15 +135,11 @@ async def __export_spans(self):
             serialized_data = self.__serialize_spans(batch_to_export)
             self.export_queue = self.export_queue[self.max_batch_size:]
             try:
-                if asyncio.get_event_loop().is_running():
-                    task = asyncio.create_task(self._retry_with_backoff(self.__upload_to_s3, serialized_data))
-                    await task
-                else:
-                    await self._retry_with_backoff(self.__upload_to_s3, serialized_data)
-
+                self.__upload_to_s3(serialized_data)
             except Exception as e:
                 logger.error(f"Failed to upload span batch: {e}")
 
+    @SpanExporterBase.retry_with_backoff(exceptions=(EndpointConnectionError, ConnectionClosedError, ReadTimeoutError, ConnectTimeoutError))
     def __upload_to_s3(self, span_data_batch: str):
         current_time = datetime.datetime.now().strftime(self.time_format)
         file_name = f"{self.file_prefix}{current_time}.ndjson"
diff --git a/src/monocle_apptrace/exporters/azure/blob_exporter.py b/src/monocle_apptrace/exporters/azure/blob_exporter.py
index 75ffe79..54f2a61 100644
--- a/src/monocle_apptrace/exporters/azure/blob_exporter.py
+++ b/src/monocle_apptrace/exporters/azure/blob_exporter.py
@@ -105,14 +105,11 @@ async def __export_spans(self):
             serialized_data = self.__serialize_spans(batch_to_export)
             self.export_queue = self.export_queue[self.max_batch_size:]
             try:
-                if asyncio.get_event_loop().is_running():
-                    task = asyncio.create_task(self._retry_with_backoff(self.__upload_to_blob, serialized_data))
-                    await task
-                else:
-                    await self._retry_with_backoff(self.__upload_to_blob, serialized_data)
+                self.__upload_to_blob(serialized_data)
             except Exception as e:
                 logger.error(f"Failed to upload span batch: {e}")
 
+    @SpanExporterBase.retry_with_backoff(exceptions=(ResourceNotFoundError, ClientAuthenticationError, ServiceRequestError))
     def __upload_to_blob(self, span_data_batch: str):
         current_time = datetime.datetime.now().strftime(self.time_format)
         file_name = f"{self.file_prefix}{current_time}.ndjson"
diff --git a/src/monocle_apptrace/exporters/base_exporter.py b/src/monocle_apptrace/exporters/base_exporter.py
index 10db5d0..112b871 100644
--- a/src/monocle_apptrace/exporters/base_exporter.py
+++ b/src/monocle_apptrace/exporters/base_exporter.py
@@ -2,7 +2,6 @@
 import random
 import logging
 from abc import ABC, abstractmethod
-from azure.core.exceptions import ServiceRequestError, ClientAuthenticationError
 from opentelemetry.sdk.trace import ReadableSpan
 from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
 from typing import Sequence
@@ -28,20 +27,22 @@ async def force_flush(self, timeout_millis: int = 30000) -> bool:
     def shutdown(self) -> None:
         pass
 
-    async def _retry_with_backoff(self, func, *args, **kwargs):
-        """Handle retries with exponential backoff."""
-        attempt = 0
-        while attempt < self.max_retries:
-            try:
-                return func(*args, **kwargs)
-            except ServiceRequestError as e:
-                logger.warning(f"Network connectivity error: {e}. Retrying in {self.backoff_factor ** attempt} seconds...")
-                sleep_time = self.backoff_factor * (2 ** attempt) + random.uniform(0, 1)
-                await asyncio.sleep(sleep_time)
-                attempt += 1
-            except ClientAuthenticationError as e:
-                logger.error(f"Failed to authenticate: {str(e)}")
-                break
-
-        logger.error("Max retries exceeded.")
-        raise ServiceRequestError(message="Max retries exceeded.")
\ No newline at end of file
+    @staticmethod
+    def retry_with_backoff(retries=3, backoff_in_seconds=1, max_backoff_in_seconds=32, exceptions=(Exception,)):
+        def decorator(func):
+            def wrapper(*args, **kwargs):
+                attempt = 0
+                while attempt < retries:
+                    try:
+                        return func(*args, **kwargs)
+                    except exceptions as e:
+                        attempt += 1
+                        sleep_time = min(max_backoff_in_seconds, backoff_in_seconds * (2 ** (attempt - 1)))
+                        sleep_time = sleep_time * (1 + random.uniform(-0.1, 0.1))  # Add jitter
+                        logger.warning(f"Network connectivity error, Attempt {attempt} failed: {e}. Retrying in {sleep_time:.2f} seconds...")
+                        time.sleep(sleep_time)
+                raise Exception(f"Failed after {retries} attempts")
+
+            return wrapper
+
+        return decorator
\ No newline at end of file