diff --git a/src/monocle_apptrace/message_processing.py b/src/monocle_apptrace/message_processing.py
new file mode 100644
index 0000000..4f3b320
--- /dev/null
+++ b/src/monocle_apptrace/message_processing.py
@@ -0,0 +1,105 @@
+"""
+This module provides utility functions for extracting system, user,
+and assistant messages from various input formats.
+"""
+
+import logging
+from collections.abc import Mapping
+
+from monocle_apptrace.utils import get_attribute
+
+DATA_INPUT_KEY = "data.input"
+
+logger = logging.getLogger(__name__)
+
+
+def extract_messages(args):
+    """Extract the system and user messages from the wrapped call's arguments.
+
+    A value stored under ``DATA_INPUT_KEY`` takes precedence and is returned
+    as the user message. Otherwise ``args[0]`` is inspected: either an object
+    with a ``messages`` list whose items expose ``type``/``content``, or a
+    plain list whose items expose ``role``/``content``. For the plain-list
+    form the user message is narrowed by ``extract_query_from_content`` and
+    is therefore ``None`` when the content has no ``Query:`` marker.
+
+    Returns a ``(system_message, user_message)`` tuple, ``("", "")`` on error.
+    """
+    try:
+        system_message, user_message = "", ""
+        args_input = get_attribute(DATA_INPUT_KEY)
+        if args_input:
+            user_message = args_input
+            return system_message, user_message
+        if args and isinstance(args, tuple) and len(args) > 0:
+            if hasattr(args[0], "messages") and isinstance(args[0].messages, list):
+                for msg in args[0].messages:
+                    if hasattr(msg, 'content') and hasattr(msg, 'type'):
+                        if msg.type == "system":
+                            system_message = msg.content
+                        elif msg.type in ["user", "human"]:
+                            user_message = msg.content
+            elif isinstance(args[0], list):
+                for msg in args[0]:
+                    if hasattr(msg, 'content') and hasattr(msg, 'role'):
+                        if msg.role == "system":
+                            system_message = msg.content
+                        elif msg.role in ["user", "human"]:
+                            user_message = extract_query_from_content(msg.content)
+        return system_message, user_message
+    except Exception as e:
+        logger.warning("Warning: Error occurred in extract_messages: %s", str(e))
+        return "", ""
+
+
+def extract_assistant_message(response):
+    """Return the assistant's reply text from an LLM response.
+
+    Accepts a plain string, an object with ``content``, an object whose
+    ``message`` has ``content``, or a mapping with a ``replies`` list (the
+    first reply is used). Returns ``""`` for any other shape or on error.
+    """
+    try:
+        if isinstance(response, str):
+            return response
+        if hasattr(response, "content"):
+            return response.content
+        if hasattr(response, "message") and hasattr(response.message, "content"):
+            return response.message.content
+        # Only mappings are probed for "replies": a membership test on a
+        # non-container response raises TypeError, and indexing an empty
+        # replies list raises IndexError; both ended up as spurious warnings.
+        if isinstance(response, Mapping) and response.get("replies"):
+            first_reply = response["replies"][0]
+            if hasattr(first_reply, 'content'):
+                return first_reply.content
+            return first_reply
+        return ""
+    except Exception as e:
+        logger.warning("Warning: Error occurred in extract_assistant_message: %s", str(e))
+        return ""
+
+
+def extract_query_from_content(content):
+    """Return the text between ``Query:`` and the following ``Answer:``.
+
+    Runs to the end of ``content`` when no ``Answer:`` follows. Returns
+    ``None`` when ``content`` has no ``Query:`` marker and ``""`` on error.
+    """
+    try:
+        query_prefix = "Query:"
+        answer_prefix = "Answer:"
+        query_start = content.find(query_prefix)
+        if query_start == -1:
+            return None
+
+        query_start += len(query_prefix)
+        answer_start = content.find(answer_prefix, query_start)
+        if answer_start == -1:
+            query = content[query_start:].strip()
+        else:
+            query = content[query_start:answer_start].strip()
+        return query
+    except Exception as e:
+        logger.warning("Warning: Error occurred in extract_query_from_content: %s", str(e))
+        return ""
diff --git a/src/monocle_apptrace/wrap_common.py b/src/monocle_apptrace/wrap_common.py
index 9a77a2d..389a886 100644
--- a/src/monocle_apptrace/wrap_common.py
+++ b/src/monocle_apptrace/wrap_common.py
@@ -9,6 +9,7 @@
 from monocle_apptrace.utils import resolve_from_alias, with_tracer_wrapper, get_embedding_model, get_attribute, get_workflow_name, set_embedding_model, set_app_hosting_identifier_attribute
 from monocle_apptrace.utils import set_attribute
 from monocle_apptrace.utils import get_fully_qualified_class_name, get_nested_value
+from monocle_apptrace.message_processing import extract_messages, extract_assistant_message
 
 logger = logging.getLogger(__name__)
 WORKFLOW_TYPE_KEY = "workflow_type"
@@ -403,62 +404,6 @@ def update_span_from_llm_response(response, span: Span, instance):
             token_usage = None
 
 
-def extract_messages(args):
-    """Extract system and user messages"""
-    system_message, user_message = "", ""
-    args_input = get_attribute(DATA_INPUT_KEY)
-    if args_input:
-        user_message = args_input
-        return system_message, user_message
-    if args and isinstance(args, tuple) and len(args) > 0:
-        if hasattr(args[0], "messages") and isinstance(args[0].messages, list):
-            for msg in args[0].messages:
-                if hasattr(msg, 'content') and hasattr(msg, 'type'):
-                    if msg.type == "system":
-                        system_message = msg.content
-                    elif msg.type in ["user", "human"]:
-                        user_message = msg.content
-        elif isinstance(args[0], list):
-            for msg in args[0]:
-                if hasattr(msg, 'content') and hasattr(msg, 'role'):
-                    if msg.role == "system":
-                        system_message = msg.content
-                    elif msg.role in ["user", "human"]:
-                        user_message = extract_query_from_content(msg.content)
-    return system_message, user_message
-
-
-def extract_assistant_message(response):
-    if isinstance(response, str):
-        return response
-    if hasattr(response, "content"):
-        return response.content
-    if hasattr(response, "message") and hasattr(response.message, "content"):
-        return response.message.content
-    if "replies" in response:
-        if hasattr(response['replies'][0], 'content'):
-            return response['replies'][0].content
-        else:
-            return response['replies'][0]
-    return ""
-
-
-def extract_query_from_content(content):
-    query_prefix = "Query:"
-    answer_prefix = "Answer:"
-    query_start = content.find(query_prefix)
-    if query_start == -1:
-        return None
-
-    query_start += len(query_prefix)
-    answer_start = content.find(answer_prefix, query_start)
-    if answer_start == -1:
-        query = content[query_start:].strip()
-    else:
-        query = content[query_start:answer_start].strip()
-    return query
-
-
 def update_workflow_type(to_wrap, span: Span):
     package_name = to_wrap.get('package')
 