diff --git a/.github/workflows/prepare-release-branch.yml b/.github/workflows/prepare-release-branch.yml index 74497d6..ab12a4e 100644 --- a/.github/workflows/prepare-release-branch.yml +++ b/.github/workflows/prepare-release-branch.yml @@ -1,9 +1,6 @@ # name: Prepare release branch on: - push: - branches: - - release_test_temp workflow_dispatch: inputs: prerelease_version: @@ -24,7 +21,7 @@ jobs: env: PRERELEASE_VERSION: ${{ github.event.inputs.prerelease_version }} run: | - if [[ $GITHUB_REF_NAME != release_test_temp ]]; then + if [[ $GITHUB_REF_NAME != main ]]; then echo this workflow should only be run against main exit 1 fi diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a980f27..5c5d860 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -83,12 +83,12 @@ jobs: run: | twine upload --repository testpypi --skip-existing --verbose dist/* - # - name: Publish to PyPI - # env: - # TWINE_USERNAME: '__token__' - # TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} - # run: | - # twine upload --skip-existing --verbose dist/* + - name: Publish to PyPI + env: + TWINE_USERNAME: '__token__' + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} + run: | + twine upload --skip-existing --verbose dist/* - name: Generate release notes env: diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b9c31b..933cf58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +## Unreleased + +- Add dev dependency for Mistral AI integration ([#81](https://github.com/monocle2ai/monocle/pull/81)) +- Add VectorStore deployment URL capture support ([#80](https://github.com/monocle2ai/monocle/pull/80)) +- Clean up cloud exporter implementation ([#79](https://github.com/monocle2ai/monocle/pull/79)) +- Capture inference span input/output events attributes ([#77](https://github.com/monocle2ai/monocle/pull/77)) +- Add release automation workflows ([#76](https://github.com/monocle2ai/monocle/pull/76)) +- Fix gaps in Monocle SDK implementation ([#72](https://github.com/monocle2ai/monocle/pull/72)) +- Add kwargs and return value handling in Accessor ([#71](https://github.com/monocle2ai/monocle/pull/71)) +- Update workflow name formatting ([#69](https://github.com/monocle2ai/monocle/pull/69)) +- Implement Haystack metamodel support ([#68](https://github.com/monocle2ai/monocle/pull/68)) + ## Version 0.2.0 (2024-12-05) ## 0.2.0 (Oct 22, 2024) diff --git a/src/monocle_apptrace/message_processing.py b/src/monocle_apptrace/message_processing.py index 4f3b320..3af45fe 100644 --- a/src/monocle_apptrace/message_processing.py +++ b/src/monocle_apptrace/message_processing.py @@ -11,49 +11,53 @@ def extract_messages(args): """Extract system and user messages""" try: - system_message, user_message = "", "" + messages = [] args_input = get_attribute(DATA_INPUT_KEY) if args_input: - user_message = args_input - return system_message, user_message + messages.append(args_input) + return messages if args and isinstance(args, tuple) and len(args) > 0: if hasattr(args[0], "messages") and isinstance(args[0].messages, list): for msg in args[0].messages: if hasattr(msg, 'content') and hasattr(msg, 'type'): - if msg.type == "system": - system_message = msg.content - elif msg.type in ["user", "human"]: - user_message = msg.content - elif isinstance(args[0], list): + messages.append({msg.type: msg.content}) + elif isinstance(args[0], list): #llama for msg in args[0]: if hasattr(msg, 'content') and hasattr(msg, 'role'): + if hasattr(msg.role, 'value'): + role = msg.role.value + else: + role = msg.role if 
msg.role == "system": - system_message = msg.content + messages.append({role: msg.content}) elif msg.role in ["user", "human"]: user_message = extract_query_from_content(msg.content) - return system_message, user_message + messages.append({role: user_message}) + return messages except Exception as e: logger.warning("Warning: Error occurred in extract_messages: %s", str(e)) - return "", "" + return [] def extract_assistant_message(response): try: if isinstance(response, str): - return response + return [response] if hasattr(response, "content"): - return response.content + return [response.content] if hasattr(response, "message") and hasattr(response.message, "content"): - return response.message.content + return [response.message.content] if "replies" in response: - if hasattr(response['replies'][0], 'content'): - return response['replies'][0].content - else: - return response['replies'][0] - return "" + reply = response["replies"][0] + if hasattr(reply, 'content'): + return [reply.content] + return [reply] + if isinstance(response, dict): + return [response] + return [] except Exception as e: logger.warning("Warning: Error occurred in extract_assistant_message: %s", str(e)) - return "" + return [] def extract_query_from_content(content): diff --git a/src/monocle_apptrace/metamodel/maps/attributes/inference/haystack_entities.json b/src/monocle_apptrace/metamodel/maps/attributes/inference/haystack_entities.json index 868c0bd..c54d0a6 100644 --- a/src/monocle_apptrace/metamodel/maps/attributes/inference/haystack_entities.json +++ b/src/monocle_apptrace/metamodel/maps/attributes/inference/haystack_entities.json @@ -37,14 +37,9 @@ "attributes": [ { - "_comment": "this is instruction to LLM", - "attribute": "system", - "accessor": "lambda arguments: extract_messages(arguments)[0]" - }, - { - "_comment": "this is user query to LLM", - "attribute": "user", - "accessor": "lambda arguments: extract_messages(arguments)[1]" + "_comment": "this is instruction and user query to LLM", + "attribute": "input", + "accessor": "lambda arguments: extract_messages(arguments['args'])" } ] }, @@ -53,7 +48,7 @@ "attributes": [ { "_comment": "this is response from LLM", - "attribute": "assistant", + "attribute": "response", "accessor": "lambda response: extract_assistant_message(response)" } ] diff --git a/src/monocle_apptrace/metamodel/maps/attributes/inference/langchain_entities.json b/src/monocle_apptrace/metamodel/maps/attributes/inference/langchain_entities.json index 84aa992..35c491a 100644 --- a/src/monocle_apptrace/metamodel/maps/attributes/inference/langchain_entities.json +++ b/src/monocle_apptrace/metamodel/maps/attributes/inference/langchain_entities.json @@ -24,11 +24,11 @@ { "_comment": "LLM Model", "attribute": "name", - "accessor": "lambda arguments: resolve_from_alias(arguments['instance'].__dict__, ['model', 'model_name'])" + "accessor": "lambda arguments: resolve_from_alias(arguments['instance'].__dict__, ['model', 'model_name']) or arguments['instance'].model_id" }, { "attribute": "type", - "accessor": "lambda arguments: 'model.llm.'+resolve_from_alias(arguments['instance'].__dict__, ['model', 'model_name'])" + "accessor": "lambda arguments: 'model.llm.'+ (resolve_from_alias(arguments['instance'].__dict__, ['model', 'model_name']) or arguments['instance'].model_id)" } ] ], @@ -37,14 +37,9 @@ "attributes": [ { - "_comment": "this is instruction to LLM", - "attribute": "system", - "accessor": "lambda arguments: extract_messages(arguments)[0]" - }, - { - "_comment": "this is user query to LLM", - 
"attribute": "user", - "accessor": "lambda arguments: extract_messages(arguments)[1]" + "_comment": "this is instruction and user query to LLM", + "attribute": "input", + "accessor": "lambda arguments: extract_messages(arguments['args'])" } ] }, @@ -53,7 +48,7 @@ "attributes": [ { "_comment": "this is response from LLM", - "attribute": "assistant", + "attribute": "response", "accessor": "lambda response: extract_assistant_message(response)" } ] diff --git a/src/monocle_apptrace/metamodel/maps/attributes/inference/llamaindex_entities.json b/src/monocle_apptrace/metamodel/maps/attributes/inference/llamaindex_entities.json index 84aa992..03c35b9 100644 --- a/src/monocle_apptrace/metamodel/maps/attributes/inference/llamaindex_entities.json +++ b/src/monocle_apptrace/metamodel/maps/attributes/inference/llamaindex_entities.json @@ -37,14 +37,9 @@ "attributes": [ { - "_comment": "this is instruction to LLM", - "attribute": "system", - "accessor": "lambda arguments: extract_messages(arguments)[0]" - }, - { - "_comment": "this is user query to LLM", - "attribute": "user", - "accessor": "lambda arguments: extract_messages(arguments)[1]" + "_comment": "this is instruction and user query to LLM", + "attribute": "input", + "accessor": "lambda arguments: extract_messages(arguments['args'])" } ] }, @@ -53,7 +48,7 @@ "attributes": [ { "_comment": "this is response from LLM", - "attribute": "assistant", + "attribute": "response", "accessor": "lambda response: extract_assistant_message(response)" } ] diff --git a/src/monocle_apptrace/wrap_common.py b/src/monocle_apptrace/wrap_common.py index a7f0f7c..f5dd385 100644 --- a/src/monocle_apptrace/wrap_common.py +++ b/src/monocle_apptrace/wrap_common.py @@ -148,8 +148,9 @@ def process_span(to_wrap, span, instance, args, kwargs, return_value): logger.warning("attributes not found or incorrect written in entity json") if 'events' in output_processor: events = output_processor['events'] + arguments = {"instance": instance, "args": args, "kwargs": kwargs, "output": return_value} accessor_mapping = { - "arguments": args, + "arguments": arguments, "response": return_value } for event in events: @@ -164,7 +165,10 @@ def process_span(to_wrap, span, instance, args, kwargs, return_value): accessor_function = eval(accessor) for keyword, value in accessor_mapping.items(): if keyword in accessor: - event_attributes[attribute_key] = accessor_function(value) + evaluated_val = accessor_function(value) + if isinstance(evaluated_val, list): + evaluated_val = [str(d) for d in evaluated_val] + event_attributes[attribute_key] = evaluated_val except Exception as e: logger.error(f"Error evaluating accessor for attribute '{attribute_key}': {e}") span.add_event(name=event_name, attributes=event_attributes) @@ -339,6 +343,12 @@ def get_provider_name(instance): except: pass + try: + if isinstance(instance.client.meta.endpoint_url, str): + inference_endpoint = instance.client.meta.endpoint_url + except: + pass + api_base = getattr(instance, "api_base", None) if isinstance(api_base, str): provider_url = api_base @@ -398,15 +408,18 @@ def update_span_from_llm_response(response, span: Span, instance): token_usage = response["meta"][0]["usage"] if (response is not None and hasattr(response, "response_metadata")): - response_metadata = response.response_metadata - token_usage = response_metadata.get("token_usage") + if hasattr(response, "usage_metadata"): + token_usage = response.usage_metadata + else: + response_metadata = response.response_metadata + token_usage = 
response_metadata.get("token_usage")

         meta_dict = {}
         if token_usage is not None:
             temperature = instance.__dict__.get("temperature", None)
             meta_dict.update({"temperature": temperature})
-            meta_dict.update({"completion_tokens": token_usage.get("completion_tokens")})
-            meta_dict.update({"prompt_tokens": token_usage.get("prompt_tokens")})
+            meta_dict.update({"completion_tokens": token_usage.get("completion_tokens") or token_usage.get("output_tokens")})
+            meta_dict.update({"prompt_tokens": token_usage.get("prompt_tokens") or token_usage.get("input_tokens")})
             meta_dict.update({"total_tokens": token_usage.get("total_tokens")})
             span.add_event(META_DATA, meta_dict)
     # extract token usage from llamaindex openai
diff --git a/tests/langchain_bedrock_sample.py b/tests/langchain_bedrock_sample.py
new file mode 100644
index 0000000..ce76d47
--- /dev/null
+++ b/tests/langchain_bedrock_sample.py
@@ -0,0 +1,316 @@
+import bs4
+from langchain import hub
+from langchain_chroma import Chroma
+from langchain_core.output_parsers import StrOutputParser
+from langchain_core.runnables import RunnablePassthrough
+from langchain_aws import ChatBedrockConverse
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+from monocle_apptrace.instrumentor import setup_monocle_telemetry
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
+from langchain_community.embeddings import OpenAIEmbeddings
+from langchain_community.document_loaders import WebBaseLoader
+import boto3
+import os
+os.environ["OPENAI_API_KEY"] = ""  # set your OpenAI API key here; OpenAIEmbeddings requires it
+
+
+bedrock_runtime_client = boto3.client(service_name='bedrock-runtime', region_name='us-east-1')
+
+setup_monocle_telemetry(
+    workflow_name="bedrock_rag_workflow",
+    span_processors=[BatchSpanProcessor(ConsoleSpanExporter())],
+    wrapper_methods=[],
+)
+
+llm = ChatBedrockConverse(
+    client=bedrock_runtime_client,
+    model_id="ai21.jamba-1-5-mini-v1:0",
+    temperature=0.1,
+)
+
+loader = WebBaseLoader(
+    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
+    bs_kwargs=dict(
+        parse_only=bs4.SoupStrainer(
+            class_=("post-content", "post-title", "post-header")
+        )
+    ),
+)
+docs = loader.load()
+
+text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
+splits = text_splitter.split_documents(docs)
+
+vectorstore = Chroma.from_documents(
+    documents=splits,
+    embedding=OpenAIEmbeddings(),
+)
+
+retriever = vectorstore.as_retriever()
+
+prompt = hub.pull("rlm/rag-prompt")
+
+
+def format_docs(docs):
+    return "\n\n".join(doc.page_content for doc in docs)
+
+
+rag_chain = (
+    {"context": retriever | format_docs, "question": RunnablePassthrough()}
+    | prompt
+    | llm
+    | StrOutputParser()
+)
+
+query = "What is Task Decomposition?"
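+
+# Invoking the chain emits the spans reproduced in the comment block below:
+# the inference span's data.input event records the message list returned by
+# extract_messages (one stringified {role: content} dict per message), and the
+# metadata event takes token counts from usage_metadata (input_tokens /
+# output_tokens) when the response carries it, falling back to
+# response_metadata["token_usage"].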
+result = rag_chain.invoke(query) + +print(result) + + +# { +# "name": "langchain_core.vectorstores.base.VectorStoreRetriever", +# "context": { +# "trace_id": "0x569a7336ee2a2fe52923844a6eedf79d", +# "span_id": "0x015f4ec4ff5d61f3", +# "trace_state": "[]" +# }, +# "kind": "SpanKind.INTERNAL", +# "parent_id": "0xb0809664c643f839", +# "start_time": "2024-12-06T11:19:39.619246Z", +# "end_time": "2024-12-06T11:19:40.225971Z", +# "status": { +# "status_code": "UNSET" +# }, +# "attributes": { +# "span.type": "retrieval", +# "entity.count": 2, +# "entity.1.name": "Chroma", +# "entity.1.type": "vectorstore.Chroma", +# "entity.2.name": "text-embedding-ada-002", +# "entity.2.type": "model.embedding.text-embedding-ada-002" +# }, +# "events": [ +# { +# "name": "data.input", +# "timestamp": "2024-12-06T11:19:39.619269Z", +# "attributes": { +# "input": "What is Task Decomposition?" +# } +# }, +# { +# "name": "data.output", +# "timestamp": "2024-12-06T11:19:40.225945Z", +# "attributes": { +# "response": "Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated ta..." +# } +# } +# ], +# "links": [], +# "resource": { +# "attributes": { +# "service.name": "bedrock_rag_workflow" +# }, +# "schema_url": "" +# } +# }, +# { +# "name": "langchain.workflow", +# "context": { +# "trace_id": "0x569a7336ee2a2fe52923844a6eedf79d", +# "span_id": "0xb0809664c643f839", +# "trace_state": "[]" +# }, +# "kind": "SpanKind.INTERNAL", +# "parent_id": "0x902f1c282642e02f", +# "start_time": "2024-12-06T11:19:39.619011Z", +# "end_time": "2024-12-06T11:19:40.226404Z", +# "status": { +# "status_code": "UNSET" +# }, +# "attributes": {}, +# "events": [], +# "links": [], +# "resource": { +# "attributes": { +# "service.name": "bedrock_rag_workflow" +# }, +# "schema_url": "" +# } +# }, +# { +# "name": "langchain.workflow", +# "context": { +# "trace_id": "0x569a7336ee2a2fe52923844a6eedf79d", +# "span_id": "0x902f1c282642e02f", +# "trace_state": "[]" +# }, +# "kind": "SpanKind.INTERNAL", +# "parent_id": "0x9308293874a886e6", +# "start_time": "2024-12-06T11:19:39.618541Z", +# "end_time": "2024-12-06T11:19:40.226591Z", +# "status": { +# "status_code": "UNSET" +# }, +# "attributes": {}, +# "events": [], +# "links": [], +# "resource": { +# "attributes": { +# "service.name": "bedrock_rag_workflow" +# }, +# "schema_url": "" +# } +# }, +# { +# "name": "langchain_core.prompts.chat.ChatPromptTemplate", +# "context": { +# "trace_id": "0x569a7336ee2a2fe52923844a6eedf79d", +# "span_id": "0x372e5758c82f6b1b", +# "trace_state": "[]" +# }, +# "kind": "SpanKind.INTERNAL", +# "parent_id": "0x9308293874a886e6", +# "start_time": "2024-12-06T11:19:40.226714Z", +# "end_time": "2024-12-06T11:19:40.227314Z", +# "status": { +# "status_code": "UNSET" +# }, +# "attributes": {}, +# "events": [], +# "links": [], +# "resource": { +# "attributes": { +# "service.name": "bedrock_rag_workflow" +# }, +# "schema_url": "" +# } +# }, +# { +# "name": "langchain_aws.chat_models.bedrock_converse.ChatBedrockConverse", +# "context": { +# "trace_id": "0x569a7336ee2a2fe52923844a6eedf79d", +# "span_id": "0x0de4a317052e1ea3", +# "trace_state": "[]" +# }, +# "kind": "SpanKind.INTERNAL", +# "parent_id": "0x9308293874a886e6", +# "start_time": "2024-12-06T11:19:40.227407Z", +# "end_time": "2024-12-06T11:19:41.996350Z", +# "status": { +# "status_code": "UNSET" +# }, +# "attributes": { +# "span.type": "inference", +# "entity.count": 2, +# "entity.1.type": "inference.azure_oai", +# "entity.1.inference_endpoint": 
"https://bedrock-runtime.us-east-1.amazonaws.com", +# "entity.2.name": "ai21.jamba-1-5-mini-v1:0", +# "entity.2.type": "model.llm.ai21.jamba-1-5-mini-v1:0" +# }, +# "events": [ +# { +# "name": "data.input", +# "timestamp": "2024-12-06T11:19:41.996279Z", +# "attributes": { +# "input": [ +# "{'human': 'You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don\\'t know the answer, just say that you don\\'t know. Use three sentences maximum and keep the answer concise.\\nQuestion: What is Task Decomposition? \\nContext: Fig. 1. Overview of a LLM-powered autonomous agent system.\\nComponent One: Planning#\\nA complicated task usually involves many steps. An agent needs to know what they are and plan ahead.\\nTask Decomposition#\\nChain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to \u201cthink step by step\u201d to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model\u2019s thinking process.\\n\\nTree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) with each state evaluated by a classifier (via a prompt) or majority vote.\\nTask decomposition can be done (1) by LLM with simple prompting like \"Steps for XYZ.\\\\n1.\", \"What are the subgoals for achieving XYZ?\", (2) by using task-specific instructions; e.g. \"Write a story outline.\" for writing a novel, or (3) with human inputs.\\n\\nResources:\\n1. Internet access for searches and information gathering.\\n2. Long Term memory management.\\n3. GPT-3.5 powered Agents for delegation of simple tasks.\\n4. File output.\\n\\nPerformance Evaluation:\\n1. Continuously review and analyze your actions to ensure you are performing to the best of your abilities.\\n2. Constructively self-criticize your big-picture behavior constantly.\\n3. Reflect on past decisions and strategies to refine your approach.\\n4. Every command has a cost, so be smart and efficient. Aim to complete tasks in the least number of steps.\\n\\n(3) Task execution: Expert models execute on the specific tasks and log results.\\nInstruction:\\n\\nWith the input and the inference results, the AI assistant needs to describe the process and results. The previous stages can be formed as - User Input: {{ User Input }}, Task Planning: {{ Tasks }}, Model Selection: {{ Model Assignment }}, Task Execution: {{ Predictions }}. You must first answer the user\\'s request in a straightforward manner. Then describe the task process and show your analysis and model inference results to the user in the first person. If inference results contain a file path, must tell the user the complete file path. \\nAnswer:'}" +# ] +# } +# }, +# { +# "name": "data.output", +# "timestamp": "2024-12-06T11:19:41.996316Z", +# "attributes": { +# "response": [ +# " Task decomposition is the process of breaking down a complex task into smaller, more manageable steps. This can be done using various methods, such as simple prompting, task-specific instructions, or human input. 
The goal is to make the task more understandable and easier to complete."
+# ]
+# }
+# },
+# {
+# "name": "metadata",
+# "timestamp": "2024-12-06T11:19:41.996333Z",
+# "attributes": {
+# "temperature": 0.1,
+# "completion_tokens": 55,
+# "prompt_tokens": 640,
+# "total_tokens": 695
+# }
+# }
+# ],
+# "links": [],
+# "resource": {
+# "attributes": {
+# "service.name": "bedrock_rag_workflow"
+# },
+# "schema_url": ""
+# }
+# },
+# {
+# "name": "langchain_core.output_parsers.string.StrOutputParser",
+# "context": {
+# "trace_id": "0x569a7336ee2a2fe52923844a6eedf79d",
+# "span_id": "0x469ffc2240a50d36",
+# "trace_state": "[]"
+# },
+# "kind": "SpanKind.INTERNAL",
+# "parent_id": "0x9308293874a886e6",
+# "start_time": "2024-12-06T11:19:41.996474Z",
+# "end_time": "2024-12-06T11:19:41.996719Z",
+# "status": {
+# "status_code": "UNSET"
+# },
+# "attributes": {},
+# "events": [],
+# "links": [],
+# "resource": {
+# "attributes": {
+# "service.name": "bedrock_rag_workflow"
+# },
+# "schema_url": ""
+# }
+# },
+# {
+# "name": "langchain.workflow",
+# "context": {
+# "trace_id": "0x569a7336ee2a2fe52923844a6eedf79d",
+# "span_id": "0x9308293874a886e6",
+# "trace_state": "[]"
+# },
+# "kind": "SpanKind.INTERNAL",
+# "parent_id": null,
+# "start_time": "2024-12-06T11:19:39.602991Z",
+# "end_time": "2024-12-06T11:19:41.996782Z",
+# "status": {
+# "status_code": "UNSET"
+# },
+# "attributes": {
+# "monocle_apptrace.version": "0.3.0",
+# "span.type": "workflow",
+# "entity.1.name": "bedrock_rag_workflow",
+# "entity.1.type": "workflow.langchain"
+# },
+# "events": [
+# {
+# "name": "data.input",
+# "timestamp": "2024-12-06T11:19:39.604009Z",
+# "attributes": {
+# "input": "What is Task Decomposition?"
+# }
+# },
+# {
+# "name": "data.output",
+# "timestamp": "2024-12-06T11:19:41.996776Z",
+# "attributes": {
+# "response": " Task decomposition is the process of breaking down a complex task into smaller, more manageable steps. This can be done using various methods, such as simple prompting, task-specific instructions, or human input. The goal is to make the task more understandable and easier to complete."
+# }
+# }
+# ],
+# "links": [],
+# "resource": {
+# "attributes": {
+# "service.name": "bedrock_rag_workflow"
+# },
+# "schema_url": ""
+# }
+# }
diff --git a/tests/langchain_bedrock_test.py b/tests/langchain_bedrock_test.py
new file mode 100644
index 0000000..2dc5d21
--- /dev/null
+++ b/tests/langchain_bedrock_test.py
@@ -0,0 +1,63 @@
+import unittest
+from unittest.mock import patch, MagicMock
+from monocle_apptrace.wrap_common import process_span
+import requests
+
+
+class TestProcessSpan(unittest.TestCase):
+
+    @patch.object(requests.Session, 'post')
+    def test_process_span_with_mocked_eval(self, mock_eval):
+        mock_eval.side_effect = lambda expression: {
+            "lambda arguments: arguments['kwargs']['provider_name'] or arguments['instance'].provider": "value1",
+            "lambda arguments: extract_messages(arguments['args'])": "What is Task Decomposition?"
+ }.get(expression, None) + + span = MagicMock() + span.set_attribute = MagicMock() + instance = MagicMock() + + args = (MagicMock(messages=[ + MagicMock(content="System message", type="system"), + MagicMock(content="What is Task Decomposition?", type="user") + ]), {}) + kwargs = {"key1": "value1", "provider_name": "value1"} + return_value = "test_return_value" + + # Define wrap_attributes with attributes and events to process + wrap_attributes = { + "output_processor": { + "type": "inference", + "attributes": [ + [ + { + "attribute": "provider_name", + "accessor": "lambda arguments: arguments['kwargs']['provider_name'] or arguments['instance'].provider" + } + ] + ], + "events": [ + { + "name": "data.input", + "attributes": [ + { + "attribute": "user", + "accessor": "lambda arguments: extract_messages(arguments['args'])" + } + ] + } + ] + } + } + + process_span(to_wrap=wrap_attributes, span=span, instance=instance, args=args, kwargs=kwargs, + return_value=return_value) + + # Verify the span and events attributes + span.set_attribute.assert_any_call("entity.count", 1) + span.set_attribute.assert_any_call("span.type", "inference") + span.set_attribute.assert_any_call("entity.1.provider_name", "value1") + span.add_event.assert_any_call(name="data.input", attributes={'user': ["{'system': 'System message'}", "{'user': 'What is Task Decomposition?'}"]}) + +if __name__ == '__main__': + unittest.main()
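+
+# A minimal sketch (not executed by the test above) of the message shapes the
+# reworked helpers return; Msg and Prompt are hypothetical stand-ins for
+# langchain-style message/prompt objects, not real monocle or langchain classes:
+#
+#   from monocle_apptrace.message_processing import (
+#       extract_assistant_message, extract_messages)
+#
+#   class Msg:
+#       def __init__(self, content, type):
+#           self.content, self.type = content, type
+#
+#   class Prompt:
+#       def __init__(self, messages):
+#           self.messages = messages
+#
+#   extract_messages((Prompt([Msg("You are a helpful assistant.", "system"),
+#                             Msg("What is Task Decomposition?", "user")]),))
+#   # -> [{'system': 'You are a helpful assistant.'},
+#   #     {'user': 'What is Task Decomposition?'}]
+#
+#   extract_assistant_message("plain text reply")    # -> ['plain text reply']
+#   extract_assistant_message({"replies": ["hi"]})   # -> ['hi']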