diff --git a/src/monocle_apptrace/instrumentor.py b/src/monocle_apptrace/instrumentor.py index 8782133..65eb0e0 100644 --- a/src/monocle_apptrace/instrumentor.py +++ b/src/monocle_apptrace/instrumentor.py @@ -92,6 +92,7 @@ def setup_monocle_telemetry( }) span_processors = span_processors or [BatchSpanProcessor(FileSpanExporter())] trace_provider = TracerProvider(resource=resource) + attach(set_value("workflow_name", workflow_name)) tracer_provider_default = trace.get_tracer_provider() provider_type = type(tracer_provider_default).__name__ is_proxy_provider = "Proxy" in provider_type diff --git a/src/monocle_apptrace/wrap_common.py b/src/monocle_apptrace/wrap_common.py index 690cad6..ede1d54 100644 --- a/src/monocle_apptrace/wrap_common.py +++ b/src/monocle_apptrace/wrap_common.py @@ -6,7 +6,7 @@ from opentelemetry.trace import Span, Tracer from monocle_apptrace.utils import resolve_from_alias, update_span_with_infra_name, with_tracer_wrapper, get_embedding_model, get_attribute from monocle_apptrace.utils import set_attribute - +from opentelemetry.context import get_value, attach, set_value logger = logging.getLogger(__name__) WORKFLOW_TYPE_KEY = "workflow_type" DATA_INPUT_KEY = "data.input" @@ -93,8 +93,7 @@ def task_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs): name = f"langchain.task.{instance.__class__.__name__}" with tracer.start_as_current_span(name) as span: - if "output_processor" in to_wrap: - process_span(to_wrap["output_processor"], span, instance, args) + process_span(to_wrap, span, instance, args) pre_task_processing(to_wrap, instance, args, span) return_value = wrapped(*args, **kwargs) post_task_processing(to_wrap, span, return_value) @@ -102,38 +101,51 @@ def task_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs): return return_value -def process_span(output_processor, span, instance, args): +def process_span(to_wrap, span, instance, args): # Check if the output_processor is a valid JSON (in Python, that means it's a dictionary) - if isinstance(output_processor, dict) and len(output_processor) > 0: - if 'type' in output_processor: - span.set_attribute("span.type", output_processor['type']) - else: - logger.warning("type of span not found or incorrect written in entity json") - count = 0 - if 'attributes' in output_processor: - count = len(output_processor["attributes"]) - span.set_attribute("entity.count", count) - span_index = 1 - for processors in output_processor["attributes"]: - for processor in processors: - if 'attribute' in processor and 'accessor' in processor: - attribute_name = f"entity.{span_index}.{processor['attribute']}" - try: - result = eval(processor['accessor'])(instance, args) - if result and isinstance(result, str): - span.set_attribute(attribute_name, result) - except Exception as e: - pass - - else: - logger.warning("attribute or accessor not found or incorrect written in entity json") - span_index += 1 - else: - logger.warning("attributes not found or incorrect written in entity json") - span.set_attribute("span.count", count) + span_index = 1 + if is_root_span(span): + workflow_name = get_value("workflow_name") + if workflow_name: + span.set_attribute(f"entities.{span_index}.name", workflow_name) + # workflow type + package_name = to_wrap.get('package') + for (package, workflow_type) in WORKFLOW_TYPE_MAP.items(): + if (package_name is not None and package in package_name): + span.set_attribute(f"entities.{span_index}.type", workflow_type) + span_index += 1 + if 'output_processor' in to_wrap: + output_processor=to_wrap['output_processor'] + if isinstance(output_processor, dict) and len(output_processor) > 0: + if 'type' in output_processor: + span.set_attribute("span.type", output_processor['type']) + else: + logger.warning("type of span not found or incorrect written in entity json") + count = 0 + if 'attributes' in output_processor: + count = len(output_processor["attributes"]) + span.set_attribute("entity.count", count) + span_index = 1 + for processors in output_processor["attributes"]: + for processor in processors: + if 'attribute' in processor and 'accessor' in processor: + attribute_name = f"entity.{span_index}.{processor['attribute']}" + try: + result = eval(processor['accessor'])(instance, args) + if result and isinstance(result, str): + span.set_attribute(attribute_name, result) + except Exception as e: + pass + + else: + logger.warning("attribute or accessor not found or incorrect written in entity json") + span_index += 1 + else: + logger.warning("attributes not found or incorrect written in entity json") + span.set_attribute("span.count", count) - else: - logger.warning("empty or entities json is not in correct format") + else: + logger.warning("empty or entities json is not in correct format") def post_task_processing(to_wrap, span, return_value): @@ -149,9 +161,6 @@ def post_task_processing(to_wrap, span, return_value): def pre_task_processing(to_wrap, instance, args, span): try: if is_root_span(span): - workflow_name = span.resource.attributes.get("service.name") - span.set_attribute("workflow_name", workflow_name) - update_workflow_type(to_wrap, span) update_span_with_prompt_input(to_wrap=to_wrap, wrapped_args=args, span=span) update_span_with_infra_name(span, INFRA_SERVICE_KEY) @@ -175,8 +184,7 @@ async def atask_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs): else: name = f"langchain.task.{instance.__class__.__name__}" with tracer.start_as_current_span(name) as span: - if "output_processor" in to_wrap: - process_span(to_wrap["output_processor"], span, instance, args) + process_span(to_wrap["output_processor"], span, instance, args) pre_task_processing(to_wrap, instance, args, span) return_value = await wrapped(*args, **kwargs) post_task_processing(to_wrap, span, return_value) @@ -205,8 +213,8 @@ async def allm_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs): span.add_event(DATA_INPUT_KEY, {QUERY: input_arg_text}) provider_name = set_provider_name(instance) instance_args = {"provider_name": provider_name} - if 'output_processor' in to_wrap: - process_span(to_wrap['output_processor'], span, instance, instance_args) + + process_span(to_wrap['output_processor'], span, instance, instance_args) return_value = await wrapped(*args, **kwargs) if 'haystack.components.retrievers' in to_wrap['package'] and 'haystack.retriever' in span.name: @@ -238,8 +246,8 @@ def llm_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs): span.add_event(DATA_INPUT_KEY, {QUERY: input_arg_text}) provider_name = set_provider_name(instance) instance_args = {"provider_name": provider_name} - if 'output_processor' in to_wrap: - process_span(to_wrap['output_processor'], span, instance, instance_args) + + process_span(to_wrap['output_processor'], span, instance, instance_args) return_value = wrapped(*args, **kwargs) if 'haystack.components.retrievers' in to_wrap['package'] and 'haystack.retriever' in span.name: