Resolved merge conflicts and merged main into remove_tags
Signed-off-by: hansrajr <[email protected]>
Showing 9 changed files with 619 additions and 34 deletions.
@@ -0,0 +1,151 @@
import os
import time
import random
import datetime
import logging
import asyncio
import boto3
from botocore.exceptions import ClientError
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from monocle_apptrace.exporters.base_exporter import SpanExporterBase
from typing import Sequence

logger = logging.getLogger(__name__)

class S3SpanExporter(SpanExporterBase):
    def __init__(self, bucket_name=None, region_name="us-east-1"):
        super().__init__()
        # Use environment variables if credentials are not provided
        DEFAULT_FILE_PREFIX = "monocle_trace__"
        DEFAULT_TIME_FORMAT = "%Y-%m-%d__%H.%M.%S"
        self.max_batch_size = 500
        self.export_interval = 1
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
            region_name=region_name,
        )
        self.bucket_name = bucket_name or os.getenv('AWS_S3_BUCKET_NAME', 'default-bucket')
        self.file_prefix = DEFAULT_FILE_PREFIX
        self.time_format = DEFAULT_TIME_FORMAT
        self.export_queue = []
        self.last_export_time = time.time()

        # Check if bucket exists or create it
        if not self.__bucket_exists(self.bucket_name):
            try:
                if region_name == "us-east-1":
                    self.s3_client.create_bucket(Bucket=self.bucket_name)
                else:
                    self.s3_client.create_bucket(
                        Bucket=self.bucket_name,
                        CreateBucketConfiguration={'LocationConstraint': region_name}
                    )
                logger.info(f"Bucket {self.bucket_name} created successfully.")
            except ClientError as e:
                logger.error(f"Error creating bucket {self.bucket_name}: {e}")
                raise e

    def __bucket_exists(self, bucket_name):
        try:
            # Check if the bucket exists by calling head_bucket
            self.s3_client.head_bucket(Bucket=bucket_name)
            return True
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == '404':
                # Bucket not found
                logger.error(f"Bucket {bucket_name} does not exist (404).")
                return False
            elif error_code == '403':
                # Permission denied
                logger.error(f"Access to bucket {bucket_name} is forbidden (403).")
                raise PermissionError(f"Access to bucket {bucket_name} is forbidden.")
            elif error_code == '400':
                # Bad request or malformed input
                logger.error(f"Bad request for bucket {bucket_name} (400).")
                raise ValueError(f"Bad request for bucket {bucket_name}.")
            else:
                # Other client errors
                logger.error(f"Unexpected error when accessing bucket {bucket_name}: {e}")
                raise e
        except TypeError as e:
            # Handle TypeError separately
            logger.error(f"Type error while checking bucket existence: {e}")
            raise e

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Synchronous export method that internally handles async logic."""
        try:
            # Run the asynchronous export logic in an event loop
            asyncio.run(self.__export_async(spans))
            return SpanExportResult.SUCCESS
        except Exception as e:
            logger.error(f"Error exporting spans: {e}")
            return SpanExportResult.FAILURE

    async def __export_async(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        try:
            # Add spans to the export queue
            for span in spans:
                self.export_queue.append(span)
                # If the queue reaches MAX_BATCH_SIZE, export the spans
                if len(self.export_queue) >= self.max_batch_size:
                    await self.__export_spans()

            # Check if it's time to force a flush
            current_time = time.time()
            if current_time - self.last_export_time >= self.export_interval:
                await self.__export_spans()  # Export spans if time interval has passed
                self.last_export_time = current_time  # Reset the last export time

            return SpanExportResult.SUCCESS
        except Exception as e:
            logger.error(f"Error exporting spans: {e}")
            return SpanExportResult.FAILURE

    def __serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
        try:
            # Serialize spans to JSON or any other format you prefer
            span_data_list = [span.to_json() for span in spans]
            return "[" + ", ".join(span_data_list) + "]"
        except Exception as e:
            logger.error(f"Error serializing spans: {e}")
            raise

    async def __export_spans(self):
        if len(self.export_queue) == 0:
            return

        # Take a batch of spans from the queue
        batch_to_export = self.export_queue[:self.max_batch_size]
        serialized_data = self.__serialize_spans(batch_to_export)
        self.export_queue = self.export_queue[self.max_batch_size:]
        try:
            if asyncio.get_event_loop().is_running():
                task = asyncio.create_task(self._retry_with_backoff(self.__upload_to_s3, serialized_data))
                await task
            else:
                await self._retry_with_backoff(self.__upload_to_s3, serialized_data)
        except Exception as e:
            logger.error(f"Failed to upload span batch: {e}")

    def __upload_to_s3(self, span_data_batch: str):
        current_time = datetime.datetime.now().strftime(self.time_format)
        file_name = f"{self.file_prefix}{current_time}.json"
        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=file_name,
            Body=span_data_batch
        )
        logger.info(f"Span batch uploaded to AWS S3 as {file_name}.")

    async def force_flush(self, timeout_millis: int = 30000) -> bool:
        await self.__export_spans()  # Export any remaining spans in the queue
        return True

    def shutdown(self) -> None:
        logger.info("S3SpanExporter has been shut down.")
@@ -0,0 +1,121 @@
import os
import time
import random
import datetime
import logging
import asyncio
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.core.exceptions import ResourceNotFoundError, ClientAuthenticationError, ServiceRequestError
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from typing import Sequence
from monocle_apptrace.exporters.base_exporter import SpanExporterBase

logger = logging.getLogger(__name__)

class AzureBlobSpanExporter(SpanExporterBase):
    def __init__(self, connection_string=None, container_name=None):
        super().__init__()
        DEFAULT_FILE_PREFIX = "monocle_trace_"
        DEFAULT_TIME_FORMAT = "%Y-%m-%d_%H.%M.%S"
        self.max_batch_size = 500
        self.export_interval = 1
        # Use default values if none are provided
        if not connection_string:
            connection_string = os.getenv('CONNECTION_STRING')
        if not connection_string:
            raise ValueError("Azure Storage connection string is not provided or set in environment variables.")

        if not container_name:
            container_name = os.getenv('CONTAINER_NAME', 'default-container')

        self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        self.container_name = container_name
        self.file_prefix = DEFAULT_FILE_PREFIX
        self.time_format = DEFAULT_TIME_FORMAT

        # Check if container exists or create it
        if not self.__container_exists(container_name):
            try:
                self.blob_service_client.create_container(container_name)
                logger.info(f"Container {container_name} created successfully.")
            except Exception as e:
                logger.error(f"Error creating container {container_name}: {e}")
                raise e

    def __container_exists(self, container_name):
        try:
            container_client = self.blob_service_client.get_container_client(container_name)
            container_client.get_container_properties()
            return True
        except ResourceNotFoundError:
            logger.error(f"Container {container_name} not found (404).")
            return False
        except ClientAuthenticationError:
            logger.error(f"Access to container {container_name} is forbidden (403).")
            raise PermissionError(f"Access to container {container_name} is forbidden.")
        except Exception as e:
            logger.error(f"Unexpected error when checking if container {container_name} exists: {e}")
            raise e

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Synchronous export method that internally handles async logic."""
        try:
            # Run the asynchronous export logic in an event loop
            asyncio.run(self._export_async(spans))
            return SpanExportResult.SUCCESS
        except Exception as e:
            logger.error(f"Error exporting spans: {e}")
            return SpanExportResult.FAILURE

    async def _export_async(self, spans: Sequence[ReadableSpan]):
        """The actual async export logic is run here."""
        # Add spans to the export queue
        for span in spans:
            self.export_queue.append(span)
            if len(self.export_queue) >= self.max_batch_size:
                await self.__export_spans()

        # Force a flush if the interval has passed
        current_time = time.time()
        if current_time - self.last_export_time >= self.export_interval:
            await self.__export_spans()
            self.last_export_time = current_time

    def __serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
        try:
            span_data_list = [span.to_json() for span in spans]
            return "[" + ", ".join(span_data_list) + "]"
        except Exception as e:
            logger.error(f"Error serializing spans: {e}")
            raise

    async def __export_spans(self):
        if len(self.export_queue) == 0:
            return

        batch_to_export = self.export_queue[:self.max_batch_size]
        serialized_data = self.__serialize_spans(batch_to_export)
        self.export_queue = self.export_queue[self.max_batch_size:]
        try:
            if asyncio.get_event_loop().is_running():
                task = asyncio.create_task(self._retry_with_backoff(self.__upload_to_blob, serialized_data))
                await task
            else:
                await self._retry_with_backoff(self.__upload_to_blob, serialized_data)
        except Exception as e:
            logger.error(f"Failed to upload span batch: {e}")

    def __upload_to_blob(self, span_data_batch: str):
        current_time = datetime.datetime.now().strftime(self.time_format)
        file_name = f"{self.file_prefix}{current_time}.json"
        blob_client = self.blob_service_client.get_blob_client(container=self.container_name, blob=file_name)
        blob_client.upload_blob(span_data_batch, overwrite=True)
        logger.info(f"Span batch uploaded to Azure Blob Storage as {file_name}.")

    async def force_flush(self, timeout_millis: int = 30000) -> bool:
        await self.__export_spans()
        return True

    def shutdown(self) -> None:
        logger.info(f"AzureBlobSpanExporter has been shut down.")
@@ -0,0 +1,47 @@
import time
import random
import logging
from abc import ABC, abstractmethod
from azure.core.exceptions import ServiceRequestError, ClientAuthenticationError
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from typing import Sequence
import asyncio

logger = logging.getLogger(__name__)

class SpanExporterBase(ABC):
    def __init__(self):
        self.backoff_factor = 2
        self.max_retries = 10
        self.export_queue = []
        self.last_export_time = time.time()

    @abstractmethod
    async def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        pass

    @abstractmethod
    async def force_flush(self, timeout_millis: int = 30000) -> bool:
        pass

    def shutdown(self) -> None:
        pass

    async def _retry_with_backoff(self, func, *args, **kwargs):
        """Handle retries with exponential backoff.

        NOTE: only Azure SDK network errors (ServiceRequestError) are retried
        here; exceptions from other backends, such as botocore's ClientError
        raised by the S3 exporter, propagate immediately without retry.
        """
        attempt = 0
        while attempt < self.max_retries:
            try:
                return func(*args, **kwargs)
            except ServiceRequestError as e:
                # Compute the sleep time first so the log message reports the
                # actual delay (exponential backoff plus up to 1s of jitter).
                sleep_time = self.backoff_factor * (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"Network connectivity error: {e}. Retrying in {sleep_time:.2f} seconds...")
                await asyncio.sleep(sleep_time)
                attempt += 1
            except ClientAuthenticationError as e:
                # Authentication failures will not recover on retry; re-raise
                # instead of falling through to a misleading "max retries" error.
                logger.error(f"Failed to authenticate: {str(e)}")
                raise

        logger.error("Max retries exceeded.")
        raise ServiceRequestError(message="Max retries exceeded.")
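For reference, the deterministic part of the retry schedule with the defaults above (backoff_factor=2, max_retries=10), ignoring the up-to-one-second jitter, works out as follows; the worst case waits roughly 34 minutes in total before giving up:

backoff_factor, max_retries = 2, 10
delays = [backoff_factor * (2 ** attempt) for attempt in range(max_retries)]
print(delays)      # [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]
print(sum(delays)) # 2046 seconds, about 34 minutes cumulative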