Skip to content

Commit

Permalink
updated async and custom output processor testcase for metamodel (#58)
Browse files Browse the repository at this point in the history
* updated async and custom output processor testcase for metamodel

Signed-off-by: sachintendulkar576123 <[email protected]>
  • Loading branch information
sachintendulkar576123 authored Oct 21, 2024
1 parent cbcf21c commit e321ef9
Show file tree
Hide file tree
Showing 7 changed files with 454 additions and 47 deletions.
14 changes: 9 additions & 5 deletions src/monocle_apptrace/metamodel/maps/langchain_methods.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@
"object": "BaseChatModel",
"method": "ainvoke",
"wrapper_package": "wrap_common",
"wrapper_method": "allm_wrapper"
"wrapper_method": "allm_wrapper",
"output_processor": ["metamodel/maps/attributes/inference/langchain_entities.json"]
},
{
"package": "langchain_core.language_models.llms",
"object": "LLM",
"method": "_generate",
"wrapper_package": "wrap_common",
"wrapper_method": "llm_wrapper"
"wrapper_method": "llm_wrapper",
"output_processor": ["metamodel/maps/attributes/inference/langchain_entities.json"]
},
{
"package": "langchain_core.language_models.llms",
"object": "LLM",
"method": "_agenerate",
"wrapper_package": "wrap_common",
"wrapper_method": "llm_wrapper"
"wrapper_method": "allm_wrapper",
"output_processor": ["metamodel/maps/attributes/inference/langchain_entities.json"]
},
{
"package": "langchain_core.retrievers",
Expand All @@ -57,7 +60,8 @@
"object": "BaseRetriever",
"method": "ainvoke",
"wrapper_package": "wrap_common",
"wrapper_method": "atask_wrapper"
"wrapper_method": "atask_wrapper",
"output_processor": ["metamodel/maps/attributes/retrieval/langchain_entities.json"]
},
{
"package": "langchain.schema",
Expand Down Expand Up @@ -106,4 +110,4 @@
"wrapper_method": "atask_wrapper"
}
]
}
}
14 changes: 9 additions & 5 deletions src/monocle_apptrace/metamodel/maps/llamaindex_methods.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"method": "aretrieve",
"span_name": "llamaindex.retrieve",
"wrapper_package": "wrap_common",
"wrapper_method": "atask_wrapper"
"wrapper_method": "atask_wrapper",
"output_processor": ["metamodel/maps/attributes/retrieval/llamaindex_entities.json"]
},
{
"package": "llama_index.core.base.base_query_engine",
Expand All @@ -39,15 +40,17 @@
"method": "chat",
"span_name": "llamaindex.llmchat",
"wrapper_package": "wrap_common",
"wrapper_method": "task_wrapper"
"wrapper_method": "task_wrapper",
"output_processor": ["metamodel/maps/attributes/inference/llamaindex_entities.json"]
},
{
"package": "llama_index.core.llms.custom",
"object": "CustomLLM",
"method": "achat",
"span_name": "llamaindex.llmchat",
"wrapper_package": "wrap_common",
"wrapper_method": "atask_wrapper"
"wrapper_method": "atask_wrapper",
"output_processor": ["metamodel/maps/attributes/inference/llamaindex_entities.json"]
},
{
"package": "llama_index.llms.openai.base",
Expand All @@ -64,7 +67,8 @@
"method": "achat",
"span_name": "llamaindex.openai",
"wrapper_package": "wrap_common",
"wrapper_method": "allm_wrapper"
"wrapper_method": "allm_wrapper",
"output_processor": ["metamodel/maps/attributes/inference/llamaindex_entities.json"]
}
]
}
}
9 changes: 6 additions & 3 deletions src/monocle_apptrace/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ def load_output_processor(wrapper_method, attributes_config_base_path):
logger.info(f'Output processor file path is: {output_processor_file_path}')

if isinstance(output_processor_file_path, str) and output_processor_file_path: # Combined condition
absolute_file_path = os.path.join(attributes_config_base_path, output_processor_file_path)
if not attributes_config_base_path:
absolute_file_path = os.path.abspath(output_processor_file_path)
else:
absolute_file_path = os.path.join(attributes_config_base_path, output_processor_file_path)

logger.info(f'Absolute file path is: {absolute_file_path}')
try:
Expand Down Expand Up @@ -107,7 +110,7 @@ def process_wrapper_method_config(
wrapper_method["span_name_getter"] = get_wrapper_method(
wrapper_method["span_name_getter_package"],
wrapper_method["span_name_getter_method"])
if "output_processor" in wrapper_method:
if "output_processor" in wrapper_method and wrapper_method["output_processor"]:
load_output_processor(wrapper_method, attributes_config_base_path)

def get_wrapper_method(package_name: str, method_name: str):
Expand Down Expand Up @@ -158,4 +161,4 @@ def get_attribute(key: str) -> str:
Returns:
The value associated with the given key.
"""
return get_value(key)
return get_value(key)
90 changes: 56 additions & 34 deletions src/monocle_apptrace/wrap_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def get_embedding_model_for_vectorstore(instance):
},
}


@with_tracer_wrapper
def task_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs):
"""Instruments and calls every function defined in TO_WRAP."""
Expand All @@ -93,40 +94,46 @@ def task_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs):

with tracer.start_as_current_span(name) as span:
if "output_processor" in to_wrap:
process_span(to_wrap["output_processor"],span,instance,args)
process_span(to_wrap["output_processor"], span, instance, args)
pre_task_processing(to_wrap, instance, args, span)
return_value = wrapped(*args, **kwargs)
post_task_processing(to_wrap, span, return_value)

return return_value

def process_span(output_processor,span,instance,args):
# Check if the output_processor is a valid JSON (in Python, that means it's a dictionary)
if isinstance(output_processor, dict) and len(output_processor)>0:
if 'type' in output_processor:
span.set_attribute("span.type", output_processor['type'])
else:
logger.warning("type of span not found or incorrect written in entity json")
count=0
if 'attributes' in output_processor:
count = len(output_processor["attributes"])
span.set_attribute("entity.count", count)
span_index = 1
for processors in output_processor["attributes"]:
for processor in processors:
if 'attribute' in processor and 'accessor' in processor:
attribute_name = f"entity.{span_index}.{processor['attribute']}"
result = eval(processor['accessor'])(instance, args)
span.set_attribute(attribute_name, result)
else:
logger.warning("attribute or accessor not found or incorrect written in entity json")
span_index += 1
else:
logger.warning("attributes not found or incorrect written in entity json")
span.set_attribute("span.count", count)

def process_span(output_processor, span, instance, args):
    """Populate *span* attributes from a metamodel output-processor definition.

    The output processor is a dict loaded from an entity JSON file containing:
      * 'type' (optional): recorded on the span as ``span.type``.
      * 'attributes' (optional): a list of attribute groups; each group is a
        list of dicts with an 'attribute' name and an 'accessor' — a lambda
        source string evaluated and called with ``(instance, args)``.

    Entity attributes are recorded as ``entity.<group-index>.<name>``, plus
    ``entity.count`` / ``span.count`` with the number of groups. Malformed
    definitions and failing accessors are logged and skipped, never raised,
    so tracing cannot break the instrumented call.
    """
    # Check if the output_processor is a valid JSON (in Python, that means it's a dictionary)
    if isinstance(output_processor, dict) and len(output_processor) > 0:
        if 'type' in output_processor:
            span.set_attribute("span.type", output_processor['type'])
        else:
            logger.warning("type of span not found or incorrect written in entity json")
        count = 0
        if 'attributes' in output_processor:
            count = len(output_processor["attributes"])
            span.set_attribute("entity.count", count)
            span_index = 1
            for processors in output_processor["attributes"]:
                for processor in processors:
                    if 'attribute' in processor and 'accessor' in processor:
                        attribute_name = f"entity.{span_index}.{processor['attribute']}"
                        try:
                            # SECURITY: eval() executes arbitrary code from the
                            # entity JSON — accessor files must come from a
                            # trusted source only.
                            result = eval(processor['accessor'])(instance, args)
                            # Only record non-empty string results as attributes.
                            if result and isinstance(result, str):
                                span.set_attribute(attribute_name, result)
                        except Exception:
                            # Log instead of silently swallowing, so a broken
                            # accessor is diagnosable; processing continues with
                            # the remaining attributes.
                            logger.warning(
                                "error evaluating accessor for %s", attribute_name, exc_info=True)
                    else:
                        logger.warning("attribute or accessor not found or incorrect written in entity json")
                span_index += 1
        else:
            logger.warning("attributes not found or incorrect written in entity json")
        span.set_attribute("span.count", count)

    else:
        logger.warning("empty or entities json is not in correct format")


def post_task_processing(to_wrap, span, return_value):
Expand All @@ -138,6 +145,7 @@ def post_task_processing(to_wrap, span, return_value):
except:
logger.exception("exception in post_task_processing")


def pre_task_processing(to_wrap, instance, args, span):
try:
if is_root_span(span):
Expand All @@ -150,7 +158,7 @@ def pre_task_processing(to_wrap, instance, args, span):
update_span_with_context_input(to_wrap=to_wrap, wrapped_args=args, span=span)
except:
logger.exception("exception in pre_task_processing")


@with_tracer_wrapper
async def atask_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
Expand All @@ -167,6 +175,8 @@ async def atask_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
else:
name = f"langchain.task.{instance.__class__.__name__}"
with tracer.start_as_current_span(name) as span:
if "output_processor" in to_wrap:
process_span(to_wrap["output_processor"], span, instance, args)
pre_task_processing(to_wrap, instance, args, span)
return_value = await wrapped(*args, **kwargs)
post_task_processing(to_wrap, span, return_value)
Expand All @@ -190,11 +200,18 @@ async def allm_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
else:
name = f"langchain.task.{instance.__class__.__name__}"
with tracer.start_as_current_span(name) as span:
update_llm_endpoint(curr_span=span, instance=instance)
if 'haystack.components.retrievers' in to_wrap['package'] and 'haystack.retriever' in span.name:
input_arg_text = get_attribute(DATA_INPUT_KEY)
span.add_event(DATA_INPUT_KEY, {QUERY: input_arg_text})
provider_name = set_provider_name(instance)
instance_args = {"provider_name": provider_name}
if 'output_processor' in to_wrap:
process_span(to_wrap['output_processor'], span, instance, instance_args)

return_value = await wrapped(*args, **kwargs)

update_span_from_llm_response(response = return_value, span = span, instance=instance)
if 'haystack.components.retrievers' in to_wrap['package'] and 'haystack.retriever' in span.name:
update_span_with_context_output(to_wrap=to_wrap, return_value=return_value, span=span)
update_span_from_llm_response(response=return_value, span=span, instance=instance)

return return_value

Expand Down Expand Up @@ -227,7 +244,7 @@ def llm_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs):
return_value = wrapped(*args, **kwargs)
if 'haystack.components.retrievers' in to_wrap['package'] and 'haystack.retriever' in span.name:
update_span_with_context_output(to_wrap=to_wrap, return_value=return_value, span=span)
update_span_from_llm_response(response = return_value, span = span,instance=instance)
update_span_from_llm_response(response=return_value, span=span, instance=instance)

return return_value

Expand Down Expand Up @@ -261,6 +278,7 @@ def update_llm_endpoint(curr_span: Span, instance):
inference_endpoint=inference_ep
)


def set_provider_name(instance):
provider_url = ""

Expand Down Expand Up @@ -293,7 +311,8 @@ def get_input_from_args(chain_args):
return chain_args[0]
return ""

def update_span_from_llm_response(response, span: Span ,instance):

def update_span_from_llm_response(response, span: Span, instance):
    # extract token usage from langchain openai
if (response is not None and hasattr(response, "response_metadata")):
response_metadata = response.response_metadata
Expand Down Expand Up @@ -326,6 +345,7 @@ def update_span_from_llm_response(response, span: Span ,instance):
except AttributeError:
token_usage = None


def update_workflow_type(to_wrap, span: Span):
package_name = to_wrap.get('package')

Expand All @@ -346,6 +366,7 @@ def update_span_with_context_input(to_wrap, wrapped_args, span: Span):
if input_arg_text:
span.add_event(DATA_INPUT_KEY, {QUERY: input_arg_text})


def update_span_with_context_output(to_wrap, return_value, span: Span):
package_name: str = to_wrap.get('package')
output_arg_text = ""
Expand All @@ -362,6 +383,7 @@ def update_span_with_context_output(to_wrap, return_value, span: Span):
if output_arg_text:
span.add_event(DATA_OUTPUT_KEY, {RESPONSE: output_arg_text})


def update_span_with_prompt_input(to_wrap, wrapped_args, span: Span):
input_arg_text = wrapped_args[0]

Expand All @@ -370,12 +392,12 @@ def update_span_with_prompt_input(to_wrap, wrapped_args, span: Span):
else:
span.add_event(PROMPT_INPUT_KEY, {QUERY: input_arg_text})


def update_span_with_prompt_output(to_wrap, wrapped_args, span: Span):
    """Attach the prompt/LLM output to *span* as a ``PROMPT_OUTPUT_KEY`` event.

    Handles the three observed result shapes: a plain string (wrapped under
    the RESPONSE key), a dict (attached as-is), and a llama-index query-engine
    response object (its ``.response`` text is extracted).
    """
    # NOTE: the scraped diff carried both the pre- and post-commit form of the
    # final add_event line; only the single post-commit statement is kept here.
    package_name: str = to_wrap.get('package')
    if isinstance(wrapped_args, str):
        span.add_event(PROMPT_OUTPUT_KEY, {RESPONSE: wrapped_args})
    if isinstance(wrapped_args, dict):
        span.add_event(PROMPT_OUTPUT_KEY, wrapped_args)
    if "llama_index.core.base.base_query_engine" in package_name:
        # assumes wrapped_args is a llama-index Response here, not a str/dict
        # — TODO confirm against callers
        span.add_event(PROMPT_OUTPUT_KEY, {RESPONSE: wrapped_args.response})
17 changes: 17 additions & 0 deletions tests/entities.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"type": "retrieval",
"attributes": [
[
{
"_comment": "vector store name and type",
"attribute": "name",
"accessor": "lambda instance,args: type(instance.vectorstore).__name__"
},
{
"attribute": "type",
"accessor": "lambda instance,args: 'vectorstore.'+type(instance.vectorstore).__name__"
}
]

]
}
Loading

0 comments on commit e321ef9

Please sign in to comment.