Ndjson format for S3 and Blob exporters #61

Merged
merged 5 commits on Oct 22, 2024
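
This change switches both exporters from serializing each batch as a single JSON array to NDJSON (newline-delimited JSON), one compact span object per line, and renames the uploaded files from .json to .ndjson. A minimal sketch of the format difference, using hypothetical span payloads in place of ReadableSpan.to_json() output:

import json

# Hypothetical span payloads standing in for ReadableSpan.to_json() output.
spans = [{"name": "langchain.workflow"}, {"name": "langchain.task.ChatOpenAI"}]

# Old format: the whole batch is one JSON array.
array_batch = "[" + ", ".join(json.dumps(s) for s in spans) + "]"

# New format: NDJSON, one JSON object per line, newline-terminated.
ndjson_batch = "\n".join(json.dumps(s) for s in spans) + "\n"

# Each NDJSON line parses on its own, so a reader can stream the file
# line by line instead of loading the whole batch at once.
for line in ndjson_batch.splitlines():
    print(json.loads(line)["name"])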
19 changes: 13 additions & 6 deletions src/monocle_apptrace/exporters/aws/s3_exporter.py
@@ -10,7 +10,7 @@
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from monocle_apptrace.exporters.base_exporter import SpanExporterBase
from typing import Sequence

import json
logger = logging.getLogger(__name__)

class S3SpanExporter(SpanExporterBase):
@@ -109,11 +109,18 @@ async def __export_async(self, spans: Sequence[ReadableSpan]) -> SpanExportResult
def __serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
try:
# Serialize spans to JSON or any other format you prefer
span_data_list = [span.to_json() for span in spans]
return "[" + ", ".join(span_data_list) + "]"
valid_json_list = []
for span in spans:
try:
valid_json_list.append(span.to_json(indent=0).replace("\n", ""))
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON format in span data: {span.context.span_id}. Error: {e}")
continue
ndjson_data = "\n".join(valid_json_list) + "\n"
return ndjson_data
except Exception as e:
logger.error(f"Error serializing spans: {e}")
raise
logger.warning(f"Error serializing spans: {e}")


async def __export_spans(self):
if len(self.export_queue) == 0:
@@ -135,7 +142,7 @@ async def __export_spans(self):

def __upload_to_s3(self, span_data_batch: str):
current_time = datetime.datetime.now().strftime(self.time_format)
file_name = f"{self.file_prefix}{current_time}.json"
file_name = f"{self.file_prefix}{current_time}.ndjson"
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=file_name,
19 changes: 13 additions & 6 deletions src/monocle_apptrace/exporters/azure/blob_exporter.py
@@ -10,7 +10,7 @@
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from typing import Sequence
from monocle_apptrace.exporters.base_exporter import SpanExporterBase

import json
logger = logging.getLogger(__name__)

class AzureBlobSpanExporter(SpanExporterBase):
@@ -84,11 +84,18 @@ async def _export_async(self, spans: Sequence[ReadableSpan]):

def __serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
try:
span_data_list = [span.to_json() for span in spans]
return "[" + ", ".join(span_data_list) + "]"
valid_json_list = []
for span in spans:
try:
valid_json_list.append(span.to_json(indent=0).replace("\n", ""))
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON format in span data: {span.context.span_id}. Error: {e}")
continue

ndjson_data = "\n".join(valid_json_list) + "\n"
return ndjson_data
except Exception as e:
logger.error(f"Error serializing spans: {e}")
raise
logger.warning(f"Error serializing spans: {e}")

async def __export_spans(self):
if len(self.export_queue) == 0:
@@ -108,7 +115,7 @@ async def __export_spans(self):

def __upload_to_blob(self, span_data_batch: str):
current_time = datetime.datetime.now().strftime(self.time_format)
file_name = f"{self.file_prefix}{current_time}.json"
file_name = f"{self.file_prefix}{current_time}.ndjson"
blob_client = self.blob_service_client.get_blob_client(container=self.container_name, blob=file_name)
blob_client.upload_blob(span_data_batch, overwrite=True)
logger.info(f"Span batch uploaded to Azure Blob Storage as {file_name}.")
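
Because every serialized batch is now newline-terminated, batches could be appended or concatenated downstream into one valid NDJSON stream, which the old JSON-array format did not allow. A small illustrative check (the batch contents below are made up):

import json

# Two hypothetical serialized batches, each newline-terminated as
# __serialize_spans now produces them.
batch1 = '{"name": "span-a"}\n'
batch2 = '{"name": "span-b"}\n'

# Concatenated NDJSON is still valid NDJSON; two JSON arrays glued
# together ("[...][...]") would not parse as a single document.
combined = batch1 + batch2
names = [json.loads(line)["name"] for line in combined.splitlines()]
assert names == ["span-a", "span-b"]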
37 changes: 37 additions & 0 deletions tests/azblob_ndjson_validation.py
@@ -0,0 +1,37 @@
import json
import pytest
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceNotFoundError

class TestAzureBlobNDJSONFormat:

@pytest.fixture
def blob_service_client(self):
connection_string = ""
client = BlobServiceClient.from_connection_string(connection_string)
return client

@pytest.fixture
def blob_info(self):
return {
'container_name': 'sachin',
'blob_name': 'xx.ndjson'
}

def test_blob_ndjson_format(self, blob_service_client, blob_info):
try:
blob_client = blob_service_client.get_blob_client(container=blob_info['container_name'], blob=blob_info['blob_name'])
blob_data = blob_client.download_blob().readall().decode('utf-8')
lines = blob_data.strip().split("\n")
for line in lines:
try:
json_obj = json.loads(line)
assert isinstance(json_obj, dict), f"Line is not a valid JSON object: {line}"
except json.JSONDecodeError:
raise AssertionError(f"Line is not valid JSON: {line}")

except ResourceNotFoundError:
raise AssertionError(f"Blob {blob_info['blob_name']} not found in container {blob_info['container_name']}")

if __name__ == '__main__':
pytest.main()
27 changes: 17 additions & 10 deletions tests/langchain_sample_blob.py
@@ -22,15 +22,16 @@
import logging
logging.basicConfig(level=logging.INFO)
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

os.environ["OPENAI_API_KEY"] = " "
os.environ['CONNECTION_STRING'] = ""
os.environ['CONTAINER_NAME'] = ""
exporter = AzureBlobSpanExporter()
setup_monocle_telemetry(
workflow_name="langchain_app_1",
span_processors=[BatchSpanProcessor(exporter)],
wrapper_methods=[])

llm = ChatOpenAI(model="gpt-3.5-turbo-0125",openai_api_key=OPENAI_API_KEY)
llm = ChatOpenAI(model="gpt-3.5-turbo-0125")

# Load, chunk and index the contents of the blog.
loader = WebBaseLoader(
@@ -45,7 +46,7 @@

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY))
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())

# Retrieve and generate using the relevant snippets of the blog.
retriever = vectorstore.as_retriever()
@@ -102,9 +103,15 @@ def format_docs(docs):
question = "What is Task Decomposition?"
ai_msg_1 = rag_chain.invoke({"input": question, "chat_history": chat_history})
print(ai_msg_1["answer"])
chat_history.extend([HumanMessage(content=question), ai_msg_1["answer"]])

second_question = "What are common ways of doing it?"
ai_msg_2 = rag_chain.invoke({"input": second_question, "chat_history": chat_history})

print(ai_msg_2["answer"])
# chat_history.extend([HumanMessage(content=question), ai_msg_1["answer"]])
#
# second_question = "What are common ways of doing it?"
# ai_msg_2 = rag_chain.invoke({"input": second_question, "chat_history": chat_history})
#
# print(ai_msg_2["answer"])

# NDJSON format stored in the blob container:

# {"name": "langchain.task.VectorStoreRetriever", "context": {"trace_id": "0x94f6a3d96b14ec831b7f9a3545130fe5", "span_id": "0x447091e285b1da17", "trace_state": "[]"}, "kind": "SpanKind.INTERNAL", "parent_id": "0xa6807c63a68b1cbd", "start_time": "2024-10-22T06:35:07.925768Z", "end_time": "2024-10-22T06:35:08.610434Z", "status": {"status_code": "UNSET"}, "attributes": {"session.session_id": "0x4fa6d91d1f2a4bdbb7a1287d90ec4a16", "span.type": "retrieval", "entity.count": 2, "entity.1.name": "Chroma", "entity.1.type": "vectorstore.Chroma", "entity.2.name": "text-embedding-ada-002", "entity.2.type": "model.embedding.text-embedding-ada-002"}, "events": [{"name": "data.input", "timestamp": "2024-10-22T06:35:07.925905Z", "attributes": {"question": "What is Task Decomposition?"}}, {"name": "data.output", "timestamp": "2024-10-22T06:35:08.610419Z", "attributes": {"response": "Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated ta..."}}], "links": [], "resource": {"attributes": {"service.name": "langchain_app_1"}, "schema_url": ""}}
# {"name": "langchain.workflow", "context": {"trace_id": "0x94f6a3d96b14ec831b7f9a3545130fe5", "span_id": "0xa6807c63a68b1cbd", "trace_state": "[]"}, "kind": "SpanKind.INTERNAL", "parent_id": "0x23eea25b1a5abbd5", "start_time": "2024-10-22T06:35:07.925206Z", "end_time": "2024-10-22T06:35:08.610466Z", "status": {"status_code": "UNSET"}, "attributes": {"session.session_id": "0x4fa6d91d1f2a4bdbb7a1287d90ec4a16"}, "events": [], "links": [], "resource": {"attributes": {"service.name": "langchain_app_1"}, "schema_url": ""}}
#
16 changes: 12 additions & 4 deletions tests/langchain_sample_s3.py
@@ -19,10 +19,11 @@
import logging
logging.basicConfig(level=logging.INFO)
import os
import time
from dotenv import load_dotenv, dotenv_values
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
os.environ["OPENAI_API_KEY"] = ""
os.environ['AWS_ACCESS_KEY_ID'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
exporter = S3SpanExporter(
region_name='us-east-1',
bucket_name='sachin-dev'
@@ -32,7 +33,7 @@
span_processors=[BatchSpanProcessor(exporter)],
wrapper_methods=[])

llm = ChatOpenAI(model="gpt-3.5-turbo-0125",api_key=OPENAI_API_KEY)
llm = ChatOpenAI(model="gpt-3.5-turbo-0125")

# Load, chunk and index the contents of the blog.
loader = WebBaseLoader(
@@ -47,7 +48,7 @@

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings(api_key=OPENAI_API_KEY))
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())

# Retrieve and generate using the relevant snippets of the blog.
retriever = vectorstore.as_retriever()
@@ -111,3 +112,10 @@ def format_docs(docs):
ai_msg_2 = rag_chain.invoke({"input": second_question, "chat_history": chat_history})

print(ai_msg_2["answer"])


# NDJSON format stored in the S3 bucket:

# {"name": "langchain.task.ChatOpenAI", "context": {"trace_id": "0x5b964bc8323611c33bedfb2ba1c02297", "span_id": "0xef1a5270c100927d", "trace_state": "[]"}, "kind": "SpanKind.INTERNAL", "parent_id": "0x0ea09995ad209078", "start_time": "2024-10-22T06:29:20.705616Z", "end_time": "2024-10-22T06:29:22.488604Z", "status": {"status_code": "UNSET"}, "attributes": {"session.session_id": "0x4fa6d91d1f2a4bdbb7a1287d90ec4a16", "span.type": "inference", "entity.count": 2, "entity.1.type": "inference.azure_oai", "entity.1.provider_name": "api.openai.com", "entity.2.name": "gpt-3.5-turbo-0125", "entity.2.type": "model.llm.gpt-3.5-turbo-0125"}, "events": [{"name": "metadata", "timestamp": "2024-10-22T06:29:22.488587Z", "attributes": {"temperature": 0.7, "completion_tokens": 82, "prompt_tokens": 580, "total_tokens": 662}}], "links": [], "resource": {"attributes": {"service.name": "langchain_app_1"}, "schema_url": ""}}
# {"name": "langchain.task.StrOutputParser", "context": {"trace_id": "0x5b964bc8323611c33bedfb2ba1c02297", "span_id": "0x4ffc0cd2351f4560", "trace_state": "[]"}, "kind": "SpanKind.INTERNAL", "parent_id": "0x0ea09995ad209078", "start_time": "2024-10-22T06:29:22.488731Z", "end_time": "2024-10-22T06:29:22.488930Z", "status": {"status_code": "UNSET"}, "attributes": {"session.session_id": "0x4fa6d91d1f2a4bdbb7a1287d90ec4a16"}, "events": [], "links": [], "resource": {"attributes": {"service.name": "langchain_app_1"}, "schema_url": ""}}
#
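
For consumers, NDJSON files can be read back line by line. A sketch of a streaming reader, assuming configured AWS credentials and placeholder bucket/key names:

import json
import boto3

# Placeholder bucket and key; substitute the exporter's actual output path.
s3 = boto3.client("s3")
body = s3.get_object(Bucket="my-trace-bucket", Key="monocle_trace.ndjson")["Body"]

# StreamingBody.iter_lines() yields one NDJSON record at a time, so a large
# trace batch never has to be parsed as a single JSON document.
for raw_line in body.iter_lines():
    if raw_line:
        span = json.loads(raw_line)
        print(span["name"], span["context"]["trace_id"])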
46 changes: 46 additions & 0 deletions tests/s3_ndjson_validation.py
@@ -0,0 +1,46 @@
import json
import boto3
from botocore.exceptions import NoCredentialsError
import pytest


class TestS3NDJSONFormat:

@pytest.fixture
def s3_client(self):
client = boto3.client(
's3',
aws_access_key_id='',
aws_secret_access_key='',
region_name='us-east-1'
)
return client

@pytest.fixture
def bucket_info(self):
return {
'bucket_name': 'sachin-dev',
's3_file_key': 'xx.ndjson'
}

def test_s3_ndjson_format(self, s3_client, bucket_info):
try:
response = s3_client.get_object(Bucket=bucket_info['bucket_name'], Key=bucket_info['s3_file_key'])
file_content = response['Body'].read().decode('utf-8')

lines = file_content.strip().split("\n")
for line in lines:
try:
json_obj = json.loads(line)
assert isinstance(json_obj, dict), f"Line is not a valid JSON object: {line}"
except json.JSONDecodeError:
raise AssertionError(f"Line is not valid JSON: {line}")

except NoCredentialsError:
raise AssertionError("AWS credentials not available")
except Exception as e:
raise AssertionError(f"Test failed with error: {e}")


if __name__ == '__main__':
pytest.main()
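
Both validation scripts can also be run through pytest directly, assuming the credentials and object names above are filled in: pytest tests/s3_ndjson_validation.py tests/azblob_ndjson_validation.py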