Merge pull request #22 from monocle2ai/kshitiz/add_traces_updates_1
Improvements in traces
kshitiz-okahu authored Aug 6, 2024
2 parents 3f79f15 + d2c6c41 commit f766fe9
Showing 7 changed files with 348 additions and 27 deletions.
2 changes: 1 addition & 1 deletion src/monocle_apptrace/utils.py
@@ -69,4 +69,4 @@ def load_wrapper_from_config(config_file_path:str, module_name:str=None):

def get_wrapper_method(package_name: str, method_name: str):
    wrapper_module = import_module("monocle_apptrace." + package_name)
-    return getattr(wrapper_module, method_name)
\ No newline at end of file
+    return getattr(wrapper_module, method_name)
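
For context, get_wrapper_method resolves a wrapper function from a module name at runtime via import_module plus getattr. A minimal usage sketch (illustrative; the module and method names are taken from elsewhere in this diff, not from a documented API):

from monocle_apptrace.utils import get_wrapper_method

# Resolves monocle_apptrace.wrap_common.task_wrapper dynamically.
wrapper = get_wrapper_method("wrap_common", "task_wrapper")
assert callable(wrapper)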
89 changes: 72 additions & 17 deletions src/monocle_apptrace/wrap_common.py
@@ -1,8 +1,8 @@
import logging
import os
from urllib.parse import urlparse

from opentelemetry.trace import Span, Tracer

from monocle_apptrace.utils import resolve_from_alias, with_tracer_wrapper

logger = logging.getLogger(__name__)
@@ -17,6 +17,7 @@
CONTEXT_PROPERTIES_KEY = "workflow_context_properties"



WORKFLOW_TYPE_MAP = {
"llama_index": "workflow.llamaindex",
"langchain": "workflow.langchain",
@@ -40,25 +41,33 @@ def task_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs):
    kind = to_wrap.get("kind")

    with tracer.start_as_current_span(name) as span:
-        if is_root_span(span):
-            update_span_with_prompt_input(to_wrap=to_wrap, wrapped_args=args, span=span)
+        pre_task_processing(to_wrap, instance, args, span)
+        return_value = wrapped(*args, **kwargs)
+        post_task_processing(to_wrap, span, return_value)
+
+    return return_value
+
+def post_task_processing(to_wrap, span, return_value):
+    update_span_with_context_output(to_wrap=to_wrap, return_value=return_value, span=span)
+
+    if is_root_span(span):
+        workflow_name = span.resource.attributes.get("service.name")
+        span.set_attribute("workflow_name",workflow_name)
+        update_span_with_prompt_output(to_wrap=to_wrap, wrapped_args=return_value, span=span)
+        update_workflow_type(to_wrap, span)
+
+def pre_task_processing(to_wrap, instance, args, span):
+    if is_root_span(span):
+        update_span_with_prompt_input(to_wrap=to_wrap, wrapped_args=args, span=span)

    #capture the tags attribute of the instance if present, else ignore
-        try:
-            span.set_attribute(TAGS, getattr(instance, TAGS))
-        except AttributeError:
-            pass
-        update_span_with_context_input(to_wrap=to_wrap, wrapped_args=args, span=span)
-        return_value = wrapped(*args, **kwargs)
-        update_span_with_context_output(to_wrap=to_wrap, return_value=return_value, span=span)
+    try:
+        update_tags(instance, span)
+    except AttributeError:
+        pass
+    update_span_with_context_input(to_wrap=to_wrap, wrapped_args=args, span=span)
-
-        if is_root_span(span):
-            workflow_name = span.resource.attributes.get("service.name")
-            span.set_attribute("workflow_name",workflow_name)
-            update_span_with_prompt_output(to_wrap=to_wrap, wrapped_args=return_value, span=span)
-            update_workflow_type(to_wrap, span)
-
-        return return_value

@with_tracer_wrapper
async def atask_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
@@ -76,7 +85,9 @@ async def atask_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
    name = f"langchain.task.{instance.__class__.__name__}"
    kind = to_wrap.get("kind")
    with tracer.start_as_current_span(name) as span:
+        pre_task_processing(to_wrap, instance, args, span)
        return_value = await wrapped(*args, **kwargs)
+        post_task_processing(to_wrap, span, return_value)

return return_value

@@ -86,7 +97,7 @@ async def allm_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
    if instance.__class__.__name__ in ("AgentExecutor"):
        return wrapped(*args, **kwargs)

-    if to_wrap.get("span_name_getter"):
+    if callable(to_wrap.get("span_name_getter")):
name = to_wrap.get("span_name_getter")(instance)

elif hasattr(instance, "name") and instance.name:
@@ -100,6 +111,7 @@ async def allm_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
    update_llm_endpoint(curr_span= span, instance=instance)

    return_value = await wrapped(*args, **kwargs)
+    update_span_from_llm_response(response = return_value, span = span)

return return_value
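
The update_llm_endpoint hunk below resolves model and deployment names through resolve_from_alias. A plausible reconstruction of that helper (hypothetical — the real implementation lives in monocle_apptrace.utils and is not shown in this diff):

def resolve_from_alias(my_map: dict, alias_list: list):
    # Return the value of the first alias present in the map, else None.
    for alias in alias_list:
        if alias in my_map:
            return my_map[alias]
    return None

# e.g. resolve_from_alias(instance.__dict__, ["model", "model_name"])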

@@ -138,7 +150,10 @@ def update_llm_endpoint(curr_span: Span, instance):
    curr_span.set_attribute("temperature", temp_val)
    # handling for model name
    model_name = resolve_from_alias(instance.__dict__ , ["model","model_name"])
+    # TODO: remove openai_model_name after discussion
    curr_span.set_attribute("openai_model_name", model_name)
+    curr_span.set_attribute("model_name", model_name)
+    set_provider_name(curr_span, instance)
# handling AzureOpenAI deployment
deployment_name = resolve_from_alias(instance.__dict__ , [ "engine", "azure_deployment",
"deployment_name", "deployment_id", "deployment"])
@@ -147,6 +162,28 @@ def update_llm_endpoint(curr_span: Span, instance):
inference_ep = resolve_from_alias(instance.__dict__,["azure_endpoint","api_base"])
curr_span.set_attribute("inference_endpoint",inference_ep)

def set_provider_name(curr_span, instance):
provider_url = ""

try :
if type(instance.client._client.base_url.host) == str :
provider_url = instance.client._client.base_url.host
except:
pass

try :
if type(instance.api_base) == str:
provider_url = instance.api_base
except:
pass

try :
if len(provider_url) > 0:
parsed_provider_url = urlparse(provider_url)
curr_span.set_attribute("provider_name", parsed_provider_url.hostname or provider_url)
except:
pass
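
As a side note on set_provider_name: urlparse yields the hostname that ends up in the provider_name attribute. A standalone sketch with a hypothetical endpoint URL (standard library only, not part of this commit):

from urllib.parse import urlparse

provider_url = "https://myorg.openai.azure.com/openai"  # hypothetical endpoint
parsed = urlparse(provider_url)
print(parsed.hostname)  # myorg.openai.azure.com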

def is_root_span(curr_span: Span) -> bool:
return curr_span.parent == None
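
is_root_span relies on the OpenTelemetry convention that a root span has no parent context. A quick standalone illustration (assumes the opentelemetry-sdk package; not part of this commit):

from opentelemetry.sdk.trace import TracerProvider

tracer = TracerProvider().get_tracer("demo")
with tracer.start_as_current_span("root") as root:
    print(root.parent is None)   # True: a root span has no parent context
    with tracer.start_as_current_span("child") as child:
        print(child.parent is None)  # False: parent is the root span's context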

@@ -212,3 +249,21 @@ def update_span_with_prompt_output(to_wrap, wrapped_args ,span: Span):
span.add_event(PROMPT_OUTPUT_KEY, {RESPONSE:wrapped_args})
if("llama_index.core.base.base_query_engine" in package_name):
span.add_event(PROMPT_OUTPUT_KEY, {RESPONSE:wrapped_args.response})

def update_tags(instance, span):

try:
# copy tags as is from langchain
span.set_attribute(TAGS, getattr(instance, TAGS))
except:
pass

try:
# extract embed model and vector store names for llamaindex
model_name = instance.retriever._embed_model.model_name
vector_store_name = type(instance.retriever._vector_store).__name__
span.set_attribute(TAGS, [model_name, vector_store_name])
except:
pass


63 changes: 63 additions & 0 deletions tests/fake_list_llm.py
@@ -0,0 +1,63 @@
from langchain_core.language_models.llms import LLM
from typing import Optional

from typing import Any, List, Mapping


class FakeListLLM(LLM):
"""Fake LLM for testing purposes."""

responses: List[str]
"""List of responses to return in order."""
# This parameter should be removed from FakeListLLM since
# it's only used by sub-classes.
sleep: Optional[float] = None
"""Sleep time in seconds between responses.
Ignored by FakeListLLM, but used by sub-classes.
"""
i: int = 0
"""Internally incremented after every model invocation.
Useful primarily for testing purposes.
"""
api_base = ""

@property
def _llm_type(self) -> str:
"""Return type of llm."""
return "fake-list"

def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager = None,
**kwargs: Any,
) -> str:
"""Return next response"""
response = self.responses[self.i]
if self.i < len(self.responses) - 1:
self.i += 1
else:
self.i = 0
return response

async def _acall(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager = None,
**kwargs: Any,
) -> str:
"""Return next response"""
response = self.responses[self.i]
if self.i < len(self.responses) - 1:
self.i += 1
else:
self.i = 0
return response

@property
def _identifying_params(self) -> Mapping[str, Any]:
return {"responses": self.responses}
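
A short usage sketch for the new fake LLM (illustrative, not part of this commit). LLM subclasses inherit invoke from LangChain's Runnable interface, and the i counter wraps around once the response list is exhausted:

from fake_list_llm import FakeListLLM  # assumes tests/ is on sys.path

llm = FakeListLLM(responses=["first", "second"])
print(llm.invoke("any prompt"))  # first
print(llm.invoke("any prompt"))  # second
print(llm.invoke("any prompt"))  # first again: the counter wrapped around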
