Skip to content

Commit

Permalink
update workflow name and type with new format
Browse files Browse the repository at this point in the history
Signed-off-by: sachintendulkar576123 <[email protected]>
  • Loading branch information
sachintendulkar576123 committed Oct 21, 2024
1 parent e321ef9 commit 3c0ea97
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 42 deletions.
1 change: 1 addition & 0 deletions src/monocle_apptrace/instrumentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def setup_monocle_telemetry(
})
span_processors = span_processors or [BatchSpanProcessor(FileSpanExporter())]
trace_provider = TracerProvider(resource=resource)
attach(set_value("workflow_name", workflow_name))
tracer_provider_default = trace.get_tracer_provider()
provider_type = type(tracer_provider_default).__name__
is_proxy_provider = "Proxy" in provider_type
Expand Down
92 changes: 50 additions & 42 deletions src/monocle_apptrace/wrap_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from opentelemetry.trace import Span, Tracer
from monocle_apptrace.utils import resolve_from_alias, update_span_with_infra_name, with_tracer_wrapper, get_embedding_model, get_attribute
from monocle_apptrace.utils import set_attribute

from opentelemetry.context import get_value, attach, set_value
logger = logging.getLogger(__name__)
WORKFLOW_TYPE_KEY = "workflow_type"
DATA_INPUT_KEY = "data.input"
Expand Down Expand Up @@ -93,47 +93,59 @@ def task_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs):
name = f"langchain.task.{instance.__class__.__name__}"

with tracer.start_as_current_span(name) as span:
if "output_processor" in to_wrap:
process_span(to_wrap["output_processor"], span, instance, args)
process_span(to_wrap, span, instance, args)
pre_task_processing(to_wrap, instance, args, span)
return_value = wrapped(*args, **kwargs)
post_task_processing(to_wrap, span, return_value)

return return_value


def process_span(to_wrap, span, instance, args):
    """Annotate ``span`` with workflow metadata and output-processor attributes.

    On a root span (as decided by ``is_root_span``) the workflow name stored
    in the OpenTelemetry context under ``"workflow_name"`` is written to
    ``entities.1.name``, and every workflow type in ``WORKFLOW_TYPE_MAP``
    whose package key is a substring of ``to_wrap['package']`` is written to
    ``entities.1.type``.

    If an output processor is available, its ``type`` becomes ``span.type``,
    the number of attribute groups becomes ``entity.count``, and each
    accessor in group *i* (1-based) is evaluated with ``(instance, args)``;
    non-empty string results are stored as ``entity.<i>.<attribute>``.

    Args:
        to_wrap: Mapping describing the wrapped method. Read keys are
            ``'package'`` and ``'output_processor'``. For compatibility with
            call sites in this module that still pass
            ``to_wrap['output_processor']``, a mapping that itself carries
            ``'type'`` or ``'attributes'`` is treated as the output processor.
        span: Span to annotate; only ``set_attribute`` is called on it here.
        instance: Object passed as first argument to every accessor.
        args: Value passed as second argument to every accessor.

    Returns:
        None. Problems with the processor layout are reported through
        ``logger.warning``; accessor failures are logged at debug level and
        never propagate.
    """
    workflow_index = 1
    if is_root_span(span):
        workflow_name = get_value("workflow_name")
        if workflow_name:
            span.set_attribute(f"entities.{workflow_index}.name", workflow_name)
        # workflow type
        package_name = to_wrap.get('package')
        if package_name is not None:
            for package, workflow_type in WORKFLOW_TYPE_MAP.items():
                if package in package_name:
                    span.set_attribute(f"entities.{workflow_index}.type", workflow_type)

    if 'output_processor' in to_wrap:
        output_processor = to_wrap['output_processor']
    elif 'type' in to_wrap or 'attributes' in to_wrap:
        # Some callers still hand over to_wrap['output_processor'] directly;
        # accept that shape instead of silently emitting nothing.
        output_processor = to_wrap
    else:
        return

    # The processor must be a non-empty dictionary (the parsed entity JSON).
    if not isinstance(output_processor, dict) or not output_processor:
        logger.warning("empty or entities json is not in correct format")
        return

    if 'type' in output_processor:
        span.set_attribute("span.type", output_processor['type'])
    else:
        logger.warning("type of span not found or incorrect written in entity json")

    if 'attributes' not in output_processor:
        logger.warning("attributes not found or incorrect written in entity json")
        # NOTE(review): kept in this branch as in the original line order;
        # confirm whether "span.count" was meant to be set unconditionally.
        span.set_attribute("span.count", 0)
        return

    attribute_groups = output_processor["attributes"]
    span.set_attribute("entity.count", len(attribute_groups))
    for entity_index, processors in enumerate(attribute_groups, start=1):
        for processor in processors:
            if 'attribute' not in processor or 'accessor' not in processor:
                logger.warning("attribute or accessor not found or incorrect written in entity json")
                continue
            attribute_name = f"entity.{entity_index}.{processor['attribute']}"
            try:
                # SECURITY: eval() executes the accessor string as Python code;
                # the processor definition must come from a trusted source.
                result = eval(processor['accessor'])(instance, args)
                if result and isinstance(result, str):
                    span.set_attribute(attribute_name, result)
            except Exception:
                # Best effort: one failing accessor must not break tracing,
                # but leave a trace of the failure for debugging.
                logger.debug("accessor for %s failed", attribute_name, exc_info=True)


def post_task_processing(to_wrap, span, return_value):
Expand All @@ -149,9 +161,6 @@ def post_task_processing(to_wrap, span, return_value):
def pre_task_processing(to_wrap, instance, args, span):
try:
if is_root_span(span):
workflow_name = span.resource.attributes.get("service.name")
span.set_attribute("workflow_name", workflow_name)
update_workflow_type(to_wrap, span)
update_span_with_prompt_input(to_wrap=to_wrap, wrapped_args=args, span=span)
update_span_with_infra_name(span, INFRA_SERVICE_KEY)

Expand All @@ -175,8 +184,7 @@ async def atask_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
else:
name = f"langchain.task.{instance.__class__.__name__}"
with tracer.start_as_current_span(name) as span:
if "output_processor" in to_wrap:
process_span(to_wrap["output_processor"], span, instance, args)
process_span(to_wrap["output_processor"], span, instance, args)
pre_task_processing(to_wrap, instance, args, span)
return_value = await wrapped(*args, **kwargs)
post_task_processing(to_wrap, span, return_value)
Expand Down Expand Up @@ -205,8 +213,8 @@ async def allm_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
span.add_event(DATA_INPUT_KEY, {QUERY: input_arg_text})
provider_name = set_provider_name(instance)
instance_args = {"provider_name": provider_name}
if 'output_processor' in to_wrap:
process_span(to_wrap['output_processor'], span, instance, instance_args)

process_span(to_wrap['output_processor'], span, instance, instance_args)

return_value = await wrapped(*args, **kwargs)
if 'haystack.components.retrievers' in to_wrap['package'] and 'haystack.retriever' in span.name:
Expand Down Expand Up @@ -238,8 +246,8 @@ def llm_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs):
span.add_event(DATA_INPUT_KEY, {QUERY: input_arg_text})
provider_name = set_provider_name(instance)
instance_args = {"provider_name": provider_name}
if 'output_processor' in to_wrap:
process_span(to_wrap['output_processor'], span, instance, instance_args)

process_span(to_wrap['output_processor'], span, instance, instance_args)

return_value = wrapped(*args, **kwargs)
if 'haystack.components.retrievers' in to_wrap['package'] and 'haystack.retriever' in span.name:
Expand Down

0 comments on commit 3c0ea97

Please sign in to comment.