Add inline evaluation results for summary evaluators #1347

Open · wants to merge 9 commits into base: main
29 changes: 16 additions & 13 deletions python/langsmith/evaluation/_arunner.py
@@ -1,4 +1,4 @@
"""V2 Evaluation Interface."""

Check notice on line 1 in python/langsmith/evaluation/_arunner.py (GitHub Actions / benchmark)

Benchmark results

create_5_000_run_trees: Mean +- std dev: 707 ms +- 84 ms (warning: may be unstable; the standard deviation is 12% of the mean)
create_10_000_run_trees: Mean +- std dev: 1.41 sec +- 0.18 sec (warning: may be unstable; the standard deviation is 13% of the mean)
create_20_000_run_trees: Mean +- std dev: 1.41 sec +- 0.17 sec (warning: may be unstable; the standard deviation is 12% of the mean)
dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 687 us +- 5 us
dumps_class_nested_py_leaf_50x100: Mean +- std dev: 24.9 ms +- 0.2 ms
dumps_class_nested_py_leaf_100x200: Mean +- std dev: 104 ms +- 3 ms
dumps_dataclass_nested_50x100: Mean +- std dev: 25.3 ms +- 0.3 ms
dumps_pydantic_nested_50x100: Mean +- std dev: 72.4 ms +- 16.8 ms (warning: may be unstable; the standard deviation is 23% of the mean)
dumps_pydanticv1_nested_50x100: Mean +- std dev: 197 ms +- 3 ms

Check notice on line 1 in python/langsmith/evaluation/_arunner.py (GitHub Actions / benchmark)

Comparison against main

+-----------------------------------------------+----------+------------------------+
| Benchmark                                     | main     | changes                |
+===============================================+==========+========================+
| dumps_pydanticv1_nested_50x100                | 220 ms   | 197 ms: 1.11x faster   |
+-----------------------------------------------+----------+------------------------+
| create_5_000_run_trees                        | 731 ms   | 707 ms: 1.03x faster   |
+-----------------------------------------------+----------+------------------------+
| dumps_dataclass_nested_50x100                 | 25.5 ms  | 25.3 ms: 1.01x faster  |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_50x100             | 25.1 ms  | 24.9 ms: 1.01x faster  |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_branch_and_leaf_200x400 | 690 us   | 687 us: 1.00x faster   |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_100x200            | 104 ms   | 104 ms: 1.00x faster   |
+-----------------------------------------------+----------+------------------------+
| create_10_000_run_trees                       | 1.41 sec | 1.41 sec: 1.00x slower |
+-----------------------------------------------+----------+------------------------+
| create_20_000_run_trees                       | 1.39 sec | 1.41 sec: 1.01x slower |
+-----------------------------------------------+----------+------------------------+
| dumps_pydantic_nested_50x100                  | 65.9 ms  | 72.4 ms: 1.10x slower  |
+-----------------------------------------------+----------+------------------------+
| Geometric mean                                | (ref)    | 1.01x faster           |
+-----------------------------------------------+----------+------------------------+

from __future__ import annotations

@@ -36,7 +36,6 @@
AEVALUATOR_T,
DATA_T,
EVALUATOR_T,
ExperimentResultRow,
_evaluators_include_attachments,
_ExperimentManagerMixin,
_extract_feedback_keys,
@@ -690,7 +689,9 @@
summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
) -> _AsyncExperimentManager:
wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators)
aggregate_feedback_gen = self._aapply_summary_evaluators(wrapped_evaluators)
aggregate_feedback_gen = self._aapply_summary_evaluators(
wrapped_evaluators, [r async for r in self.aget_results()]
)
return _AsyncExperimentManager(
await self.aget_examples(),
experiment=self._experiment,
@@ -703,11 +704,11 @@
upload_results=self._upload_results,
)

async def aget_results(self) -> AsyncIterator[ExperimentResultRow]:
async def aget_results(self) -> AsyncIterator[schemas.ExperimentResultRow]:
async for run, example, evaluation_results in aitertools.async_zip(
self.aget_runs(), await self.aget_examples(), self.aget_evaluation_results()
):
yield ExperimentResultRow(
yield schemas.ExperimentResultRow(
run=run,
example=example,
evaluation_results=evaluation_results,
@@ -758,7 +759,7 @@
self,
evaluators: Sequence[RunEvaluator],
max_concurrency: Optional[int] = None,
) -> AsyncIterator[ExperimentResultRow]:
) -> AsyncIterator[schemas.ExperimentResultRow]:
with cf.ThreadPoolExecutor(max_workers=4) as executor:

async def score_all():
@@ -776,9 +777,9 @@
async def _arun_evaluators(
self,
evaluators: Sequence[RunEvaluator],
current_results: ExperimentResultRow,
current_results: schemas.ExperimentResultRow,
executor: cf.ThreadPoolExecutor,
) -> ExperimentResultRow:
) -> schemas.ExperimentResultRow:
current_context = rh.get_tracing_context()
metadata = {
**(current_context["metadata"] or {}),
@@ -848,14 +849,16 @@
for attachment in example.attachments:
reader = example.attachments[attachment]["reader"]
reader.seek(0)
return ExperimentResultRow(
return schemas.ExperimentResultRow(
run=run,
example=example,
evaluation_results=eval_results,
)

async def _aapply_summary_evaluators(
self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
self,
summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
evaluation_results: List[schemas.ExperimentResultRow],
) -> AsyncIterator[EvaluationResults]:
runs, examples = [], []
async_examples = aitertools.ensure_async_iterator(await self.aget_examples())
@@ -885,7 +888,7 @@
):
for evaluator in summary_evaluators:
try:
summary_eval_result = evaluator(runs, examples)
summary_eval_result = evaluator(runs, examples, evaluation_results)
flattened_results = self.client._select_eval_results(
summary_eval_result,
fn_name=evaluator.__name__,
@@ -963,7 +966,7 @@
experiment_manager: _AsyncExperimentManager,
):
self._manager = experiment_manager
self._results: List[ExperimentResultRow] = []
self._results: List[schemas.ExperimentResultRow] = []
self._lock = asyncio.Lock()
self._task = asyncio.create_task(self._process_data(self._manager))
self._processed_count = 0
@@ -972,10 +975,10 @@
def experiment_name(self) -> str:
return self._manager.experiment_name

def __aiter__(self) -> AsyncIterator[ExperimentResultRow]:
def __aiter__(self) -> AsyncIterator[schemas.ExperimentResultRow]:
return self

async def __anext__(self) -> ExperimentResultRow:
async def __anext__(self) -> schemas.ExperimentResultRow:
async def _wait_until_index(index: int) -> None:
while self._processed_count < index:
await asyncio.sleep(0.05)
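Taken together, the _arunner.py changes thread the per-row evaluation results into the summary-evaluator call and reference ExperimentResultRow from langsmith.schemas instead of _runner.py. As a rough illustration of the new call shape (not taken from this PR), a summary evaluator could aggregate over the per-run feedback it now receives; the evaluator name pass_rate and the "correctness" feedback key below are hypothetical, and the sketch assumes ExperimentResultRow lives in langsmith.schemas as this diff indicates:

from typing import List, Sequence

from langsmith import schemas


def pass_rate(
    runs: Sequence[schemas.Run],
    examples: Sequence[schemas.Example],
    evaluation_results: List[schemas.ExperimentResultRow],
) -> dict:
    # Each row bundles a run, its example, and the evaluators' results for that run.
    # Count rows whose (hypothetical) "correctness" feedback scored 1.
    passed = 0
    for row in evaluation_results:
        for result in row["evaluation_results"]["results"]:
            if result.key == "correctness" and result.score == 1:
                passed += 1
                break
    return {"key": "pass_rate", "score": passed / max(len(evaluation_results), 1)}

Such an evaluator would still be passed via summary_evaluators= to evaluate()/aevaluate(); only the extra third argument is new here.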
53 changes: 29 additions & 24 deletions python/langsmith/evaluation/_runner.py
@@ -534,12 +534,6 @@ def evaluate_existing(
)


class ExperimentResultRow(TypedDict):
run: schemas.Run
example: schemas.Example
evaluation_results: EvaluationResults


class ExperimentResults:
"""Represents the results of an evaluate() call.

@@ -554,8 +548,8 @@ class ExperimentResults:

def __init__(self, experiment_manager: _ExperimentManager, blocking: bool = True):
self._manager = experiment_manager
self._results: List[ExperimentResultRow] = []
self._queue: queue.Queue[ExperimentResultRow] = queue.Queue()
self._results: List[schemas.ExperimentResultRow] = []
self._queue: queue.Queue[schemas.ExperimentResultRow] = queue.Queue()
self._processing_complete = threading.Event()
if not blocking:
self._thread: Optional[threading.Thread] = threading.Thread(
@@ -570,7 +564,7 @@ def __init__(self, experiment_manager: _ExperimentManager, blocking: bool = True
def experiment_name(self) -> str:
return self._manager.experiment_name

def __iter__(self) -> Iterator[ExperimentResultRow]:
def __iter__(self) -> Iterator[schemas.ExperimentResultRow]:
ix = 0
while (
not self._processing_complete.is_set()
@@ -1439,6 +1433,7 @@ def with_evaluators(
# Split the generator into three so the manager
# can consume each value individually.
r1, r2, r3 = itertools.tee(experiment_results, 3)
# print("FOOOO", [result["evaluation_results"] for result in r3])
isahers1 marked this conversation as resolved.
return _ExperimentManager(
(result["example"] for result in r1),
experiment=self._experiment,
@@ -1459,7 +1454,9 @@ def with_summary_evaluators(
wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators)
context = copy_context()
aggregate_feedback_gen = context.run(
self._apply_summary_evaluators, wrapped_evaluators
self._apply_summary_evaluators,
wrapped_evaluators,
[r for r in self.get_results()],
)
return _ExperimentManager(
self.examples,
@@ -1473,12 +1470,12 @@
upload_results=self._upload_results,
)

def get_results(self) -> Iterable[ExperimentResultRow]:
def get_results(self) -> Iterable[schemas.ExperimentResultRow]:
"""Return the traces, evaluation results, and associated examples."""
for run, example, evaluation_results in zip(
self.runs, self.examples, self.evaluation_results
):
yield ExperimentResultRow(
yield schemas.ExperimentResultRow(
run=run,
example=example,
evaluation_results=evaluation_results,
@@ -1544,9 +1541,9 @@ def _predict(
def _run_evaluators(
self,
evaluators: Sequence[RunEvaluator],
current_results: ExperimentResultRow,
current_results: schemas.ExperimentResultRow,
executor: cf.ThreadPoolExecutor,
) -> ExperimentResultRow:
) -> schemas.ExperimentResultRow:
current_context = rh.get_tracing_context()
metadata = {
**(current_context["metadata"] or {}),
@@ -1619,7 +1616,7 @@ def _run_evaluators(
reader = example.attachments[attachment]["reader"]
reader.seek(0)

return ExperimentResultRow(
return schemas.ExperimentResultRow(
run=run,
example=example,
evaluation_results=eval_results,
@@ -1629,7 +1626,7 @@ def _score(
self,
evaluators: Sequence[RunEvaluator],
max_concurrency: Optional[int] = None,
) -> Iterable[ExperimentResultRow]:
) -> Iterable[schemas.ExperimentResultRow]:
"""Run the evaluators on the prediction stream.

Expects runs to be available in the manager.
@@ -1671,7 +1668,9 @@ def _score(
yield result

def _apply_summary_evaluators(
self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
self,
summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
evaluation_results: List[schemas.ExperimentResultRow],
) -> Generator[EvaluationResults, None, None]:
runs, examples = [], []
for run, example in zip(self.runs, self.examples):
@@ -1699,7 +1698,9 @@ def _apply_summary_evaluators(
):
for evaluator in summary_evaluators:
try:
summary_eval_result = evaluator(runs, examples)
summary_eval_result = evaluator(
runs, examples, evaluation_results
)
# TODO: Expose public API for this.
flattened_results = self.client._select_eval_results(
summary_eval_result,
@@ -1793,16 +1794,20 @@ def _wrap(evaluator: SUMMARY_EVALUATOR_T) -> SUMMARY_EVALUATOR_T:

@functools.wraps(evaluator)
def _wrapper_inner(
runs: Sequence[schemas.Run], examples: Sequence[schemas.Example]
runs: Sequence[schemas.Run],
examples: Sequence[schemas.Example],
evaluation_results: Sequence[schemas.ExperimentResultRow],
Contributor:

should we just pass in the EvaluationResults not the whole row?

Contributor Author:

I think it's easier to associate each result with the example it came from, which I imagine might be useful. Not 100% sure, up to you; just lmk what to do.

) -> Union[EvaluationResult, EvaluationResults]:
@rh.traceable(name=eval_name)
def _wrapper_super_inner(
runs_: str, examples_: str
runs_: str, examples_: str, evaluation_results_: str
) -> Union[EvaluationResult, EvaluationResults]:
return evaluator(list(runs), list(examples))
return evaluator(list(runs), list(examples), list(evaluation_results))

return _wrapper_super_inner(
f"Runs[] (Length={len(runs)})", f"Examples[] (Length={len(examples)})"
f"Runs[] (Length={len(runs)})",
f"Examples[] (Length={len(examples)})",
f"EvaluationResults[] (Length={len(evaluation_results)})",
)

return _wrapper_inner
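On the review question above (pass only the EvaluationResults, or the whole row): keeping the full schemas.ExperimentResultRow lets a summary evaluator tie each result back to its example directly. A hedged sketch of that pattern follows; the "split" metadata key and the "accuracy" feedback key are invented for illustration and are not part of this PR:

from collections import defaultdict
from typing import DefaultDict, List, Sequence

from langsmith import schemas


def mean_accuracy_by_split(
    runs: Sequence[schemas.Run],
    examples: Sequence[schemas.Example],
    evaluation_results: List[schemas.ExperimentResultRow],
) -> dict:
    # Group per-run "accuracy" scores by each example's (hypothetical) "split"
    # metadata; this grouping is easy because every row carries its example.
    buckets: DefaultDict[str, List[float]] = defaultdict(list)
    for row in evaluation_results:
        split = (row["example"].metadata or {}).get("split", "default")
        for result in row["evaluation_results"]["results"]:
            if result.key == "accuracy" and result.score is not None:
                buckets[split].append(float(result.score))
    return {
        "results": [
            {"key": f"mean_accuracy_{split}", "score": sum(scores) / len(scores)}
            for split, scores in buckets.items()
        ]
    }

Passing bare EvaluationResults would instead require re-associating results with runs and examples inside the evaluator, which is the trade-off raised in the thread.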
@@ -2173,7 +2178,7 @@ def extract_evaluation_results_keys(node, variables):


def _to_pandas(
results: list[ExperimentResultRow],
results: list[schemas.ExperimentResultRow],
start: Optional[int] = 0,
end: Optional[int] = None,
):
@@ -2190,7 +2195,7 @@


def _flatten_experiment_results(
results: list[ExperimentResultRow],
results: list[schemas.ExperimentResultRow],
start: Optional[int] = 0,
end: Optional[int] = None,
):