From 13a5f70f09ba692267d0902bf09e61f29e821ebe Mon Sep 17 00:00:00 2001
From: Isaac Francisco <78627776+isahers1@users.noreply.github.com>
Date: Mon, 4 Nov 2024 09:47:51 -0800
Subject: [PATCH] Python feat: track evaluator errors from sdk (#1079)

Co-authored-by: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com>
---
 python/langsmith/client.py                 |   6 +
 python/langsmith/evaluation/_arunner.py    |  42 ++++-
 python/langsmith/evaluation/_runner.py     | 186 ++++++++++++++++++++-
 python/langsmith/evaluation/evaluator.py   |   2 +
 python/langsmith/schemas.py                |   2 +
 python/tests/evaluation/test_evaluation.py | 158 ++++++++++++++++-
 6 files changed, 390 insertions(+), 6 deletions(-)

diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index 9f7d2846a..4419a2e34 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -4120,6 +4120,7 @@ def _submit_feedback(**kwargs):
                 ),
                 feedback_source_type=ls_schemas.FeedbackSourceType.MODEL,
                 project_id=project_id,
+                extra=res.extra,
                 trace_id=run.trace_id if run else None,
             )
         return results
@@ -4191,6 +4192,7 @@ def create_feedback(
         project_id: Optional[ID_TYPE] = None,
         comparative_experiment_id: Optional[ID_TYPE] = None,
         feedback_group_id: Optional[ID_TYPE] = None,
+        extra: Optional[Dict] = None,
         trace_id: Optional[ID_TYPE] = None,
         **kwargs: Any,
     ) -> ls_schemas.Feedback:
@@ -4239,6 +4241,9 @@ def create_feedback(
         feedback_group_id : str or UUID
             When logging preferences, ranking runs, or other comparative feedback,
             this is used to group feedback together.
+        extra : dict
+            Metadata for the feedback.
+        trace_id: Optional[ID_TYPE] = The trace ID of the run to provide feedback for. Enables batch ingestion.
         """
         if run_id is None and project_id is None:
             raise ValueError("One of run_id and project_id must be provided")
@@ -4302,6 +4307,7 @@ def create_feedback(
                 comparative_experiment_id, accept_null=True
             ),
             feedback_group_id=_ensure_uuid(feedback_group_id, accept_null=True),
+            extra=extra,
         )

         use_multipart = (self.info.batch_ingest_config or {}).get(
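
Note: the new `extra` parameter on `Client.create_feedback` simply forwards metadata onto the created
feedback record (stored on `FeedbackBase.extra`, added further below in this patch). A minimal sketch
of how a caller might set it, assuming a configured client; the run ID and feedback key are
placeholders, not values from this patch:

    from langsmith import Client

    client = Client()
    client.create_feedback(
        run_id="00000000-0000-0000-0000-000000000000",  # placeholder run ID
        key="correctness",                              # placeholder feedback key
        score=0,
        comment="ValueError('evaluator failed')",
        extra={"error": True},  # arbitrary metadata stored on the feedback record
    )
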
diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py
index a1055e64d..0b36d425a 100644
--- a/python/langsmith/evaluation/_arunner.py
+++ b/python/langsmith/evaluation/_arunner.py
@@ -36,6 +36,7 @@
     SUMMARY_EVALUATOR_T,
     ExperimentResultRow,
     _ExperimentManagerMixin,
+    _extract_feedback_keys,
     _ForwardResults,
     _load_examples_map,
     _load_experiment,
@@ -46,7 +47,11 @@
     _resolve_experiment,
     _wrap_summary_evaluators,
 )
-from langsmith.evaluation.evaluator import EvaluationResults, RunEvaluator
+from langsmith.evaluation.evaluator import (
+    EvaluationResult,
+    EvaluationResults,
+    RunEvaluator,
+)

 logger = logging.getLogger(__name__)

@@ -667,6 +672,34 @@ async def _arun_evaluators(
                     )
                 )
             except Exception as e:
+                try:
+                    feedback_keys = _extract_feedback_keys(evaluator)
+
+                    error_response = EvaluationResults(
+                        results=[
+                            EvaluationResult(
+                                key=key,
+                                source_run_id=run.id,
+                                comment=repr(e),
+                                extra={"error": True},
+                            )
+                            for key in feedback_keys
+                        ]
+                    )
+                    eval_results["results"].extend(
+                        # TODO: This is a hack
+                        self.client._log_evaluation_feedback(
+                            error_response, run=run, _executor=executor
+                        )
+                    )
+                except Exception as e2:
+                    logger.debug(f"Error parsing feedback keys: {e2}")
+                    pass
+                logger.error(
+                    f"Error running evaluator {repr(evaluator)} on"
+                    f" run {run.id}: {repr(e)}",
+                    exc_info=True,
+                )
                 logger.error(
                     f"Error running evaluator {repr(evaluator)} on"
                     f" run {run.id}: {repr(e)}",
@@ -727,7 +760,8 @@ async def _aapply_summary_evaluators(
                 )
             except Exception as e:
                 logger.error(
-                    f"Error running summary evaluator {repr(evaluator)}: {e}"
+                    f"Error running summary evaluator {repr(evaluator)}: {e}",
+                    exc_info=True,
                 )
         yield {"results": aggregate_feedback}

@@ -861,7 +895,9 @@ def _get_run(r: run_trees.RunTree) -> None:
             ),
         )
     except Exception as e:
-        logger.error(f"Error running target function: {e}")
+        logger.error(
+            f"Error running target function: {e}", exc_info=True, stacklevel=1
+        )
     return _ForwardResults(
         run=cast(schemas.Run, run),
         example=example,
diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py
index 06a65a075..ae04736df 100644
--- a/python/langsmith/evaluation/_runner.py
+++ b/python/langsmith/evaluation/_runner.py
@@ -2,15 +2,18 @@

 from __future__ import annotations

+import ast
 import collections
 import concurrent.futures as cf
 import datetime
 import functools
+import inspect
 import itertools
 import logging
 import pathlib
 import queue
 import random
+import textwrap
 import threading
 import uuid
 from contextvars import copy_context
@@ -42,6 +45,7 @@ from langsmith.evaluation.evaluator import (
     ComparisonEvaluationResult,
     DynamicComparisonRunEvaluator,
+    DynamicRunEvaluator,
     EvaluationResult,
     EvaluationResults,
     RunEvaluator,
 )
@@ -1365,6 +1369,29 @@ def _run_evaluators(
                     )
                 )
             except Exception as e:
+                try:
+                    feedback_keys = _extract_feedback_keys(evaluator)
+
+                    error_response = EvaluationResults(
+                        results=[
+                            EvaluationResult(
+                                key=key,
+                                source_run_id=run.id,
+                                comment=repr(e),
+                                extra={"error": True},
+                            )
+                            for key in feedback_keys
+                        ]
+                    )
+                    eval_results["results"].extend(
+                        # TODO: This is a hack
+                        self.client._log_evaluation_feedback(
+                            error_response, run=run, _executor=executor
+                        )
+                    )
+                except Exception as e2:
+                    logger.debug(f"Error parsing feedback keys: {e2}")
+                    pass
                 logger.error(
                     f"Error running evaluator {repr(evaluator)} on"
                     f" run {run.id}: {repr(e)}",
@@ -1469,7 +1496,8 @@ def _apply_summary_evaluators(
                 )
             except Exception as e:
                 logger.error(
-                    f"Error running summary evaluator {repr(evaluator)}: {e}"
+                    f"Error running summary evaluator {repr(evaluator)}: {e}",
+                    exc_info=True,
                 )
         yield {"results": aggregate_feedback}

@@ -1593,7 +1621,9 @@ def _get_run(r: rt.RunTree) -> None:
             ),
         )
     except Exception as e:
-        logger.error(f"Error running target function: {e}")
+        logger.error(
+            f"Error running target function: {e}", exc_info=True, stacklevel=1
+        )
     return _ForwardResults(
         run=cast(schemas.Run, run),
         example=example,
@@ -1662,3 +1692,155 @@ def _get_random_name() -> str:
     from langsmith.evaluation._name_generation import random_name  # noqa: F401

     return random_name()
+
+
+def _extract_feedback_keys(evaluator: RunEvaluator):
+    if isinstance(evaluator, DynamicRunEvaluator):
+        if getattr(evaluator, "func", None):
+            return _extract_code_evaluator_feedback_keys(evaluator.func)
+        elif getattr(evaluator, "afunc", None):
+            return _extract_code_evaluator_feedback_keys(evaluator.afunc)
+    # TODO: Support for DynamicComparisonRunEvaluator
+    if hasattr(evaluator, "evaluator"):
+        # LangChainStringEvaluator
+        if getattr(getattr(evaluator, "evaluator"), "evaluation_name", None):
+            return [evaluator.evaluator.evaluation_name]
+    return []
+
+
+def _extract_code_evaluator_feedback_keys(func: Callable) -> list[str]:
+    python_code = inspect.getsource(func)
+
+    def extract_dict_keys(node):
+        if isinstance(node, ast.Dict):
+            keys = []
+            key_value = None
+            for key, value in zip(node.keys, node.values):
+                if isinstance(key, (ast.Str, ast.Constant)):
+                    key_str = key.s if isinstance(key, ast.Str) else key.value
+                    if key_str == "key" and isinstance(value, (ast.Str, ast.Constant)):
+                        key_value = (
+                            value.s if isinstance(value, ast.Str) else value.value
+                        )
+            return [key_value] if key_value else keys
+        elif (
+            isinstance(node, ast.Call)
+            and isinstance(node.func, ast.Name)
+            and node.func.id == "dict"
+        ):
+            for keyword in node.keywords:
+                if keyword.arg == "key" and isinstance(
+                    keyword.value, (ast.Str, ast.Constant)
+                ):
+                    return [
+                        (
+                            keyword.value.s
+                            if isinstance(keyword.value, ast.Str)
+                            else keyword.value.value
+                        )
+                    ]
+        return []
+
+    def extract_evaluation_result_key(node):
+        if (
+            isinstance(node, ast.Call)
+            and isinstance(node.func, ast.Name)
+            and node.func.id == "EvaluationResult"
+        ):
+            for keyword in node.keywords:
+                if keyword.arg == "key" and isinstance(
+                    keyword.value, (ast.Str, ast.Constant)
+                ):
+                    return [
+                        (
+                            keyword.value.s
+                            if isinstance(keyword.value, ast.Str)
+                            else keyword.value.value
+                        )
+                    ]
+        return []
+
+    def extract_evaluation_results_keys(node, variables):
+        if (
+            isinstance(node, ast.Call)
+            and isinstance(node.func, ast.Name)
+            and node.func.id == "EvaluationResults"
+        ):
+            for keyword in node.keywords:
+                if keyword.arg == "results":
+                    if isinstance(keyword.value, ast.Name):
+                        return variables.get(keyword.value.id, [])
+                    elif isinstance(keyword.value, ast.List):
+                        keys = []
+                        for elt in keyword.value.elts:
+                            keys.extend(extract_evaluation_result_key(elt))
+                        return keys
+        elif isinstance(node, ast.Dict):
+            for key, value in zip(node.keys, node.values):
+                if isinstance(key, (ast.Str, ast.Constant)) and key.s == "results":
+                    if isinstance(value, ast.List):
+                        keys = []
+                        for elt in value.elts:
+                            if isinstance(elt, ast.Dict):
+                                for elt_key, elt_value in zip(elt.keys, elt.values):
+                                    if (
+                                        isinstance(elt_key, (ast.Str, ast.Constant))
+                                        and elt_key.s == "key"
+                                    ):
+                                        if isinstance(
+                                            elt_value, (ast.Str, ast.Constant)
+                                        ):
+                                            keys.append(elt_value.s)
+                            elif (
+                                isinstance(elt, ast.Call)
+                                and isinstance(elt.func, ast.Name)
+                                and elt.func.id in ("EvaluationResult", "dict")
+                            ):
+                                for keyword in elt.keywords:
+                                    if keyword.arg == "key" and isinstance(
+                                        keyword.value, (ast.Str, ast.Constant)
+                                    ):
+                                        keys.append(
+                                            keyword.value.s
+                                            if isinstance(keyword.value, ast.Str)
+                                            else keyword.value.value
+                                        )
+
+                        return keys
+        return []
+
+    python_code = textwrap.dedent(python_code)
+
+    try:
+        tree = ast.parse(python_code)
+        function_def = tree.body[0]
+        if not isinstance(function_def, ast.FunctionDef):
+            return []
+
+        variables = {}
+        keys = []
+
+        for node in ast.walk(function_def):
+            if isinstance(node, ast.Assign):
+                if isinstance(node.value, ast.List):
+                    list_keys = []
+                    for elt in node.value.elts:
+                        list_keys.extend(extract_evaluation_result_key(elt))
+                    if isinstance(node.targets[0], ast.Name):
+                        variables[node.targets[0].id] = list_keys
+            elif isinstance(node, ast.Return) and node.value is not None:
+                dict_keys = extract_dict_keys(node.value)
+                eval_result_key = extract_evaluation_result_key(node.value)
+                eval_results_keys = extract_evaluation_results_keys(
+                    node.value, variables
+                )
+
+                keys.extend(dict_keys)
+                keys.extend(eval_result_key)
+                keys.extend(eval_results_keys)
+
+        # If no keys found, return the function name
+        return keys if keys else [function_def.name]
+
+    except SyntaxError:
+        return []
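
Note: `_extract_code_evaluator_feedback_keys` above recovers feedback keys by parsing the evaluator's
source with `ast`, so that an evaluator that raises can still be reported under the key it would have
returned (falling back to the function name when no key is found). The following is a simplified,
standalone sketch of that idea, not the SDK helper itself; the evaluator name is made up, and it should
be run as a script so `inspect.getsource` can locate the source file:

    import ast
    import inspect
    import textwrap

    def exactness(run, example):
        return {"key": "exact_match", "score": 1}

    # Walk the evaluator's AST and collect the constant "key" value from any
    # returned dict literal, mirroring the approach taken by the new helper.
    tree = ast.parse(textwrap.dedent(inspect.getsource(exactness)))
    keys = [
        v.value
        for node in ast.walk(tree)
        if isinstance(node, ast.Return) and isinstance(node.value, ast.Dict)
        for k, v in zip(node.value.keys, node.value.values)
        if isinstance(k, ast.Constant) and k.value == "key" and isinstance(v, ast.Constant)
    ]
    print(keys or [exactness.__name__])  # -> ['exact_match']
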
diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py
index 7e3e748ba..065f5b16b 100644
--- a/python/langsmith/evaluation/evaluator.py
+++ b/python/langsmith/evaluation/evaluator.py
@@ -90,6 +90,8 @@ class EvaluationResult(BaseModel):

     If none provided, the evaluation feedback is applied to the
     root trace being."""
+    extra: Optional[Dict] = None
+    """Metadata for the evaluator run."""

     class Config:
         """Pydantic model configuration."""
diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py
index 8be35c558..2ef728d0e 100644
--- a/python/langsmith/schemas.py
+++ b/python/langsmith/schemas.py
@@ -486,6 +486,8 @@ class FeedbackBase(BaseModel):

     """For preference scoring, this group ID is shared across feedbacks for each
     run in the group that was being compared."""
+    extra: Optional[Dict] = None
+    """The metadata of the feedback."""

     class Config:
         """Configuration class for the schema."""
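
Note: with the new `extra` fields, an evaluator failure is logged as feedback whose `comment` holds the
`repr()` of the exception and whose `extra` carries an error marker, as exercised by the test changes
below. A small sketch of constructing and checking such a result; the key and message are illustrative
only, not taken from this patch:

    from langsmith.evaluation import EvaluationResult

    # Mirrors the shape the runners now emit when an evaluator raises.
    res = EvaluationResult(
        key="correctness",               # illustrative feedback key
        comment="ValueError('boom')",    # repr() of the raised exception
        extra={"error": True},           # marks this feedback as an error record
    )
    assert res.extra is not None and res.extra.get("error") is True
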
diff --git a/python/tests/evaluation/test_evaluation.py b/python/tests/evaluation/test_evaluation.py
index 1cd8ced9a..87ca42ac5 100644
--- a/python/tests/evaluation/test_evaluation.py
+++ b/python/tests/evaluation/test_evaluation.py
@@ -1,15 +1,29 @@
 import asyncio
+import logging
 import time
+from contextlib import contextmanager
 from typing import Callable, Sequence, Tuple, TypeVar

 import pytest

 from langsmith import Client, aevaluate, evaluate, expect, test
+from langsmith.evaluation import EvaluationResult, EvaluationResults
 from langsmith.schemas import Example, Run

 T = TypeVar("T")


+@contextmanager
+def suppress_warnings():
+    logger = logging.getLogger()
+    current_level = logger.level
+    logger.setLevel(logging.CRITICAL)
+    try:
+        yield
+    finally:
+        logger.setLevel(current_level)
+
+
 def wait_for(
     condition: Callable[[], Tuple[T, bool]],
     max_sleep_time: int = 120,
@@ -32,7 +46,149 @@ def wait_for(
     raise ValueError(f"Callable did not return within {total_time}")


-@pytest.mark.skip(reason="Skipping this test for now. Should remove in the future.")
+async def test_error_handling_evaluators():
+    client = Client()
+    _ = client.clone_public_dataset(
+        "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
+    )
+    dataset_name = "Evaluate Examples"
+
+    # Case 1: Normal dictionary return
+    def error_dict_evaluator(run: Run, example: Example):
+        if True:  # This condition ensures the error is always raised
+            raise ValueError("Error in dict evaluator")
+        return {"key": "dict_key", "score": 1}
+
+    # Case 2: EvaluationResult return
+    def error_evaluation_result(run: Run, example: Example):
+        if True:  # This condition ensures the error is always raised
+            raise ValueError("Error in EvaluationResult evaluator")
+        return EvaluationResult(key="eval_result_key", score=1)
+
+    # Case 3: EvaluationResults return
+    def error_evaluation_results(run: Run, example: Example):
+        if True:  # This condition ensures the error is always raised
+            raise ValueError("Error in EvaluationResults evaluator")
+        return EvaluationResults(
+            results=[
+                EvaluationResult(key="eval_results_key1", score=1),
+                EvaluationResult(key="eval_results_key2", score=2),
+            ]
+        )
+
+    # Case 4: Dictionary without 'key' field
+    def error_dict_no_key(run: Run, example: Example):
+        if True:  # This condition ensures the error is always raised
+            raise ValueError("Error in dict without key evaluator")
+        return {"score": 1}
+
+    # Case 5: dict-style results
+    def error_evaluation_results_dict(run: Run, example: Example):
+        if True:  # This condition ensures the error is always raised
+            raise ValueError("Error in EvaluationResults dict evaluator")
+
+        return {
+            "results": [
+                dict(key="eval_results_dict_key1", score=1),
+                {"key": "eval_results_dict_key2", "score": 2},
+                EvaluationResult(key="eval_results_dict_key3", score=3),
+            ]
+        }
+
+    def predict(inputs: dict) -> dict:
+        return {"output": "Yes"}
+
+    with suppress_warnings():
+        sync_results = evaluate(
+            predict,
+            data=client.list_examples(
+                dataset_name=dataset_name,
+                as_of="test_version",
+            ),
+            evaluators=[
+                error_dict_evaluator,
+                error_evaluation_result,
+                error_evaluation_results,
+                error_dict_no_key,
+                error_evaluation_results_dict,
+            ],
+            max_concurrency=1,  # To ensure deterministic order
+        )
+
+    assert len(sync_results) == 10  # Assuming 10 examples in the dataset
+
+    def check_results(results):
+        for result in results:
+            eval_results = result["evaluation_results"]["results"]
+            assert len(eval_results) == 8
+
+            # Check error handling for each evaluator
+            assert eval_results[0].key == "dict_key"
+            assert "Error in dict evaluator" in eval_results[0].comment
+            assert eval_results[0].extra.get("error") is True
+
+            assert eval_results[1].key == "eval_result_key"
+            assert "Error in EvaluationResult evaluator" in eval_results[1].comment
+            assert eval_results[1].extra.get("error") is True
+
+            assert eval_results[2].key == "eval_results_key1"
+            assert "Error in EvaluationResults evaluator" in eval_results[2].comment
+            assert eval_results[2].extra.get("error") is True
+
+            assert eval_results[3].key == "eval_results_key2"
+            assert "Error in EvaluationResults evaluator" in eval_results[3].comment
+            assert eval_results[3].extra.get("error") is True
+
+            assert eval_results[4].key == "error_dict_no_key"
+            assert "Error in dict without key evaluator" in eval_results[4].comment
+            assert eval_results[4].extra.get("error") is True
+
+            assert eval_results[5].key == "eval_results_dict_key1"
+            assert (
+                "Error in EvaluationResults dict evaluator" in eval_results[5].comment
+            )
+            assert eval_results[5].extra.get("error") is True
+
+            assert eval_results[6].key == "eval_results_dict_key2"
+            assert (
+                "Error in EvaluationResults dict evaluator" in eval_results[6].comment
+            )
+            assert eval_results[6].extra.get("error") is True
+
+            assert eval_results[7].key == "eval_results_dict_key3"
+            assert (
+                "Error in EvaluationResults dict evaluator" in eval_results[7].comment
+            )
+            assert eval_results[7].extra.get("error") is True
+
+    check_results(sync_results)
+
+    async def apredict(inputs: dict):
+        return predict(inputs)
+
+    with suppress_warnings():
+        async_results = await aevaluate(
+            apredict,
+            data=list(
+                client.list_examples(
+                    dataset_name=dataset_name,
+                    as_of="test_version",
+                )
+            ),
+            evaluators=[
+                error_dict_evaluator,
+                error_evaluation_result,
+                error_evaluation_results,
+                error_dict_no_key,
+                error_evaluation_results_dict,
+            ],
+            max_concurrency=1,  # To ensure deterministic order
+        )
+
+    assert len(async_results) == 10  # Assuming 10 examples in the dataset
+    check_results([res async for res in async_results])
+
+
 def test_evaluate():
     client = Client()
     _ = client.clone_public_dataset(