Skip to content

Commit

Permalink
improvement in vectorstore traces (#38)
Browse files Browse the repository at this point in the history
* improvement in haystack, llama and langchain vector store traces

Signed-off-by: hansrajr <[email protected]>
  • Loading branch information
Hansrajr authored Sep 24, 2024
1 parent 212df70 commit c713cfb
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 120 deletions.
16 changes: 16 additions & 0 deletions src/monocle_apptrace/haystack/wrap_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
_SUPPRESS_INSTRUMENTATION_KEY,
)
from monocle_apptrace.wrap_common import PROMPT_INPUT_KEY, PROMPT_OUTPUT_KEY, WORKFLOW_TYPE_MAP, with_tracer_wrapper
from monocle_apptrace.utils import set_embedding_model

logger = logging.getLogger(__name__)

Expand All @@ -17,6 +18,9 @@ def wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
attach(set_value("workflow_name", name))
inputs = set()
workflow_input = get_workflow_input(args, inputs)
embedding_model = get_embedding_model(instance)
set_embedding_model(embedding_model)


with tracer.start_as_current_span(f"{name}.workflow") as span:
span.set_attribute(PROMPT_INPUT_KEY, workflow_input)
Expand Down Expand Up @@ -44,3 +48,15 @@ def get_workflow_input(args, inputs):
def set_workflow_attributes(span, workflow_name):
    """Record the workflow identity on the span: its name plus the
    haystack workflow-type constant from WORKFLOW_TYPE_MAP."""
    span.set_attribute("workflow_name", workflow_name)
    span.set_attribute("workflow_type", WORKFLOW_TYPE_MAP["haystack"])

def get_embedding_model(instance):
    """
    Best-effort lookup of the embedding model name from a pipeline instance.

    Asks the instance for its 'text_embedder' component (haystack Pipeline
    exposes ``get_component``) and returns that component's ``model``
    attribute when present.

    @param instance: the wrapped pipeline instance
    @return: the embedding model name, or None when the component or its
             ``model`` attribute is absent or the lookup fails
    """
    try:
        if hasattr(instance, 'get_component'):
            text_embedder = instance.get_component('text_embedder')
            if text_embedder and hasattr(text_embedder, 'model'):
                return text_embedder.model
    # Narrowed from a bare `except:` so that KeyboardInterrupt/SystemExit
    # still propagate; tracing remains best-effort and never breaks the
    # instrumented pipeline.
    except Exception:
        pass

    return None
20 changes: 20 additions & 0 deletions src/monocle_apptrace/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from opentelemetry.trace import Span
from monocle_apptrace.constants import azure_service_map, aws_service_map

embedding_model_context = {}

def set_span_attribute(span, name, value):
if value is not None:
if value != "":
Expand Down Expand Up @@ -71,3 +73,21 @@ def update_span_with_infra_name(span: Span, span_key: str):
for key,val in aws_service_map.items():
if key in os.environ:
span.set_attribute(span_key, val)


def set_embedding_model(model_name: str):
    """
    Remember the embedding model name in the module-level context so that
    later instrumentation code can retrieve it via get_embedding_model().

    @param model_name: the name of the embedding model to set
    """
    embedding_model_context['embedding_model'] = model_name


def get_embedding_model() -> str:
    """
    Look up the embedding model previously recorded with set_embedding_model().

    @return: the stored model name, or 'unknown' if none was recorded
    """
    return embedding_model_context.get('embedding_model', 'unknown')
55 changes: 49 additions & 6 deletions src/monocle_apptrace/wrap_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from urllib.parse import urlparse

from opentelemetry.trace import Span, Tracer
from monocle_apptrace.utils import resolve_from_alias, update_span_with_infra_name, with_tracer_wrapper
from monocle_apptrace.utils import resolve_from_alias, update_span_with_infra_name, with_tracer_wrapper, get_embedding_model

logger = logging.getLogger(__name__)
WORKFLOW_TYPE_KEY = "workflow_type"
Expand All @@ -17,7 +17,10 @@
TAGS = "tags"
SESSION_PROPERTIES_KEY = "session"
INFRA_SERVICE_KEY = "infra_service_name"

TYPE = "type"
PROVIDER = "provider_name"
EMBEDDING_MODEL = "embedding_model"
VECTOR_STORE = 'vector_store'


WORKFLOW_TYPE_MAP = {
Expand All @@ -26,6 +29,24 @@
"haystack": "workflow.haystack"
}

# Maps a wrapped package name to an extractor that pulls vector-store
# telemetry (provider, embedding model, type) off the instrumented
# retriever instance. Keyed by the 'package' value of the wrap config.
framework_vector_store_mapping = {
    'langchain_core.retrievers': lambda instance: {
        # NOTE(review): relies on tags being ordered [provider, embedding_model]
        # on the langchain retriever — confirm against how tags are populated.
        'provider': instance.tags[0],
        'embedding_model': instance.tags[1],
        'type': VECTOR_STORE,
    },
    'llama_index.core.indices.base_retriever': lambda instance: {
        # Reads llama_index private attributes (_vector_store, _embed_model);
        # may break across llama_index versions.
        'provider': type(instance._vector_store).__name__,
        'embedding_model': instance._embed_model.model_name,
        'type': VECTOR_STORE,
    },
    'haystack.components.retrievers': lambda instance: {
        # Haystack retrievers don't expose the embedder, so the model name
        # comes from the global context set during pipeline wrapping.
        'provider': instance.__dict__.get("document_store").__class__.__name__,
        'embedding_model': get_embedding_model(),
        'type': VECTOR_STORE,
    },
}

@with_tracer_wrapper
def task_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs):
"""Instruments and calls every function defined in TO_WRAP."""
Expand Down Expand Up @@ -66,6 +87,7 @@ def pre_task_processing(to_wrap, instance, args, span):
#capture the tags attribute of the instance if present, else ignore
try:
update_tags(instance, span)
update_vectorstore_attributes(to_wrap, instance, span)
except AttributeError:
pass
update_span_with_context_input(to_wrap=to_wrap, wrapped_args=args, span=span)
Expand Down Expand Up @@ -133,6 +155,8 @@ def llm_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs):
else:
name = f"langchain.task.{instance.__class__.__name__}"
with tracer.start_as_current_span(name) as span:
if 'haystack.components.retrievers' in to_wrap['package'] and 'haystack.retriever' in span.name:
update_vectorstore_attributes(to_wrap, instance, span)
update_llm_endpoint(curr_span= span, instance=instance)

return_value = wrapped(*args, **kwargs)
Expand All @@ -148,12 +172,12 @@ def update_llm_endpoint(curr_span: Span, instance):
if 'temperature' in instance.__dict__:
temp_val = instance.__dict__.get("temperature")
curr_span.set_attribute("temperature", temp_val)
# handling for model name
model_name = resolve_from_alias(instance.__dict__ , ["model","model_name"])
# handling for model name
model_name = resolve_from_alias(instance.__dict__ , ["model","model_name"])
curr_span.set_attribute("model_name", model_name)
set_provider_name(curr_span, instance)
# handling AzureOpenAI deployment
deployment_name = resolve_from_alias(instance.__dict__ , [ "engine", "azure_deployment",
deployment_name = resolve_from_alias(instance.__dict__ , [ "engine", "azure_deployment",
"deployment_name", "deployment_id", "deployment"])
curr_span.set_attribute("az_openai_deployment", deployment_name)
# handling the inference endpoint
Expand Down Expand Up @@ -191,7 +215,6 @@ def get_input_from_args(chain_args):
return ""

def update_span_from_llm_response(response, span: Span):

# extract token uasge from langchain openai
if (response is not None and hasattr(response, "response_metadata")):
response_metadata = response.response_metadata
Expand Down Expand Up @@ -266,3 +289,23 @@ def update_tags(instance, span):
span.set_attribute(TAGS, [model_name, vector_store_name])
except:
pass


def update_vectorstore_attributes(to_wrap, instance, span):
    """
    Updates the telemetry span attributes for vector store retrieval tasks.

    Looks up the wrapped package in framework_vector_store_mapping and, when
    recognized, records the vector-store type, provider and embedding model
    on the span. Unrecognized packages are logged and skipped; any extraction
    failure is logged so tracing never breaks the instrumented call.
    """
    try:
        package = to_wrap.get('package')
        extractor = framework_vector_store_mapping.get(package)
        if extractor is None:
            logger.warning(f"Package '{package}' not recognized for vector store telemetry.")
            return
        attributes = extractor(instance)
        # Use the public Span API instead of mutating the private
        # span._attributes dict, which bypasses OpenTelemetry's attribute
        # validation and limits.
        span.set_attribute(TYPE, attributes['type'])
        span.set_attribute(PROVIDER, attributes['provider'])
        span.set_attribute(EMBEDDING_MODEL, attributes['embedding_model'])
    except Exception as e:
        logger.error(f"Error updating span attributes: {e}")
10 changes: 7 additions & 3 deletions tests/haystack_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def haystack_app():

haystack_app()

# {
#{
# "name": "haystack.retriever",
# "context": {
# "trace_id": "0x1db120b68e3a759882ac457b07af344f",
Expand All @@ -116,7 +116,9 @@ def haystack_app():
# "status_code": "UNSET"
# },
# "attributes": {
# "server_url": "20.237.77.237:8001"
# "type": "vector_store",
# "provider_name": "InMemoryDocumentStore",
# "embedding_model": "sentence-transformers/all-MiniLM-L6-v2"
# },
# "events": [],
# "links": [],
Expand Down Expand Up @@ -172,8 +174,10 @@ def haystack_app():
# "status_code": "UNSET"
# },
# "attributes": {
# "input": "What does Rhodes Statue look like?",
# "workflow_name": "haystack_app_1",
# "workflow_type": "workflow.haystack"
# "workflow_type": "workflow.haystack",
# "output": "The Rhodes Statue, also known as the Colossus of Rhodes, depicted the Greek sun-god Helios. It was a bronze statue standing approximately 33 meters (108 feet) tall with a standard rendering of the head and face, featuring curly hair with evenly spaced spikes of bronze or silver flame radiating. The actual appearance of the rest of the statue remains unknown, but it was considered the tallest statue in the ancient world."
# },
# "events": [],
# "links": [],
Expand Down
20 changes: 15 additions & 5 deletions tests/haystack_unitest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@


import json
import logging
import os
Expand All @@ -17,6 +16,8 @@
from monocle_apptrace.instrumentor import setup_monocle_telemetry
from monocle_apptrace.wrapper import WrapperMethod
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

logger = logging.getLogger(__name__)

Expand All @@ -36,10 +37,14 @@ def test_haystack(self, mock_post):
)
prompt_builder = DynamicChatPromptBuilder()
llm = OpenAIChatGenerator(api_key=Secret.from_token(api_key), model="gpt-4")
document_store = InMemoryDocumentStore()
retriever = InMemoryBM25Retriever(document_store=document_store)

pipe = Pipeline()
pipe.add_component("retriever", retriever)
pipe.add_component("prompt_builder", prompt_builder)
pipe.add_component("llm", llm)
pipe.connect("retriever", "prompt_builder.template_variables")
pipe.connect("prompt_builder.prompt", "llm.messages")
query = "OpenTelemetry"
messages = [ChatMessage.from_user("Tell me a joke about {{query}}")]
Expand All @@ -62,9 +67,9 @@ def test_haystack(self, mock_post):
This can be used to do more asserts'''
dataBodyStr = mock_post.call_args.kwargs['data']
logger.debug(dataBodyStr)
dataJson = json.loads(dataBodyStr) # more asserts can be added on individual fields
dataJson = json.loads(dataBodyStr) # more asserts can be added on individual fields

root_attributes = [x for x in dataJson["batch"] if x["parent_id"] == "None"][0]["attributes"]
root_attributes = [x for x in dataJson["batch"] if x["parent_id"] == "None"][0]["attributes"]
# assert root_attributes["workflow_input"] == query
# assert root_attributes["workflow_output"] == llm.dummy_response

Expand All @@ -78,6 +83,7 @@ def test_haystack(self, mock_post):

type_found = False
model_name_found = False
provider_found = False
assert root_attributes["workflow_input"] == query
assert root_attributes["workflow_output"] == TestHandler.ragText

Expand All @@ -88,9 +94,13 @@ def test_haystack(self, mock_post):
if span["name"] == "haystack.openai" and "model_name" in span["attributes"]:
assert span["attributes"]["model_name"] == "gpt-4"
model_name_found = True
if span["name"] == "haystack.retriever" and "type" in span["attributes"]:
assert span["attributes"]["provider_name"] == "InMemoryDocumentStore"
provider_found = True

assert type_found == True
assert model_name_found == True
assert type_found
assert model_name_found
assert provider_found

if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit c713cfb

Please sign in to comment.