diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py
index 7b29241e6..2150b1fac 100644
--- a/python/langsmith/evaluation/_arunner.py
+++ b/python/langsmith/evaluation/_arunner.py
@@ -491,15 +491,24 @@ async def _aevaluate(
         cache_path = None
     with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
         if is_async_target:
-            manager = await manager.awith_predictions(
-                cast(ATARGET_T, target), max_concurrency=max_concurrency
-            )
-        if evaluators:
-            manager = await manager.awith_evaluators(
-                evaluators, max_concurrency=max_concurrency
-            )
-        if summary_evaluators:
-            manager = await manager.awith_summary_evaluators(summary_evaluators)
+            if evaluators:
+                # Run predictions and evaluations in a single pipeline
+                manager = await manager.awith_predictions_and_evaluators(
+                    cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
+                )
+            else:
+                manager = await manager.awith_predictions(
+                    cast(ATARGET_T, target), max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
+        else:
+            if evaluators:
+                manager = await manager.awith_evaluators(
+                    evaluators, max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
         results = AsyncExperimentResults(manager)
         if blocking:
             await results.wait()
@@ -642,6 +651,61 @@ async def astart(self) -> _AsyncExperimentManager:
             upload_results=self._upload_results,
         )
 
+    async def awith_predictions_and_evaluators(
+        self,
+        target: ATARGET_T,
+        evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
+        /,
+        max_concurrency: Optional[int] = None,
+    ) -> _AsyncExperimentManager:
+        """Run predictions and evaluations in a single pipeline.
+
+        This allows evaluators to process results as soon as they're available from
+        the target function, rather than waiting for all predictions to complete first.
+        """
+        evaluators = _resolve_evaluators(evaluators)
+
+        if not hasattr(self, "_evaluator_executor"):
+            self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
+
+        async def process_examples():
+            async for pred in self._apredict(
+                target,
+                max_concurrency=max_concurrency,
+                include_attachments=_include_attachments(target),
+            ):
+                example, run = pred["example"], pred["run"]
+                result = self._arun_evaluators(
+                    evaluators,
+                    {
+                        "run": run,
+                        "example": example,
+                        "evaluation_results": {"results": []},
+                    },
+                    executor=self._evaluator_executor,
+                )
+                yield result
+
+        experiment_results = aitertools.aiter_with_concurrency(
+            max_concurrency,
+            process_examples(),
+            _eager_consumption_timeout=0.001,
+        )
+
+        r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
+
+        return _AsyncExperimentManager(
+            (result["example"] async for result in r1),
+            experiment=self._experiment,
+            metadata=self._metadata,
+            client=self.client,
+            runs=(result["run"] async for result in r2),
+            evaluation_results=(result["evaluation_results"] async for result in r3),
+            summary_results=self._summary_results,
+            include_attachments=self._include_attachments,
+            upload_results=self._upload_results,
+        )
+
     async def awith_predictions(
         self,
         target: ATARGET_T,
@@ -796,15 +860,20 @@ async def _arun_evaluators(
             run = current_results["run"]
             example = current_results["example"]
             eval_results = current_results["evaluation_results"]
-            for evaluator in evaluators:
+            lock = asyncio.Lock()
+
+            async def _run_single_evaluator(evaluator):
                 try:
                     evaluator_response = await evaluator.aevaluate_run(
                         run=run,
                         example=example,
                     )
-                    eval_results["results"].extend(
-                        self.client._select_eval_results(evaluator_response)
+                    selected_results = self.client._select_eval_results(
+                        evaluator_response
                     )
+                    async with lock:
+                        eval_results["results"].extend(selected_results)
+
                     if self._upload_results:
                         self.client._log_evaluation_feedback(
                             evaluator_response, run=run, _executor=executor
@@ -824,9 +893,11 @@ async def _arun_evaluators(
                                 for key in feedback_keys
                             ]
                         )
-                        eval_results["results"].extend(
-                            self.client._select_eval_results(error_response)
+                        selected_results = self.client._select_eval_results(
+                            error_response
                         )
+                        async with lock:
+                            eval_results["results"].extend(selected_results)
                         if self._upload_results:
                             self.client._log_evaluation_feedback(
                                 error_response, run=run, _executor=executor
@@ -839,15 +910,10 @@ async def _arun_evaluators(
                         f" run {run.id}: {repr(e)}",
                         exc_info=True,
                     )
-                    logger.error(
-                        f"Error running evaluator {repr(evaluator)} on"
-                        f" run {run.id}: {repr(e)}",
-                        exc_info=True,
-                    )
-                if example.attachments is not None:
-                    for attachment in example.attachments:
-                        reader = example.attachments[attachment]["reader"]
-                        reader.seek(0)
+
+            await asyncio.gather(
+                *[_run_single_evaluator(evaluator) for evaluator in evaluators]
+            )
             return ExperimentResultRow(
                 run=run,
                 example=example,
@@ -1064,10 +1130,6 @@ def _get_run(r: run_trees.RunTree) -> None:
                     client=client,
                 ),
             )
-            if include_attachments and example.attachments is not None:
-                for attachment in example.attachments:
-                    reader = example.attachments[attachment]["reader"]
-                    reader.seek(0)
         except Exception as e:
             logger.error(
                 f"Error running target function: {e}", exc_info=True, stacklevel=1
diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py
index a1505699a..fc1f07685 100644
--- a/python/langsmith/evaluation/evaluator.py
+++ b/python/langsmith/evaluation/evaluator.py
@@ -4,6 +4,7 @@
 
 import asyncio
 import inspect
+import io
 import uuid
 from abc import abstractmethod
 from typing import (
@@ -666,7 +667,15 @@ async def awrapper(
                 "example": example,
                 "inputs": example.inputs if example else {},
                 "outputs": run.outputs or {},
-                "attachments": example.attachments or {} if example else {},
+                "attachments": {
+                    name: {
+                        "presigned_url": value["presigned_url"],
+                        "reader": io.BytesIO(value["reader"].getvalue()),
+                    }
+                    for name, value in (example.attachments or {}).items()
+                }
+                if example
+                else {},
                 "reference_outputs": example.outputs or {} if example else {},
             }
             args = (arg_map[arg] for arg in positional_args)
diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py
index acedaf177..a6ca393b6 100644
--- a/python/langsmith/schemas.py
+++ b/python/langsmith/schemas.py
@@ -76,14 +76,14 @@ def read(self, size: int = -1) -> bytes:
         """Read function."""
         ...
 
-    def write(self, b: bytes) -> int:
-        """Write function."""
-        ...
-
     def seek(self, offset: int, whence: int = 0) -> int:
         """Seek function."""
         ...
 
+    def getvalue(self) -> bytes:
+        """Get value function."""
+        ...
+
 
 class ExampleBase(BaseModel):
     """Example base model."""
diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py
index 3bcd9d04c..8e3e5cd8d 100644
--- a/python/tests/integration_tests/test_client.py
+++ b/python/tests/integration_tests/test_client.py
@@ -1488,15 +1488,18 @@ async def test_aevaluate_with_attachments(langchain_client: Client) -> None:
         data_type=DataType.kv,
     )
 
-    example = ExampleUploadWithAttachments(
-        inputs={"question": "What is shown in the image?"},
-        outputs={"answer": "test image"},
-        attachments={
-            "image": ("image/png", b"fake image data for testing"),
-        },
-    )
+    examples = [
+        ExampleUploadWithAttachments(
+            inputs={"question": "What is shown in the image?"},
+            outputs={"answer": "test image"},
+            attachments={
+                "image": ("image/png", b"fake image data for testing"),
+            },
+        )
+        for i in range(10)
+    ]
 
-    langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example])
+    langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples)
 
     async def target(
         inputs: Dict[str, Any], attachments: Dict[str, Any]
@@ -1542,7 +1545,7 @@ async def evaluator_2(
         max_concurrency=3,
     )
 
-    assert len(results) == 2
+    assert len(results) == 20
     async for result in results:
         assert result["evaluation_results"]["results"][0].score == 1.0
         assert result["evaluation_results"]["results"][1].score == 1.0
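
Usage sketch (not part of the patch): with an async target and at least one evaluator, aevaluate now routes through awith_predictions_and_evaluators, so each example is scored as soon as its own prediction finishes rather than after the whole prediction pass. The dataset name, target, and evaluator below are placeholders; this assumes a configured LangSmith client and an existing dataset.

import asyncio

from langsmith.evaluation import aevaluate


async def target(inputs: dict) -> dict:
    # An async target makes _aevaluate take the is_async_target branch above.
    return {"answer": inputs["question"].upper()}


async def exact_match(run, example) -> dict:
    # Evaluators for a single example run concurrently via asyncio.gather inside
    # _arun_evaluators, as soon as that example's prediction is available.
    predicted = (run.outputs or {}).get("answer")
    expected = (example.outputs or {}).get("answer")
    return {"key": "exact_match", "score": int(predicted == expected)}


async def main() -> None:
    results = await aevaluate(
        target,
        data="my-dataset",         # placeholder dataset name
        evaluators=[exact_match],  # non-empty, so the combined pipeline path is used
        max_concurrency=4,         # caps how many examples are in flight at once
    )
    async for row in results:
        print(row["evaluation_results"]["results"])


if __name__ == "__main__":
    asyncio.run(main())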