From 977340a4ac31e5b113480b3b33a1aef00d7785b3 Mon Sep 17 00:00:00 2001
From: isaac hershenson
Date: Mon, 16 Dec 2024 15:47:31 -0800
Subject: [PATCH] wip

---
 python/langsmith/evaluation/_arunner.py | 104 ++++++++++++++++++------
 1 file changed, 79 insertions(+), 25 deletions(-)

diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py
index 39d324ba7..c0f9822b8 100644
--- a/python/langsmith/evaluation/_arunner.py
+++ b/python/langsmith/evaluation/_arunner.py
@@ -491,15 +491,24 @@ async def _aevaluate(
         cache_path = None
     with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
         if is_async_target:
-            manager = await manager.awith_predictions(
-                cast(ATARGET_T, target), max_concurrency=max_concurrency
-            )
-        if evaluators:
-            manager = await manager.awith_evaluators(
-                evaluators, max_concurrency=max_concurrency
-            )
-        if summary_evaluators:
-            manager = await manager.awith_summary_evaluators(summary_evaluators)
+            if evaluators:
+                # Run predictions and evaluations in a single pipeline
+                manager = await manager.awith_predictions_and_evaluators(
+                    cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
+                )
+            else:
+                manager = await manager.awith_predictions(
+                    cast(ATARGET_T, target), max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
+        else:
+            if evaluators:
+                manager = await manager.awith_evaluators(
+                    evaluators, max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
         results = AsyncExperimentResults(manager)
         if blocking:
             await results.wait()
@@ -642,6 +651,56 @@ async def astart(self) -> _AsyncExperimentManager:
             upload_results=self._upload_results,
         )
 
+    async def awith_predictions_and_evaluators(
+        self,
+        target: ATARGET_T,
+        evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
+        /,
+        max_concurrency: Optional[int] = None,
+    ) -> _AsyncExperimentManager:
+        """Run predictions and evaluations in a single pipeline.
+
+        This allows evaluators to process results as soon as they're available from
+        the target function, rather than waiting for all predictions to complete first.
+        """
+        evaluators = _resolve_evaluators(evaluators)
+
+        if not hasattr(self, '_evaluator_executor'):
+            self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
+        async def process_examples():
+            async for pred in self._apredict(
+                target,
+                max_concurrency=max_concurrency,
+                include_attachments=_include_attachments(target),
+            ):
+                example, run = pred["example"], pred["run"]
+                result = self._arun_evaluators(
+                    evaluators,
+                    {"run": run, "example": example, "evaluation_results": {"results": []}},
+                    executor=self._evaluator_executor,
+                )
+                yield result
+
+        experiment_results = aitertools.aiter_with_concurrency(
+            max_concurrency,
+            process_examples(),
+            _eager_consumption_timeout=0.001,
+        )
+
+        r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
+
+        return _AsyncExperimentManager(
+            (result["example"] async for result in r1),
+            experiment=self._experiment,
+            metadata=self._metadata,
+            client=self.client,
+            runs=(result["run"] async for result in r2),
+            evaluation_results=(result["evaluation_results"] async for result in r3),
+            summary_results=self._summary_results,
+            include_attachments=self._include_attachments,
+            upload_results=self._upload_results,
+        )
+
     async def awith_predictions(
         self,
         target: ATARGET_T,
@@ -796,15 +855,17 @@ async def _arun_evaluators(
             run = current_results["run"]
             example = current_results["example"]
             eval_results = current_results["evaluation_results"]
-            for evaluator in evaluators:
+            lock = asyncio.Lock()
+            async def _run_single_evaluator(evaluator):
                 try:
                     evaluator_response = await evaluator.aevaluate_run(
                         run=run,
                         example=example,
                     )
-                    eval_results["results"].extend(
-                        self.client._select_eval_results(evaluator_response)
-                    )
+                    selected_results = self.client._select_eval_results(evaluator_response)
+                    async with lock:
+                        eval_results["results"].extend(selected_results)
+
                     if self._upload_results:
                         self.client._log_evaluation_feedback(
                             evaluator_response, run=run, _executor=executor
@@ -824,9 +885,9 @@ async def _arun_evaluators(
                                 for key in feedback_keys
                             ]
                         )
-                        eval_results["results"].extend(
-                            self.client._select_eval_results(error_response)
-                        )
+                        selected_results = self.client._select_eval_results(error_response)
+                        async with lock:
+                            eval_results["results"].extend(selected_results)
                         if self._upload_results:
                             self.client._log_evaluation_feedback(
                                 error_response, run=run, _executor=executor
@@ -839,15 +900,8 @@ async def _arun_evaluators(
                         f" run {run.id}: {repr(e)}",
                         exc_info=True,
                     )
-                    logger.error(
-                        f"Error running evaluator {repr(evaluator)} on"
-                        f" run {run.id}: {repr(e)}",
-                        exc_info=True,
-                    )
-                    if example.attachments is not None:
-                        for attachment in example.attachments:
-                            reader = example.attachments[attachment]["reader"]
-                            reader.seek(0)
+
+            await asyncio.gather(*[_run_single_evaluator(evaluator) for evaluator in evaluators])
             return ExperimentResultRow(
                 run=run,
                 example=example,