Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Direct Exporter - Blob store #41

Merged
merged 11 commits into from
Oct 14, 2024
9 changes: 9 additions & 0 deletions Monocle_User_Guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ It’s typically the workflow code components of an application that generate th
```
> pip install monocle_apptrace
```

- For Azure support (to upload traces to Azure), install with the azure extra:
```
> pip install 'monocle_apptrace[azure]'
```

- For AWS support (to upload traces to AWS), install with the aws extra:
```
> pip install 'monocle_apptrace[aws]'
Expand Down Expand Up @@ -69,6 +75,9 @@ setup_monocle_telemetry(workflow_name = "simple_math_app",
```
To print the trace on the console, use ```ConsoleSpanExporter()``` instead of ```FileSpanExporter()```

For Azure:
Install the Azure support as shown in the setup section, then use ```AzureBlobSpanExporter()``` to upload the traces to Azure.

For AWS:
Install the AWS support as shown in the setup section, then use ```S3SpanExporter()``` to upload the traces to an S3 bucket.

Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ dev = [
'llama-index-vector-stores-chroma==0.1.9',
'parameterized==0.9.0'
]

azure = [
'azure-storage-blob==12.22.0',
]

aws = [
'boto3==1.35.19',
]
Expand Down
121 changes: 121 additions & 0 deletions src/monocle_apptrace/exporters/azure/blob_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import os
import time
import random
import datetime
import logging
import asyncio
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.core.exceptions import ResourceNotFoundError, ClientAuthenticationError, ServiceRequestError
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from typing import Sequence
from monocle_apptrace.exporters.base_exporter import SpanExporterBase

logger = logging.getLogger(__name__)

class AzureBlobSpanExporter(SpanExporterBase):
    """OpenTelemetry span exporter that uploads batches of spans to an
    Azure Blob Storage container as JSON files.

    Spans are buffered in ``self.export_queue`` (provided by
    ``SpanExporterBase`` — assumed to be a list; TODO confirm) and flushed
    either when ``max_batch_size`` spans have accumulated or when
    ``export_interval`` seconds have passed since the last flush.
    """

    # Defaults for the blob names written to the container.
    DEFAULT_FILE_PREFIX = "monocle_trace_"
    DEFAULT_TIME_FORMAT = "%Y-%m-%d_%H.%M.%S"

    def __init__(self, connection_string=None, container_name=None):
        """Create the exporter and ensure the target container exists.

        :param connection_string: Azure Storage connection string; falls
            back to the ``CONNECTION_STRING`` environment variable.
        :param container_name: target container; falls back to the
            ``CONTAINER_NAME`` environment variable, then 'default-container'.
        :raises ValueError: if no connection string can be resolved.
        """
        super().__init__()
        self.max_batch_size = 500
        self.export_interval = 1  # seconds between forced flushes

        # Resolve credentials/targets from the environment when not given.
        if not connection_string:
            connection_string = os.getenv('CONNECTION_STRING')
        if not connection_string:
            raise ValueError("Azure Storage connection string is not provided or set in environment variables.")

        if not container_name:
            container_name = os.getenv('CONTAINER_NAME', 'default-container')

        self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        self.container_name = container_name
        self.file_prefix = self.DEFAULT_FILE_PREFIX
        self.time_format = self.DEFAULT_TIME_FORMAT

        # Create the container up front so later uploads cannot fail on a
        # missing container.
        if not self.__container_exists(container_name):
            try:
                self.blob_service_client.create_container(container_name)
                logger.info(f"Container {container_name} created successfully.")
            except Exception as e:
                logger.error(f"Error creating container {container_name}: {e}")
                raise e

    def __container_exists(self, container_name):
        """Return True if the container exists, False if it is missing (404).

        :raises PermissionError: when access is forbidden (403).
        :raises Exception: any other service/transport error, re-raised.
        """
        try:
            container_client = self.blob_service_client.get_container_client(container_name)
            container_client.get_container_properties()
            return True
        except ResourceNotFoundError:
            logger.error(f"Container {container_name} not found (404).")
            return False
        except ClientAuthenticationError:
            logger.error(f"Access to container {container_name} is forbidden (403).")
            raise PermissionError(f"Access to container {container_name} is forbidden.")
        except Exception as e:
            logger.error(f"Unexpected error when checking if container {container_name} exists: {e}")
            raise e

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Synchronous export entry point required by the SpanExporter API.

        NOTE(review): ``asyncio.run`` raises RuntimeError when called from a
        thread that already runs an event loop; callers are assumed to invoke
        this from a plain worker thread (as BatchSpanProcessor does).
        """
        try:
            asyncio.run(self._export_async(spans))
            return SpanExportResult.SUCCESS
        except Exception as e:
            logger.error(f"Error exporting spans: {e}")
            return SpanExportResult.FAILURE

    async def _export_async(self, spans: Sequence[ReadableSpan]):
        """Queue spans and flush on batch-size or time-interval triggers."""
        for span in spans:
            self.export_queue.append(span)
            # Flush as soon as a full batch has accumulated.
            if len(self.export_queue) >= self.max_batch_size:
                await self.__export_spans()

        # Force a flush if the export interval has elapsed.
        # last_export_time is assumed to be maintained by SpanExporterBase.
        current_time = time.time()
        if current_time - self.last_export_time >= self.export_interval:
            await self.__export_spans()
            self.last_export_time = current_time

    def __serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
        """Serialize spans to a single JSON array string."""
        try:
            span_data_list = [span.to_json() for span in spans]
            return "[" + ", ".join(span_data_list) + "]"
        except Exception as e:
            logger.error(f"Error serializing spans: {e}")
            raise

    async def __export_spans(self):
        """Serialize and upload up to ``max_batch_size`` queued spans."""
        if not self.export_queue:
            return

        # Serialize before trimming the queue so a serialization failure
        # does not lose spans; the batch is removed once serialized.
        batch_to_export = self.export_queue[:self.max_batch_size]
        serialized_data = self.__serialize_spans(batch_to_export)
        self.export_queue = self.export_queue[self.max_batch_size:]
        try:
            # Simplified from the original create_task-then-await dance:
            # awaiting the coroutine directly is behaviorally identical.
            # _retry_with_backoff is provided by SpanExporterBase (assumed
            # to retry transient failures with backoff — TODO confirm).
            await self._retry_with_backoff(self.__upload_to_blob, serialized_data)
        except Exception as e:
            logger.error(f"Failed to upload span batch: {e}")

    def __upload_to_blob(self, span_data_batch: str):
        """Upload one serialized batch as a timestamped JSON blob."""
        current_time = datetime.datetime.now().strftime(self.time_format)
        file_name = f"{self.file_prefix}{current_time}.json"
        blob_client = self.blob_service_client.get_blob_client(container=self.container_name, blob=file_name)
        blob_client.upload_blob(span_data_batch, overwrite=True)
        logger.info(f"Span batch uploaded to Azure Blob Storage as {file_name}.")

    async def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Flush all queued spans immediately; always reports success."""
        await self.__export_spans()
        return True

    def shutdown(self) -> None:
        """Flush any remaining queued spans, then shut down.

        Bug fix: previously, spans still sitting in ``export_queue`` at
        shutdown were silently dropped.
        """
        try:
            asyncio.run(self.__export_spans())
        except Exception as e:
            logger.error(f"Error flushing remaining spans during shutdown: {e}")
        logger.info("AzureBlobSpanExporter has been shut down.")
5 changes: 4 additions & 1 deletion tests/langchain_chat_sample.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

from multiprocessing.forkserver import connect_to_new_process

import bs4
from langchain import hub
Expand Down Expand Up @@ -27,13 +27,16 @@
region_name='us-east-1',
bucket_name='sachin-dev'
)

setup_monocle_telemetry(
workflow_name="langchain_app_1",
span_processors=[BatchSpanProcessor(exporter)],
wrapper_methods=[])


llm = ChatOpenAI(model="gpt-3.5-turbo-0125",api_key=OPENAI_API_KEY)


# Load, chunk and index the contents of the blog.
loader = WebBaseLoader(
web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
Expand Down
110 changes: 110 additions & 0 deletions tests/langchain_sample_blob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from multiprocessing.forkserver import connect_to_new_process

import bs4
from langchain import hub
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from monocle_apptrace.instrumentor import set_context_properties, setup_monocle_telemetry
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from langhchain_patch import create_history_aware_retriever
from monocle_apptrace.exporters.azure.blob_exporter import AzureBlobSpanExporter
import os
import time
from dotenv import load_dotenv, dotenv_values
import logging
# Configure logging and load secrets (e.g. OPENAI_API_KEY, Azure
# CONNECTION_STRING) from a local .env file.
logging.basicConfig(level=logging.INFO)
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# AzureBlobSpanExporter() with no arguments reads CONNECTION_STRING and
# CONTAINER_NAME from the environment loaded above.
exporter = AzureBlobSpanExporter()
# Instrument the app so traces are batched and uploaded to Azure Blob Storage.
setup_monocle_telemetry(
workflow_name="langchain_app_1",
span_processors=[BatchSpanProcessor(exporter)],
wrapper_methods=[])

llm = ChatOpenAI(model="gpt-3.5-turbo-0125",openai_api_key=OPENAI_API_KEY)

# Load, chunk and index the contents of the blog.
# Only the post body, title, and header elements are parsed from the page.
loader = WebBaseLoader(
web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(
class_=("post-content", "post-title", "post-header")
)
),
)
docs = loader.load()

# Split into overlapping chunks and embed them into a Chroma vector store.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY))

# Retrieve and generate using the relevant snippets of the blog.
retriever = vectorstore.as_retriever()
prompt = hub.pull("rlm/rag-prompt")

def format_docs(docs):
    """Concatenate the page contents of *docs*, separated by blank lines."""
    contents = [doc.page_content for doc in docs]
    return "\n\n".join(contents)

# Basic RAG chain: retrieve context, fill the prompt, call the LLM, parse text.
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)

# Prompt used to rewrite a follow-up question into a standalone question
# using the chat history (no answering at this stage).
contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""
contextualize_q_prompt = ChatPromptTemplate.from_messages(
[
("system", contextualize_q_system_prompt),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
]
)
# create_history_aware_retriever comes from the local langhchain_patch module.
history_aware_retriever = create_history_aware_retriever(
llm, retriever, contextualize_q_prompt
)

# Answering prompt: the retrieved context is injected into {context}.
qa_system_prompt = """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \
Use three sentences maximum and keep the answer concise.\

{context}"""
qa_prompt = ChatPromptTemplate.from_messages(
[
("system", qa_system_prompt),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
]
)


question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)

# Full conversational RAG chain: history-aware retrieval then answering.
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)

chat_history = []

# Tag all spans from this run with a session id for trace correlation.
set_context_properties({"session_id": "0x4fa6d91d1f2a4bdbb7a1287d90ec4a16"})

question = "What is Task Decomposition?"
ai_msg_1 = rag_chain.invoke({"input": question, "chat_history": chat_history})
print(ai_msg_1["answer"])
# Record the exchange so the follow-up question can be contextualized.
chat_history.extend([HumanMessage(content=question), ai_msg_1["answer"]])

second_question = "What are common ways of doing it?"
ai_msg_2 = rag_chain.invoke({"input": second_question, "chat_history": chat_history})

print(ai_msg_2["answer"])
Loading