Merge pull request #61 from beehyv/ndjson_format_for_exporters
NDJSON format for S3 and Blob exporters
kshitiz-okahu authored Oct 22, 2024
2 parents 2b5112f + 0bb528c commit bd6b9f2
Showing 6 changed files with 138 additions and 26 deletions.
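
For context, the change replaces the exporters' previous single-JSON-array payload with newline-delimited JSON (NDJSON): every span becomes one JSON object on its own line. A minimal sketch of the two formats (illustrative only; spans_as_json stands in for the spans' to_json() output):

import json

# Illustrative span payloads; the real exporters call span.to_json().
spans_as_json = [
    json.dumps({"name": "langchain.workflow", "span_id": "0x01"}),
    json.dumps({"name": "langchain.task.ChatOpenAI", "span_id": "0x02"}),
]

# Old format: one JSON array per uploaded file.
json_array_payload = "[" + ", ".join(spans_as_json) + "]"

# New format (this PR): NDJSON, one JSON object per line.
ndjson_payload = "\n".join(spans_as_json) + "\n"
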
19 changes: 13 additions & 6 deletions src/monocle_apptrace/exporters/aws/s3_exporter.py
@@ -10,7 +10,7 @@
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from monocle_apptrace.exporters.base_exporter import SpanExporterBase
from typing import Sequence

import json
logger = logging.getLogger(__name__)

class S3SpanExporter(SpanExporterBase):
@@ -109,11 +109,18 @@ async def __export_async(self, spans: Sequence[ReadableSpan]) -> SpanExportResult
def __serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
try:
# Serialize spans to JSON or any other format you prefer
span_data_list = [span.to_json() for span in spans]
return "[" + ", ".join(span_data_list) + "]"
valid_json_list = []
for span in spans:
try:
valid_json_list.append(span.to_json(indent=0).replace("\n", ""))
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON format in span data: {span.context.span_id}. Error: {e}")
continue
ndjson_data = "\n".join(valid_json_list) + "\n"
return ndjson_data
except Exception as e:
logger.error(f"Error serializing spans: {e}")
raise
logger.warning(f"Error serializing spans: {e}")


async def __export_spans(self):
if len(self.export_queue) == 0:
@@ -135,7 +142,7 @@ async def __export_spans(self):

def __upload_to_s3(self, span_data_batch: str):
current_time = datetime.datetime.now().strftime(self.time_format)
file_name = f"{self.file_prefix}{current_time}.json"
file_name = f"{self.file_prefix}{current_time}.ndjson"
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=file_name,
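
Because each NDJSON record must occupy a single physical line, the exporter serializes spans with to_json(indent=0) and strips any remaining newlines before joining them. A consumer can then stream records one at a time instead of parsing one large array; a minimal reader sketch (hypothetical local file name, not part of the exporter):

import json

def iter_spans(path="spans-example.ndjson"):  # hypothetical file name
    # Yield one span dict per NDJSON line, skipping blank lines.
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)
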
19 changes: 13 additions & 6 deletions src/monocle_apptrace/exporters/azure/blob_exporter.py
@@ -10,7 +10,7 @@
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from typing import Sequence
from monocle_apptrace.exporters.base_exporter import SpanExporterBase

import json
logger = logging.getLogger(__name__)

class AzureBlobSpanExporter(SpanExporterBase):
@@ -84,11 +84,18 @@ async def _export_async(self, spans: Sequence[ReadableSpan]):

def __serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
try:
span_data_list = [span.to_json() for span in spans]
return "[" + ", ".join(span_data_list) + "]"
valid_json_list = []
for span in spans:
try:
valid_json_list.append(span.to_json(indent=0).replace("\n", ""))
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON format in span data: {span.context.span_id}. Error: {e}")
continue

ndjson_data = "\n".join(valid_json_list) + "\n"
return ndjson_data
except Exception as e:
logger.error(f"Error serializing spans: {e}")
raise
logger.warning(f"Error serializing spans: {e}")

async def __export_spans(self):
if len(self.export_queue) == 0:
@@ -108,7 +115,7 @@ async def __export_spans(self):

def __upload_to_blob(self, span_data_batch: str):
current_time = datetime.datetime.now().strftime(self.time_format)
file_name = f"{self.file_prefix}{current_time}.json"
file_name = f"{self.file_prefix}{current_time}.ndjson"
blob_client = self.blob_service_client.get_blob_client(container=self.container_name, blob=file_name)
blob_client.upload_blob(span_data_batch, overwrite=True)
logger.info(f"Span batch uploaded to Azure Blob Storage as {file_name}.")
37 changes: 37 additions & 0 deletions tests/azblob_ndjson_validation.py
@@ -0,0 +1,37 @@
import json
import pytest
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceNotFoundError

class TestAzureBlobNDJSONFormat:

@pytest.fixture
def blob_service_client(self):
connection_string = ""
client = BlobServiceClient.from_connection_string(connection_string)
return client

@pytest.fixture
def blob_info(self):
return {
'container_name': 'sachin',
'blob_name': 'xx.ndjson'
}

def test_blob_ndjson_format(self, blob_service_client, blob_info):
try:
blob_client = blob_service_client.get_blob_client(container=blob_info['container_name'], blob=blob_info['blob_name'])
blob_data = blob_client.download_blob().readall().decode('utf-8')
lines = blob_data.strip().split("\n")
for line in lines:
try:
json_obj = json.loads(line)
assert isinstance(json_obj, dict), f"Line is not a valid JSON object: {line}"
except json.JSONDecodeError:
raise AssertionError(f"Line is not valid JSON: {line}")

except ResourceNotFoundError:
raise AssertionError(f"Blob {blob_info['blob_name']} not found in container {blob_info['container_name']}")

if __name__ == '__main__':
pytest.main()
27 changes: 17 additions & 10 deletions tests/langchain_sample_blob.py
@@ -22,15 +22,16 @@
import logging
logging.basicConfig(level=logging.INFO)
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

os.environ["OPENAI_API_KEY"] = " "
os.environ['CONNECTION_STRING'] = ""
os.environ['CONTAINER_NAME'] = ""
exporter = AzureBlobSpanExporter()
setup_monocle_telemetry(
workflow_name="langchain_app_1",
span_processors=[BatchSpanProcessor(exporter)],
wrapper_methods=[])

llm = ChatOpenAI(model="gpt-3.5-turbo-0125",openai_api_key=OPENAI_API_KEY)
llm = ChatOpenAI(model="gpt-3.5-turbo-0125")

# Load, chunk and index the contents of the blog.
loader = WebBaseLoader(
@@ -45,7 +46,7 @@

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY))
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())

# Retrieve and generate using the relevant snippets of the blog.
retriever = vectorstore.as_retriever()
@@ -102,9 +103,15 @@ def format_docs(docs):
question = "What is Task Decomposition?"
ai_msg_1 = rag_chain.invoke({"input": question, "chat_history": chat_history})
print(ai_msg_1["answer"])
chat_history.extend([HumanMessage(content=question), ai_msg_1["answer"]])

second_question = "What are common ways of doing it?"
ai_msg_2 = rag_chain.invoke({"input": second_question, "chat_history": chat_history})

print(ai_msg_2["answer"])
# chat_history.extend([HumanMessage(content=question), ai_msg_1["answer"]])
#
# second_question = "What are common ways of doing it?"
# ai_msg_2 = rag_chain.invoke({"input": second_question, "chat_history": chat_history})
#
# print(ai_msg_2["answer"])

# NDJSON format stored on blob

# {"name": "langchain.task.VectorStoreRetriever", "context": {"trace_id": "0x94f6a3d96b14ec831b7f9a3545130fe5", "span_id": "0x447091e285b1da17", "trace_state": "[]"}, "kind": "SpanKind.INTERNAL", "parent_id": "0xa6807c63a68b1cbd", "start_time": "2024-10-22T06:35:07.925768Z", "end_time": "2024-10-22T06:35:08.610434Z", "status": {"status_code": "UNSET"}, "attributes": {"session.session_id": "0x4fa6d91d1f2a4bdbb7a1287d90ec4a16", "span.type": "retrieval", "entity.count": 2, "entity.1.name": "Chroma", "entity.1.type": "vectorstore.Chroma", "entity.2.name": "text-embedding-ada-002", "entity.2.type": "model.embedding.text-embedding-ada-002"}, "events": [{"name": "data.input", "timestamp": "2024-10-22T06:35:07.925905Z", "attributes": {"question": "What is Task Decomposition?"}}, {"name": "data.output", "timestamp": "2024-10-22T06:35:08.610419Z", "attributes": {"response": "Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated ta..."}}], "links": [], "resource": {"attributes": {"service.name": "langchain_app_1"}, "schema_url": ""}}
# {"name": "langchain.workflow", "context": {"trace_id": "0x94f6a3d96b14ec831b7f9a3545130fe5", "span_id": "0xa6807c63a68b1cbd", "trace_state": "[]"}, "kind": "SpanKind.INTERNAL", "parent_id": "0x23eea25b1a5abbd5", "start_time": "2024-10-22T06:35:07.925206Z", "end_time": "2024-10-22T06:35:08.610466Z", "status": {"status_code": "UNSET"}, "attributes": {"session.session_id": "0x4fa6d91d1f2a4bdbb7a1287d90ec4a16"}, "events": [], "links": [], "resource": {"attributes": {"service.name": "langchain_app_1"}, "schema_url": ""}}
#
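
As an illustration of working with lines like the two above, a small sketch that loads one record and reads a few fields (sample_line is abridged from the first sample span; the real lines carry the full attribute set):

import json

sample_line = (
    '{"name": "langchain.task.VectorStoreRetriever", '
    '"context": {"trace_id": "0x94f6a3d96b14ec831b7f9a3545130fe5", '
    '"span_id": "0x447091e285b1da17"}, "kind": "SpanKind.INTERNAL"}'
)

span = json.loads(sample_line)
print(span["name"], span["context"]["trace_id"], span["kind"])
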
16 changes: 12 additions & 4 deletions tests/langchain_sample_s3.py
@@ -19,10 +19,11 @@
import logging
logging.basicConfig(level=logging.INFO)
import os
import time
from dotenv import load_dotenv, dotenv_values
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
os.environ["OPENAI_API_KEY"] = ""
os.environ['AWS_ACCESS_KEY_ID'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
exporter = S3SpanExporter(
region_name='us-east-1',
bucket_name='sachin-dev'
@@ -32,7 +33,7 @@
span_processors=[BatchSpanProcessor(exporter)],
wrapper_methods=[])

llm = ChatOpenAI(model="gpt-3.5-turbo-0125",api_key=OPENAI_API_KEY)
llm = ChatOpenAI(model="gpt-3.5-turbo-0125")

# Load, chunk and index the contents of the blog.
loader = WebBaseLoader(
@@ -47,7 +48,7 @@

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings(api_key=OPENAI_API_KEY))
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())

# Retrieve and generate using the relevant snippets of the blog.
retriever = vectorstore.as_retriever()
@@ -111,3 +112,10 @@ def format_docs(docs):
ai_msg_2 = rag_chain.invoke({"input": second_question, "chat_history": chat_history})

print(ai_msg_2["answer"])


# NDJSON format stored in the S3 bucket

# {"name": "langchain.task.ChatOpenAI", "context": {"trace_id": "0x5b964bc8323611c33bedfb2ba1c02297", "span_id": "0xef1a5270c100927d", "trace_state": "[]"}, "kind": "SpanKind.INTERNAL", "parent_id": "0x0ea09995ad209078", "start_time": "2024-10-22T06:29:20.705616Z", "end_time": "2024-10-22T06:29:22.488604Z", "status": {"status_code": "UNSET"}, "attributes": {"session.session_id": "0x4fa6d91d1f2a4bdbb7a1287d90ec4a16", "span.type": "inference", "entity.count": 2, "entity.1.type": "inference.azure_oai", "entity.1.provider_name": "api.openai.com", "entity.2.name": "gpt-3.5-turbo-0125", "entity.2.type": "model.llm.gpt-3.5-turbo-0125"}, "events": [{"name": "metadata", "timestamp": "2024-10-22T06:29:22.488587Z", "attributes": {"temperature": 0.7, "completion_tokens": 82, "prompt_tokens": 580, "total_tokens": 662}}], "links": [], "resource": {"attributes": {"service.name": "langchain_app_1"}, "schema_url": ""}}
# {"name": "langchain.task.StrOutputParser", "context": {"trace_id": "0x5b964bc8323611c33bedfb2ba1c02297", "span_id": "0x4ffc0cd2351f4560", "trace_state": "[]"}, "kind": "SpanKind.INTERNAL", "parent_id": "0x0ea09995ad209078", "start_time": "2024-10-22T06:29:22.488731Z", "end_time": "2024-10-22T06:29:22.488930Z", "status": {"status_code": "UNSET"}, "attributes": {"session.session_id": "0x4fa6d91d1f2a4bdbb7a1287d90ec4a16"}, "events": [], "links": [], "resource": {"attributes": {"service.name": "langchain_app_1"}, "schema_url": ""}}
#
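
Both sample spans above carry the same trace_id, so a consumer can fold NDJSON lines back into whole traces; a brief grouping sketch (assumes the file content has already been read into a string, as in the validation test below):

import json
from collections import defaultdict

def group_by_trace(ndjson_text):
    # Map trace_id -> list of span dicts parsed from the NDJSON payload.
    traces = defaultdict(list)
    for line in ndjson_text.splitlines():
        line = line.strip()
        if not line:
            continue
        span = json.loads(line)
        traces[span["context"]["trace_id"]].append(span)
    return traces
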
46 changes: 46 additions & 0 deletions tests/s3_ndjson_validation.py
@@ -0,0 +1,46 @@
import json
import boto3
from botocore.exceptions import NoCredentialsError
import pytest


class TestS3NDJSONFormat:

@pytest.fixture
def s3_client(self):
client = boto3.client(
's3',
aws_access_key_id='',
aws_secret_access_key='',
region_name='us-east-1'
)
return client

@pytest.fixture
def bucket_info(self):
return {
'bucket_name': 'sachin-dev',
's3_file_key': 'xx.ndjson'
}

def test_s3_ndjson_format(self, s3_client, bucket_info):
try:
response = s3_client.get_object(Bucket=bucket_info['bucket_name'], Key=bucket_info['s3_file_key'])
file_content = response['Body'].read().decode('utf-8')

lines = file_content.strip().split("\n")
for line in lines:
try:
json_obj = json.loads(line)
assert isinstance(json_obj, dict), f"Line is not a valid JSON object: {line}"
except json.JSONDecodeError:
raise AssertionError(f"Line is not valid JSON: {line}")

except NoCredentialsError:
raise AssertionError("AWS credentials not available")
except Exception as e:
raise AssertionError(f"Test failed with error: {e}")


if __name__ == '__main__':
pytest.main()
