python[patch]: evaluate local mode #1224

Merged · 29 commits · Nov 22, 2024
48 changes: 32 additions & 16 deletions python/langsmith/evaluation/_arunner.py
@@ -1,4 +1,4 @@
"""V2 Evaluation Interface."""

GitHub Actions / benchmark - Benchmark results

create_5_000_run_trees: Mean +- std dev: 723 ms +- 125 ms (warning: may be unstable, std dev is 17% of the mean)
create_10_000_run_trees: Mean +- std dev: 1.37 sec +- 0.18 sec (warning: may be unstable, std dev is 13% of the mean)
create_20_000_run_trees: Mean +- std dev: 1.38 sec +- 0.17 sec (warning: may be unstable, std dev is 12% of the mean)
dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 703 us +- 9 us
dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.1 ms +- 0.4 ms
dumps_class_nested_py_leaf_100x200: Mean +- std dev: 104 ms +- 3 ms
dumps_dataclass_nested_50x100: Mean +- std dev: 25.5 ms +- 0.4 ms
dumps_pydantic_nested_50x100: Mean +- std dev: 67.1 ms +- 16.5 ms (warning: may be unstable, std dev is 25% of the mean)
dumps_pydanticv1_nested_50x100: Mean +- std dev: 222 ms +- 32 ms (warning: may be unstable, std dev is 14% of the mean)

GitHub Actions / benchmark - Comparison against main

+-----------------------------------+----------+------------------------+
| Benchmark                         | main     | changes                |
+===================================+==========+========================+
| create_10_000_run_trees           | 1.42 sec | 1.37 sec: 1.03x faster |
+-----------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_50x100 | 25.6 ms  | 25.1 ms: 1.02x faster  |
+-----------------------------------+----------+------------------------+
| dumps_dataclass_nested_50x100     | 25.9 ms  | 25.5 ms: 1.02x faster  |
+-----------------------------------+----------+------------------------+
| Geometric mean                    | (ref)    | 1.01x faster           |
+-----------------------------------+----------+------------------------+

Benchmarks hidden because not significant (6): dumps_pydanticv1_nested_50x100, dumps_pydantic_nested_50x100, dumps_class_nested_py_branch_and_leaf_200x400, dumps_class_nested_py_leaf_100x200, create_20_000_run_trees, create_5_000_run_trees

from __future__ import annotations

@@ -83,6 +83,7 @@
client: Optional[langsmith.Client] = None,
blocking: bool = True,
experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
upload_results: bool = True,
Contributor: flyby comment: the fact that we have to update so many call sites feels pretty problematic

Collaborator: wdym call sites?

Our data model for evals is indeed too complicated lol - tracer sessions, feedback, runs in multiple sessions, ay caramba
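For readers skimming the thread, a minimal usage sketch of the flag being discussed. The target, evaluator, and dataset name are hypothetical; only the `upload_results` parameter and the async-iterable result type come from this diff.

```python
import asyncio

from langsmith.evaluation import aevaluate


async def my_target(inputs: dict) -> dict:
    # Hypothetical system under test.
    return {"answer": str(inputs.get("question", "")).upper()}


def exact_match(run, example) -> dict:
    # Hypothetical evaluator comparing the traced run's outputs to the reference outputs.
    return {"key": "exact_match", "score": int(run.outputs == example.outputs)}


async def main() -> None:
    results = await aevaluate(
        my_target,
        data="my-dataset",       # hypothetical dataset name
        evaluators=[exact_match],
        upload_results=False,    # the flag added in this PR: run evals without writing to LangSmith
    )
    async for row in results:   # rows are yielded from memory rather than fetched from an experiment
        print(row["evaluation_results"])


asyncio.run(main())
```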

) -> AsyncExperimentResults:
r"""Evaluate an async target system or function on a given dataset.

@@ -260,6 +261,7 @@
client=client,
blocking=blocking,
experiment=experiment,
upload_results=upload_results,
)


@@ -379,6 +381,7 @@
client: Optional[langsmith.Client] = None,
blocking: bool = True,
experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
upload_results: bool = True,
) -> AsyncExperimentResults:
is_async_target = (
asyncio.iscoroutinefunction(target)
@@ -401,6 +404,7 @@
description=description,
num_repetitions=num_repetitions,
runs=runs,
upload_results=upload_results,
).astart()
cache_dir = ls_utils.get_cache_dir(None)
if cache_dir is not None:
@@ -461,6 +465,7 @@
summary_results: Optional[AsyncIterable[EvaluationResults]] = None,
description: Optional[str] = None,
num_repetitions: int = 1,
upload_results: bool = True,
):
super().__init__(
experiment=experiment,
@@ -476,6 +481,7 @@
self._evaluation_results = evaluation_results
self._summary_results = summary_results
self._num_repetitions = num_repetitions
self._upload_results = upload_results

async def aget_examples(self) -> AsyncIterator[schemas.Example]:
if self._examples is None:
@@ -535,7 +541,7 @@
"No examples found in the dataset."
"Please ensure the data provided to aevaluate is not empty."
)
project = self._get_project(first_example)
project = self._get_project(first_example) if self._upload_results else None
self._print_experiment_start(project, first_example)
self._metadata["num_repetitions"] = self._num_repetitions
return self.__class__(
@@ -545,6 +551,7 @@
client=self.client,
runs=self._runs,
evaluation_results=self._evaluation_results,
upload_results=self._upload_results,
)

async def awith_predictions(
@@ -561,6 +568,7 @@
metadata=self._metadata,
client=self.client,
runs=(pred["run"] async for pred in r2),
upload_results=self._upload_results,
)

async def awith_evaluators(
@@ -580,6 +588,7 @@
runs=(result["run"] async for result in r2),
evaluation_results=(result["evaluation_results"] async for result in r3),
summary_results=self._summary_results,
upload_results=self._upload_results,
)

async def awith_summary_evaluators(
@@ -596,6 +605,7 @@
runs=self.aget_runs(),
evaluation_results=self._evaluation_results,
summary_results=aggregate_feedback_gen,
upload_results=self._upload_results,
)

async def aget_results(self) -> AsyncIterator[ExperimentResultRow]:
@@ -675,7 +685,7 @@
**current_context,
"project_name": "evaluators",
"metadata": metadata,
"enabled": True,
"enabled": "local" if not self._upload_results else True,
Contributor: What does local mean here?

Contributor Author: means we're tracing (ie tracking intermediate steps) but not sending to langsmith

Contributor: In that case, where do they get stored?

Contributor Author: we're keeping all the runs in memory currently (on the ExperimentManager)
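A rough sketch of the pattern described above, purely illustrative and not the langsmith implementation: runs are always retained in memory on the manager, and the backend is only called when uploads are enabled.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional


@dataclass
class LocalRunStore:
    """Toy stand-in for the in-memory behavior described above (not langsmith code)."""

    upload_results: bool = True
    runs: List[Any] = field(default_factory=list)

    def record(self, run: Any, upload: Optional[Callable[[Any], None]] = None) -> None:
        self.runs.append(run)  # always kept in memory so results can be yielded later
        if self.upload_results and upload is not None:
            upload(run)  # only reaches the backend when uploads are enabled
```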

"client": self.client,
}
):
@@ -689,10 +699,12 @@
example=example,
)
eval_results["results"].extend(
self.client._select_eval_results(evaluator_response)
)
if self._upload_results:
self.client._log_evaluation_feedback(
evaluator_response, run=run, _executor=executor
)
)
except Exception as e:
try:
feedback_keys = _extract_feedback_keys(evaluator)
@@ -709,11 +721,12 @@
]
)
eval_results["results"].extend(
# TODO: This is a hack
self.client._select_eval_results(error_response)
)
if self._upload_results:
self.client._log_evaluation_feedback(
error_response, run=run, _executor=executor
)
)
except Exception as e2:
logger.debug(f"Error parsing feedback keys: {e2}")
pass
@@ -758,7 +771,7 @@
**current_context,
"project_name": "evaluators",
"metadata": metadata,
"enabled": True,
"enabled": "local" if not self._upload_results else True,
"client": self.client,
}
):
@@ -770,16 +783,17 @@
fn_name=evaluator.__name__,
)
aggregate_feedback.extend(flattened_results)
for result in flattened_results:
feedback = result.dict(exclude={"target_run_id"})
evaluator_info = feedback.pop("evaluator_info", None)
await aitertools.aio_to_thread(
self.client.create_feedback,
**feedback,
run_id=None,
project_id=project_id,
source_info=evaluator_info,
)
if self._upload_results:
for result in flattened_results:
feedback = result.dict(exclude={"target_run_id"})
evaluator_info = feedback.pop("evaluator_info", None)
await aitertools.aio_to_thread(
self.client.create_feedback,
**feedback,
run_id=None,
project_id=project_id,
source_info=evaluator_info,
)
Comment on lines +789 to +799

Contributor: A couple of comments here:

  1. What's the rationale for awaiting in the for loop rather than gathering the results with some max concurrency?
  2. In most cases now (when multipart is enabled), we enqueue the feedback to the TracingQueue for asynchronous background processing, so it doesn't really make sense to keep running this in an executor. cc @hinthornw for this too

Contributor Author: fwiw the only change here was the condition in L786 (the rest is just indentation)

Collaborator: Ya we can asyncio.gather()

Collaborator: re: background processing, it doesn't get put in the background queue because we lack a trace_id in this case
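For concreteness, a sketch of the reviewer's gather-with-max-concurrency suggestion. This is not the PR's code: the concurrency limit is assumed, and stdlib `asyncio.to_thread` stands in for `aitertools.aio_to_thread`.

```python
import asyncio

MAX_CONCURRENCY = 8  # assumed cap, not taken from the PR


async def post_summary_feedback(client, flattened_results, project_id):
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

    async def _post(result):
        feedback = result.dict(exclude={"target_run_id"})
        evaluator_info = feedback.pop("evaluator_info", None)
        async with semaphore:
            # client.create_feedback is synchronous, so run it in a worker thread.
            await asyncio.to_thread(
                client.create_feedback,
                **feedback,
                run_id=None,
                project_id=project_id,
                source_info=evaluator_info,
            )

    # Submit all feedback calls at once; the semaphore bounds how many run concurrently.
    await asyncio.gather(*(_post(r) for r in flattened_results))
```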

except Exception as e:
logger.error(
f"Error running summary evaluator {repr(evaluator)}: {e}",
@@ -815,6 +829,8 @@
return list(splits)

async def _aend(self) -> None:
if not self._upload_results:
return
experiment = self._experiment
if experiment is None:
raise ValueError("Experiment not started yet.")