Direct Exporter - Blob store #41

Merged · 11 commits · Oct 14, 2024
22 changes: 18 additions & 4 deletions Monocle_User_Guide.md
@@ -1,4 +1,4 @@
#Monocle User Guide
# Monocle User Guide

## Monocle Concepts
### Traces
@@ -13,15 +13,24 @@ It’s typically the workflow code components of an application that generate these traces.
```
> pip install monocle_apptrace
```
- For AWS support (to upload traces to AWS), install with the aws extra:
```
> pip install monocle_apptrace[aws]
```
- For Azure support (to upload traces to Azure), install with the azure extra:
```
> pip install monocle_apptrace[azure]
```
- You can also build and install the Monocle library locally from source
```
> pip install .
```
- Install in editable mode along with the optional test dependencies listed under `dev` in pyproject.toml
```
> pip install -e ".[dev]"
```
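- Optionally, verify the installation with a quick import check:
```
> python -c "import monocle_apptrace"
```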


## Using Monocle with your application to generate traces
### Enable Monocle tracing
You need to import the monocle package and invoke the API ``setup_monocle_telemetry(workflow_name=<workflow-name>)`` to enable tracing. The 'workflow-name' is a name you define to identify the given application workflow, for example "customer-chatbot". Monocle includes this name in every trace. The trace output contains a list of spans, which you can print to the console or send to an HTTP endpoint.
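
For reference, a minimal sketch of the enabling call (the import path is an assumption based on this repository's layout):

```python
# Minimal sketch: enable Monocle tracing for a workflow.
from monocle_apptrace.instrumentor import setup_monocle_telemetry  # assumed import path

setup_monocle_telemetry(workflow_name="customer-chatbot")
```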
@@ -48,7 +57,7 @@ chain.invoke({"number":2})
# Request callbacks: Finally, let's use the request `callbacks` to achieve the same result
chain = LLMChain(llm=llm, prompt=prompt)
chain.invoke({"number":2}, {"callbacks":[handler]})

```

### Accessing monocle trace
@@ -63,6 +72,11 @@ setup_monocle_telemetry(workflow_name = "simple_math_app",
```
To print the trace on the console, use ```ConsoleSpanExporter()``` instead of ```FileSpanExporter()```

For AWS:
Install the AWS support as shown in the setup section, then use ```S3SpanExporter()``` to upload the traces to an S3 bucket.

For Azure:
Install the Azure support as shown in the setup section, then use ```AzureBlobSpanExporter()``` to upload the traces to Azure Blob Storage. A wiring sketch for both exporters follows.
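
Below is a minimal wiring sketch for both cloud exporters. Bucket and container names are placeholders; the exporter module paths come from this PR, while the ``setup_monocle_telemetry`` import path is an assumption based on this repository's layout.

```python
# Minimal sketch: route Monocle traces to S3 or Azure Blob Storage.
# Placeholder resource names; credentials are read from the environment
# (AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY or AZURE_STORAGE_CONNECTION_STRING).
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from monocle_apptrace.instrumentor import setup_monocle_telemetry  # assumed import path
from monocle_apptrace.exporters.aws.aws_exporter import S3SpanExporter
# from monocle_apptrace.exporters.azure.azure_exporter import AzureBlobSpanExporter

exporter = S3SpanExporter(bucket_name="my-trace-bucket", region_name="us-east-1")
# exporter = AzureBlobSpanExporter(container_name="my-trace-container")

setup_monocle_telemetry(
    workflow_name="simple_math_app",
    span_processors=[BatchSpanProcessor(exporter)],
)
```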
### Leveraging Monocle's extensibility to handle customization
When the out-of-the-box features from app frameworks are not sufficient, app developers have to add custom code. For example, you might extend an LLM class in LlamaIndex to use a model hosted in NVIDIA Triton. This new class is not known to Monocle. You can specify this new class and method as part of the Monocle enabling API, and Monocle will then be able to trace it, as sketched below.
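
The following is a hypothetical sketch of that customization. ``WrapperMethod`` and its parameter names are assumptions for illustration only; consult the repository source for the actual enabling API.

```python
# Hypothetical sketch: register a custom class method with Monocle so it gets traced.
# WrapperMethod and its parameters below are assumed names, not a confirmed API.
from monocle_apptrace.instrumentor import setup_monocle_telemetry  # assumed import path
from monocle_apptrace.wrapper import WrapperMethod  # assumed import path

setup_monocle_telemetry(
    workflow_name="triton_llm_app",
    wrapper_methods=[
        WrapperMethod(
            package="my_app.llms",           # module defining the custom class (hypothetical)
            object_name="TritonLLM",         # the new class extending LlamaIndex's LLM
            method="complete",               # the method to trace
            span_name="tritonllm.complete",  # span name recorded in the trace
        ),
    ],
)
```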

7 changes: 6 additions & 1 deletion pyproject.toml
@@ -45,7 +45,12 @@ dev = [
'llama-index-vector-stores-chroma==0.1.9',
'parameterized==0.9.0'
]

aws = [
'boto3==1.35.19',
]
azure = [
'azure-storage-blob==12.22.0',
]
[project.urls]
Homepage = "https://github.com/monocle2ai/monocle"
Issues = "https://github.com/monocle2ai/monocle/issues"
153 changes: 153 additions & 0 deletions src/monocle_apptrace/exporters/aws/aws_exporter.py
@@ -0,0 +1,153 @@
import os
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
import boto3
from botocore.exceptions import EndpointConnectionError, NoCredentialsError, ClientError
import datetime
from typing import Sequence
import logging
import time
import random

# Configuration
DEFAULT_FILE_PREFIX = "monocle_trace_"
DEFAULT_TIME_FORMAT = "%Y-%m-%d_%H.%M.%S"
MAX_BATCH_SIZE = 500 # Max number of spans per batch
MAX_QUEUE_SIZE = 10000  # Max number of spans in the queue (not enforced in this implementation)
EXPORT_INTERVAL = 1 # Maximum interval (in seconds) to wait before exporting spans
BACKOFF_FACTOR = 2
MAX_RETRIES = 10

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class S3SpanExporter(SpanExporter):
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None, bucket_name=None, region_name="us-east-1"):
# Use environment variables if credentials are not provided
self.s3_client = boto3.client(
's3',
aws_access_key_id=aws_access_key_id or os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=aws_secret_access_key or os.getenv('AWS_SECRET_ACCESS_KEY'),
region_name=region_name
)
self.bucket_name = bucket_name or os.getenv('AWS_S3_BUCKET_NAME', 'default-bucket')
self.file_prefix = DEFAULT_FILE_PREFIX
self.time_format = DEFAULT_TIME_FORMAT
self.export_queue = []
self.last_export_time = time.time()

        # Check if the bucket exists, and create it if it does not
        if not self._bucket_exists(self.bucket_name):
            try:
                # Note: us-east-1 rejects an explicit LocationConstraint,
                # so only pass CreateBucketConfiguration for other regions.
                create_kwargs = {'Bucket': self.bucket_name}
                if region_name != 'us-east-1':
                    create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': region_name}
                self.s3_client.create_bucket(**create_kwargs)
                logger.info(f"Bucket {self.bucket_name} created successfully.")
            except ClientError as e:
                logger.error(f"Error creating bucket {self.bucket_name}: {e}")
                raise e

    def _bucket_exists(self, bucket_name):
        try:
            self.s3_client.head_bucket(Bucket=bucket_name)
            return True
        except ClientError as e:
            # head_bucket raises a generic ClientError with a 404 code for a
            # missing bucket, rather than a NoSuchBucket exception.
            if e.response.get("Error", {}).get("Code") == "404":
                return False
            logger.error(f"Error checking if bucket {bucket_name} exists: {e}")
            return False
        except Exception as e:
            logger.error(f"Error checking if bucket {bucket_name} exists: {e}")
            return False

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
try:
# Add spans to the export queue
for span in spans:
self.export_queue.append(span)
# If the queue reaches MAX_BATCH_SIZE, export the spans
if len(self.export_queue) >= MAX_BATCH_SIZE:
self._export_spans()

# Check if it's time to force a flush
current_time = time.time()
if current_time - self.last_export_time >= EXPORT_INTERVAL:
self._export_spans() # Export spans if time interval has passed
self.last_export_time = current_time # Reset the last export time

return SpanExportResult.SUCCESS
except Exception as e:
logger.error(f"Error exporting spans: {e}")
return SpanExportResult.FAILURE

def _serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
try:
# Serialize spans to JSON or any other format you prefer
span_data_list = [span.to_json() for span in spans]
return "[" + ", ".join(span_data_list) + "]"
except Exception as e:
logger.error(f"Error serializing spans: {e}")
raise

def _export_spans(self):
if len(self.export_queue) == 0:
return # Nothing to export

# Take a batch of spans from the queue
batch_to_export = self.export_queue[:MAX_BATCH_SIZE]
serialized_data = self._serialize_spans(batch_to_export)
self.export_queue = self.export_queue[MAX_BATCH_SIZE:] # Remove exported spans from the queue

try:
self._upload_to_s3_with_retry(serialized_data)
except Exception as e:
logger.error(f"Failed to upload span batch: {e}")

def _upload_to_s3_with_retry(self, span_data_batch: str):
current_time = datetime.datetime.now().strftime(self.time_format)
file_name = f"{self.file_prefix}{current_time}.json"
attempt = 0

while attempt < MAX_RETRIES:
try:
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=file_name,
Body=span_data_batch
)
logger.info(f"Span batch uploaded to AWS S3 as {file_name}.")
return
            except EndpointConnectionError as e:
                sleep_time = BACKOFF_FACTOR * (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"Network connectivity error: {e}. Retrying in {sleep_time:.2f} seconds...")
                time.sleep(sleep_time)
                attempt += 1
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "")
if error_code in ("RequestTimeout", "ThrottlingException", "InternalError", "ServiceUnavailable"):
logger.warning(f"Retry {attempt}/{MAX_RETRIES} failed due to network issue: {str(e)}")
else:
logger.error(f"Failed to upload trace data: {str(e)}")
break # For other types of errors, do not retry

sleep_time = BACKOFF_FACTOR * (2 ** attempt) + random.uniform(0, 1)
logger.info(f"Waiting for {sleep_time:.2f} seconds before retrying...")
time.sleep(sleep_time)
attempt += 1
            except NoCredentialsError as e:
                # ClientError is already handled above; missing credentials are fatal.
                logger.error(f"Error uploading spans to S3: {e}")
                raise
except Exception as e:
logger.error(f"Unexpected error uploading spans to S3: {e}")
raise

logger.error("Max retries exceeded. Failed to upload spans to S3.")
raise EndpointConnectionError(endpoint_url="S3 Upload Endpoint")

def force_flush(self, timeout_millis: int = 30000) -> bool:
self._export_spans() # Export any remaining spans in the queue
return True

def shutdown(self) -> None:
logger.info("S3SpanExporter has been shut down.")

142 changes: 142 additions & 0 deletions src/monocle_apptrace/exporters/azure/azure_exporter.py
@@ -0,0 +1,142 @@
import os
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceNotFoundError, ClientAuthenticationError, ServiceRequestError
import datetime
from typing import Sequence
import logging
import time
import random

# Configuration
DEFAULT_FILE_PREFIX = "monocle_trace_"
DEFAULT_TIME_FORMAT = "%Y-%m-%d_%H.%M.%S"
MAX_BATCH_SIZE = 500 # Max number of spans per batch
MAX_QUEUE_SIZE = 10000  # Max number of spans in the queue (not enforced in this implementation)
EXPORT_INTERVAL = 1 # Maximum interval (in seconds) to wait before exporting spans
BACKOFF_FACTOR = 2
MAX_RETRIES = 10

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AzureBlobSpanExporter(SpanExporter):
def __init__(self, connection_string=None, container_name=None):
# Use default values if none are provided
if connection_string is None:
connection_string = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
if connection_string is None:
raise ValueError("Azure Storage connection string is not provided or set in environment variables.")

if container_name is None:
container_name = os.getenv('AZURE_STORAGE_CONTAINER_NAME','default-container') # Use default container name if not provided

self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)
self.container_name = container_name
self.file_prefix = DEFAULT_FILE_PREFIX
self.time_format = DEFAULT_TIME_FORMAT
self.export_queue = []
self.last_export_time = time.time()

# Check if container exists or create it
if not self._container_exists(container_name):
try:
self.blob_service_client.create_container(container_name)
logger.info(f"Container {container_name} created successfully.")
except Exception as e:
logger.error(f"Error creating container {container_name}: {e}")
raise e

def _container_exists(self, container_name):
try:
container_client = self.blob_service_client.get_container_client(container_name)
container_client.get_container_properties()
return True
except ResourceNotFoundError:
return False
except Exception as e:
logger.error(f"Error checking if container {container_name} exists: {e}")
return False

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
try:
# Add spans to the export queue
for span in spans:
self.export_queue.append(span)
# If the queue reaches MAX_BATCH_SIZE, export the spans
if len(self.export_queue) >= MAX_BATCH_SIZE:
self._export_spans()

# Check if it's time to force a flush
current_time = time.time()
if current_time - self.last_export_time >= EXPORT_INTERVAL:
self._export_spans() # Export spans if time interval has passed
self.last_export_time = current_time # Reset the last export time

return SpanExportResult.SUCCESS
except Exception as e:
logger.error(f"Error exporting spans: {e}")
return SpanExportResult.FAILURE

def _serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
try:
# Serialize spans to JSON or any other format you prefer
span_data_list = [span.to_json() for span in spans]
return "[" + ", ".join(span_data_list) + "]"
except Exception as e:
logger.error(f"Error serializing spans: {e}")
raise

def _export_spans(self):
if len(self.export_queue) == 0:
return # Nothing to export

# Take a batch of spans from the queue
batch_to_export = self.export_queue[:MAX_BATCH_SIZE]
serialized_data = self._serialize_spans(batch_to_export)
self.export_queue = self.export_queue[MAX_BATCH_SIZE:] # Remove exported spans from the queue

try:
self._upload_to_blob_with_retry(serialized_data)
except Exception as e:
logger.error(f"Failed to upload span batch: {e}")

def _upload_to_blob_with_retry(self, span_data_batch: str):
current_time = datetime.datetime.now().strftime(self.time_format)
file_name = f"{self.file_prefix}{current_time}.json"
attempt = 0

while attempt < MAX_RETRIES:
try:
blob_client = self.blob_service_client.get_blob_client(container=self.container_name, blob=file_name)
blob_client.upload_blob(span_data_batch, overwrite=True)
logger.info(f"Span batch uploaded to Azure Blob Storage as {file_name}.")
return
            except ServiceRequestError as e:
                sleep_time = BACKOFF_FACTOR * (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"Network connectivity error: {e}. Retrying in {sleep_time:.2f} seconds...")
                time.sleep(sleep_time)
                attempt += 1
except ClientAuthenticationError as e:
logger.error(f"Failed to authenticate with Azure Blob Storage: {str(e)}")
break # Authentication errors should not be retried
except Exception as e:
logger.warning(f"Retry {attempt}/{MAX_RETRIES} failed due to: {str(e)}")
sleep_time = BACKOFF_FACTOR * (2 ** attempt) + random.uniform(0, 1)
logger.info(f"Waiting for {sleep_time:.2f} seconds before retrying...")
time.sleep(sleep_time)
attempt += 1

logger.error("Max retries exceeded. Failed to upload spans to Azure Blob Storage.")
raise ServiceRequestError(message="Azure Blob Upload Endpoint")

def force_flush(self, timeout_millis: int = 30000) -> bool:
self._export_spans() # Export any remaining spans in the queue
return True

def shutdown(self) -> None:
logger.info("AzureBlobSpanExporter has been shut down.")