diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 7b29241e6..d69ee2541 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -5,6 +5,7 @@ import asyncio import concurrent.futures as cf import datetime +import io import logging import pathlib import uuid @@ -47,7 +48,6 @@ _load_experiment, _load_tqdm, _load_traces, - _make_fresh_examples, _resolve_data, _resolve_evaluators, _resolve_experiment, @@ -480,7 +480,13 @@ async def _aevaluate( num_repetitions=num_repetitions, runs=runs, include_attachments=_include_attachments(target) - or _evaluators_include_attachments(evaluators), + or _evaluators_include_attachments(evaluators) > 0, + reuse_attachments=num_repetitions + * ( + int(_include_attachments(target)) + + _evaluators_include_attachments(evaluators) + ) + > 1, upload_results=upload_results, ).astart() cache_dir = ls_utils.get_cache_dir(None) @@ -491,15 +497,24 @@ async def _aevaluate( cache_path = None with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]): if is_async_target: - manager = await manager.awith_predictions( - cast(ATARGET_T, target), max_concurrency=max_concurrency - ) - if evaluators: - manager = await manager.awith_evaluators( - evaluators, max_concurrency=max_concurrency - ) - if summary_evaluators: - manager = await manager.awith_summary_evaluators(summary_evaluators) + if evaluators: + # Run predictions and evaluations in a single pipeline + manager = await manager.awith_predictions_and_evaluators( + cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency + ) + else: + manager = await manager.awith_predictions( + cast(ATARGET_T, target), max_concurrency=max_concurrency + ) + if summary_evaluators: + manager = await manager.awith_summary_evaluators(summary_evaluators) + else: + if evaluators: + manager = await manager.awith_evaluators( + evaluators, max_concurrency=max_concurrency + ) + if summary_evaluators: + manager = await manager.awith_summary_evaluators(summary_evaluators) results = AsyncExperimentResults(manager) if blocking: await results.wait() @@ -528,6 +543,18 @@ class _AsyncExperimentManager(_ExperimentManagerMixin): sresults for the experiment. summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results for the experiment. + num_repetitions (Optional[int], default=1): The number of repetitions for + the experiment. + include_attachments (Optional[bool], default=False): Whether to include + attachments. This is used for when we pull the examples for the experiment. + reuse_attachments (Optional[bool], default=False): Whether to reuse attachments + from examples. This is True if we need to reuse attachments across multiple + target/evaluator functions. + upload_results (Optional[bool], default=True): Whether to upload results + to Langsmith. + attachment_raw_data_dict (Optional[dict]): A dictionary to store raw data + for attachments. Only used if we reuse attachments across multiple + target/evaluator functions. """ def __init__( @@ -543,7 +570,9 @@ def __init__( description: Optional[str] = None, num_repetitions: int = 1, include_attachments: bool = False, + reuse_attachments: bool = False, upload_results: bool = True, + attachment_raw_data_dict: Optional[dict] = None, ): super().__init__( experiment=experiment, @@ -560,7 +589,54 @@ def __init__( self._summary_results = summary_results self._num_repetitions = num_repetitions self._include_attachments = include_attachments + self._reuse_attachments = reuse_attachments self._upload_results = upload_results + self._attachment_raw_data_dict = attachment_raw_data_dict + + def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example: + """Reset attachment readers for an example. + + This is only in the case that an attachment is going to be used by more + than 1 callable (target + evaluators). In that case we keep a single copy + of the attachment data in self._attachment_raw_data_dict, and create + readers from that data. This makes it so that we don't have to keep + copies of the same data in memory, instead we can just create readers + from the same data. + """ + if not hasattr(example, "attachments") or not example.attachments: + return example + + new_attachments: dict[str, schemas.AttachmentInfo] = {} + for name, attachment in example.attachments.items(): + if ( + self._attachment_raw_data_dict is not None + and str(example.id) + name in self._attachment_raw_data_dict + ): + new_attachments[name] = { + "presigned_url": attachment["presigned_url"], + "reader": io.BytesIO( + self._attachment_raw_data_dict[str(example.id) + name] + ), + "mime_type": attachment["mime_type"], + } + else: + new_attachments[name] = attachment + + # Create a new Example instance with the updated attachments + return schemas.Example( + id=example.id, + created_at=example.created_at, + dataset_id=example.dataset_id, + inputs=example.inputs, + outputs=example.outputs, + metadata=example.metadata, + modified_at=example.modified_at, + runs=example.runs, + source_run_id=example.source_run_id, + attachments=new_attachments, + _host_url=example._host_url, + _tenant_id=example._tenant_id, + ) async def aget_examples(self) -> AsyncIterator[schemas.Example]: if self._examples is None: @@ -569,11 +645,23 @@ async def aget_examples(self) -> AsyncIterator[schemas.Example]: client=self.client, include_attachments=self._include_attachments, ) + if self._reuse_attachments and self._attachment_raw_data_dict is None: + examples_copy, self._examples = aitertools.atee(self._examples) + self._attachment_raw_data_dict = { + str(e.id) + name: value["reader"].read() + async for e in examples_copy + for name, value in (e.attachments or {}).items() + } if self._num_repetitions > 1: examples_list = [example async for example in self._examples] self._examples = async_chain_from_iterable( [ - async_iter_from_list(_make_fresh_examples(examples_list)) + async_iter_from_list( + [ + self._reset_example_attachments(example) + for example in examples_list + ] + ) for _ in range(self._num_repetitions) ] ) @@ -639,6 +727,104 @@ async def astart(self) -> _AsyncExperimentManager: runs=self._runs, evaluation_results=self._evaluation_results, include_attachments=self._include_attachments, + reuse_attachments=self._reuse_attachments, + upload_results=self._upload_results, + attachment_raw_data_dict=self._attachment_raw_data_dict, + ) + + def _get_example_with_readers(self, example: schemas.Example) -> schemas.Example: + new_attachments: dict[str, schemas.AttachmentInfo] = {} + for name, attachment in (example.attachments or {}).items(): + if ( + self._attachment_raw_data_dict is not None + and str(example.id) + name in self._attachment_raw_data_dict + ): + reader = io.BytesIO( + self._attachment_raw_data_dict[str(example.id) + name] + ) + new_attachments[name] = { + "presigned_url": attachment["presigned_url"], + "reader": reader, + "mime_type": attachment["mime_type"], + } + else: + new_attachments[name] = attachment + + return schemas.Example( + id=example.id, + created_at=example.created_at, + dataset_id=example.dataset_id, + inputs=example.inputs, + outputs=example.outputs, + metadata=example.metadata, + modified_at=example.modified_at, + runs=example.runs, + source_run_id=example.source_run_id, + attachments=new_attachments, + _host_url=example._host_url, + _tenant_id=example._tenant_id, + ) + + async def awith_predictions_and_evaluators( + self, + target: ATARGET_T, + evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]], + /, + max_concurrency: Optional[int] = None, + ) -> _AsyncExperimentManager: + """Run predictions and evaluations in a single pipeline. + + This allows evaluators to process results as soon as they're available from + the target function, rather than waiting for all predictions to complete first. + """ + evaluators = _resolve_evaluators(evaluators) + + if not hasattr(self, "_evaluator_executor"): + self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4) + + async def process_examples(): + """Create a single task per example. + + That task is to run the target function and all the evaluators + sequentially. + """ + async for pred in self._apredict( + target, + max_concurrency=max_concurrency, + include_attachments=_include_attachments(target), + ): + example, run = pred["example"], pred["run"] + result = self._arun_evaluators( + evaluators, + { + "run": run, + "example": example, + "evaluation_results": {"results": []}, + }, + executor=self._evaluator_executor, + ) + yield result + + # Run the per-example tasks with max-concurrency + # This guarantees that max_concurrency is the upper limit + # for the number of target/evaluators that can be run in parallel + experiment_results = aitertools.aiter_with_concurrency( + max_concurrency, + process_examples(), + _eager_consumption_timeout=0.001, + ) + + r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock()) + + return _AsyncExperimentManager( + (result["example"] async for result in r1), + experiment=self._experiment, + metadata=self._metadata, + client=self.client, + runs=(result["run"] async for result in r2), + evaluation_results=(result["evaluation_results"] async for result in r3), + summary_results=self._summary_results, + include_attachments=self._include_attachments, upload_results=self._upload_results, ) @@ -740,7 +926,7 @@ async def predict_all(): # Yield the coroutine to be awaited later yield _aforward( fn, - example, + self._get_example_with_readers(example), self.experiment_name, self._metadata, self.client, @@ -796,19 +982,22 @@ async def _arun_evaluators( run = current_results["run"] example = current_results["example"] eval_results = current_results["evaluation_results"] - for evaluator in evaluators: + + async def _run_single_evaluator(evaluator): try: evaluator_response = await evaluator.aevaluate_run( run=run, - example=example, + example=self._get_example_with_readers(example), ) - eval_results["results"].extend( - self.client._select_eval_results(evaluator_response) + selected_results = self.client._select_eval_results( + evaluator_response ) + if self._upload_results: self.client._log_evaluation_feedback( evaluator_response, run=run, _executor=executor ) + return selected_results except Exception as e: try: feedback_keys = _extract_feedback_keys(evaluator) @@ -824,13 +1013,14 @@ async def _arun_evaluators( for key in feedback_keys ] ) - eval_results["results"].extend( - self.client._select_eval_results(error_response) + selected_results = self.client._select_eval_results( + error_response ) if self._upload_results: self.client._log_evaluation_feedback( error_response, run=run, _executor=executor ) + return selected_results except Exception as e2: logger.debug(f"Error parsing feedback keys: {e2}") pass @@ -839,15 +1029,14 @@ async def _arun_evaluators( f" run {run.id}: {repr(e)}", exc_info=True, ) - logger.error( - f"Error running evaluator {repr(evaluator)} on" - f" run {run.id}: {repr(e)}", - exc_info=True, - ) - if example.attachments is not None: - for attachment in example.attachments: - reader = example.attachments[attachment]["reader"] - reader.seek(0) + + all_results = [] + for evaluator in evaluators: + all_results.append(await _run_single_evaluator(evaluator)) + + for result in all_results: + if result is not None: + eval_results["results"].extend(result) return ExperimentResultRow( run=run, example=example, @@ -1064,10 +1253,6 @@ def _get_run(r: run_trees.RunTree) -> None: client=client, ), ) - if include_attachments and example.attachments is not None: - for attachment in example.attachments: - reader = example.attachments[attachment]["reader"] - reader.seek(0) except Exception as e: logger.error( f"Error running target function: {e}", exc_info=True, stacklevel=1 diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index b680973b5..a464f9cf2 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -37,7 +37,6 @@ cast, ) -import requests from typing_extensions import TypedDict, overload import langsmith @@ -1067,7 +1066,7 @@ def _evaluate( runs=runs, # Create or resolve the experiment. include_attachments=_include_attachments(target) - or _evaluators_include_attachments(evaluators), + or _evaluators_include_attachments(evaluators) > 0, upload_results=upload_results, ).start() cache_dir = ls_utils.get_cache_dir(None) @@ -1317,7 +1316,9 @@ def __init__( description: Optional[str] = None, num_repetitions: int = 1, include_attachments: bool = False, + reuse_attachments: bool = False, upload_results: bool = True, + attachment_raw_data_dict: Optional[dict] = None, ): super().__init__( experiment=experiment, @@ -1332,7 +1333,56 @@ def __init__( self._summary_results = summary_results self._num_repetitions = num_repetitions self._include_attachments = include_attachments + self._reuse_attachments = reuse_attachments self._upload_results = upload_results + self._attachment_raw_data_dict = attachment_raw_data_dict + + def _reset_example_attachment_readers( + self, example: schemas.Example + ) -> schemas.Example: + """Reset attachment readers for an example. + + This is only in the case that an attachment is going to be used by more + than 1 callable (target + evaluators). In that case we keep a single copy + of the attachment data in self._attachment_raw_data_dict, and create + readers from that data. This makes it so that we don't have to keep + copies of the same data in memory, instead we can just create readers + from the same data. + """ + if not hasattr(example, "attachments") or not example.attachments: + return example + + new_attachments: dict[str, schemas.AttachmentInfo] = {} + for name, attachment in example.attachments.items(): + if ( + self._attachment_raw_data_dict is not None + and str(example.id) + name in self._attachment_raw_data_dict + ): + new_attachments[name] = { + "presigned_url": attachment["presigned_url"], + "reader": io.BytesIO( + self._attachment_raw_data_dict[str(example.id) + name] + ), + "mime_type": attachment["mime_type"], + } + else: + new_attachments[name] = attachment + + # Create a new Example instance with the updated attachments + return schemas.Example( + id=example.id, + created_at=example.created_at, + dataset_id=example.dataset_id, + inputs=example.inputs, + outputs=example.outputs, + metadata=example.metadata, + modified_at=example.modified_at, + runs=example.runs, + source_run_id=example.source_run_id, + attachments=new_attachments, + _host_url=example._host_url, + _tenant_id=example._tenant_id, + ) @property def examples(self) -> Iterable[schemas.Example]: @@ -1342,10 +1392,20 @@ def examples(self) -> Iterable[schemas.Example]: client=self.client, include_attachments=self._include_attachments, ) + if self._reuse_attachments and self._attachment_raw_data_dict is None: + examples_copy, self._examples = itertools.tee(self._examples) + self._attachment_raw_data_dict = { + str(e.id) + name: value["reader"].read() + for e in examples_copy + for name, value in (e.attachments or {}).items() + } if self._num_repetitions > 1: examples_list = list(self._examples) self._examples = itertools.chain.from_iterable( - _make_fresh_examples(examples_list) + [ + self._reset_example_attachment_readers(example) + for example in examples_list + ] for _ in range(self._num_repetitions) ) self._examples, examples_iter = itertools.tee(self._examples) @@ -1390,7 +1450,9 @@ def start(self) -> _ExperimentManager: runs=self._runs, evaluation_results=self._evaluation_results, include_attachments=self._include_attachments, + reuse_attachments=self._reuse_attachments, upload_results=self._upload_results, + attachment_raw_data_dict=self._attachment_raw_data_dict, ) def with_predictions( @@ -1929,11 +1991,11 @@ def _ensure_traceable( def _evaluators_include_attachments( evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]], -) -> bool: +) -> int: if evaluators is None: - return False + return 0 - def evaluator_has_attachments(evaluator: Any) -> bool: + def evaluator_uses_attachments(evaluator: Any) -> bool: if not callable(evaluator): return False sig = inspect.signature(evaluator) @@ -1943,7 +2005,7 @@ def evaluator_has_attachments(evaluator: Any) -> bool: ] return any(p.name == "attachments" for p in positional_params) - return any(evaluator_has_attachments(e) for e in evaluators) + return sum(evaluator_uses_attachments(e) for e in evaluators) def _include_attachments( @@ -2232,43 +2294,3 @@ def _import_langchain_runnable() -> Optional[type]: def _is_langchain_runnable(o: Any) -> bool: return bool((Runnable := _import_langchain_runnable()) and isinstance(o, Runnable)) - - -def _reset_example_attachments(example: schemas.Example) -> schemas.Example: - """Reset attachment readers for an example.""" - if not hasattr(example, "attachments") or not example.attachments: - return example - - new_attachments = {} - for key, attachment in example.attachments.items(): - response = requests.get(attachment["presigned_url"], stream=True) - response.raise_for_status() - reader = io.BytesIO(response.content) - new_attachments[key] = { - "presigned_url": attachment["presigned_url"], - "reader": reader, - "mime_type": attachment.get("mime_type"), - } - - # Create a new Example instance with the updated attachments - return schemas.Example( - id=example.id, - created_at=example.created_at, - dataset_id=example.dataset_id, - inputs=example.inputs, - outputs=example.outputs, - metadata=example.metadata, - modified_at=example.modified_at, - runs=example.runs, - source_run_id=example.source_run_id, - attachments=new_attachments, - _host_url=example._host_url, - _tenant_id=example._tenant_id, - ) - - -def _make_fresh_examples( - _original_examples: List[schemas.Example], -) -> List[schemas.Example]: - """Create fresh copies of examples with reset readers.""" - return [_reset_example_attachments(example) for example in _original_examples] diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py index 58515df77..0da222e90 100644 --- a/python/langsmith/schemas.py +++ b/python/langsmith/schemas.py @@ -76,14 +76,14 @@ def read(self, size: int = -1) -> bytes: """Read function.""" ... - def write(self, b: bytes) -> int: - """Write function.""" - ... - def seek(self, offset: int, whence: int = 0) -> int: """Seek function.""" ... + def getvalue(self) -> bytes: + """Get value function.""" + ... + class ExampleBase(BaseModel): """Example base model.""" diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 3aa72baa1..d561a2888 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -1,5 +1,6 @@ """LangSmith langchain_client Integration Tests.""" +import asyncio import datetime import io import logging @@ -1460,6 +1461,7 @@ def evaluator_2( num_repetitions=2, ) + assert len(results) == 2 for result in results: assert result["evaluation_results"]["results"][0].score == 1.0 assert result["evaluation_results"]["results"][1].score == 1.0 @@ -1570,6 +1572,16 @@ def evaluator( for result in results: assert result["evaluation_results"]["results"][0].score == 1.0 + results = langchain_client.evaluate( + target, + data=dataset_name, + evaluators=[evaluator], + ) + + assert len(results) == 1 + for result in results: + assert result["evaluation_results"]["results"][0].score == 1.0 + langchain_client.delete_dataset(dataset_name=dataset_name) @@ -1629,15 +1641,18 @@ async def test_aevaluate_with_attachments(langchain_client: Client) -> None: data_type=DataType.kv, ) - example = ExampleUploadWithAttachments( - inputs={"question": "What is shown in the image?"}, - outputs={"answer": "test image"}, - attachments={ - "image": ("image/png", b"fake image data for testing"), - }, - ) + examples = [ + ExampleUploadWithAttachments( + inputs={"question": "What is shown in the image?", "index": i}, + outputs={"answer": "test image"}, + attachments={ + "image": ("text/plain", bytes(f"data: {i}", "utf-8")), + }, + ) + for i in range(10) + ] - langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example]) + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples) async def target( inputs: Dict[str, Any], attachments: Dict[str, Any] @@ -1646,34 +1661,24 @@ async def target( assert "image" in attachments assert "presigned_url" in attachments["image"] image_data = attachments["image"]["reader"] - assert image_data.read() == b"fake image data for testing" + assert image_data.read() == bytes(f"data: {inputs['index']}", "utf-8") return {"answer": "test image"} async def evaluator_1( - outputs: dict, reference_outputs: dict, attachments: dict - ) -> Dict[str, Any]: + inputs: dict, outputs: dict, reference_outputs: dict, attachments: dict + ) -> bool: assert "image" in attachments assert "presigned_url" in attachments["image"] image_data = attachments["image"]["reader"] - assert image_data.read() == b"fake image data for testing" - return { - "score": float( - reference_outputs.get("answer") == outputs.get("answer") # type: ignore - ) - } + return image_data.read() == bytes(f"data: {inputs['index']}", "utf-8") async def evaluator_2( - outputs: dict, reference_outputs: dict, attachments: dict - ) -> Dict[str, Any]: + inputs: dict, outputs: dict, reference_outputs: dict, attachments: dict + ) -> bool: assert "image" in attachments assert "presigned_url" in attachments["image"] image_data = attachments["image"]["reader"] - assert image_data.read() == b"fake image data for testing" - return { - "score": float( - reference_outputs.get("answer") == outputs.get("answer") # type: ignore - ) - } + return image_data.read() == bytes(f"data: {inputs['index']}", "utf-8") results = await langchain_client.aevaluate( target, @@ -1683,11 +1688,21 @@ async def evaluator_2( max_concurrency=3, ) - assert len(results) == 2 + assert len(results) == 20 async for result in results: assert result["evaluation_results"]["results"][0].score == 1.0 assert result["evaluation_results"]["results"][1].score == 1.0 + results = await langchain_client.aevaluate( + target, + data=dataset_name, + evaluators=[], + num_repetitions=1, + max_concurrency=3, + ) + + assert len(results) == 10 + langchain_client.delete_dataset(dataset_name=dataset_name) @@ -2124,3 +2139,66 @@ def test_update_examples_multipart(langchain_client: Client) -> None: # Clean up langchain_client.delete_dataset(dataset_id=dataset.id) + + +async def test_aevaluate_max_concurrency(langchain_client: Client) -> None: + """Test max concurrency works as expected.""" + dataset_name = "__test_a_ton_of_feedback" + uuid4().hex[:4] + dataset = langchain_client.create_dataset( + dataset_name=dataset_name, + description="Test dataset for max concurrency", + ) + + examples = [ + ExampleUploadWithAttachments( + inputs={"query": "What's in this image?"}, + outputs={"answer": "A test image 1"}, + attachments={ + "image1": ("image/png", b"fake image data 1"), + "extra": ("text/plain", b"extra data"), + }, + ) + for _ in range(10) + ] + + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples) + + evaluators = [] + for _ in range(100): + + async def eval_func(inputs, outputs): + await asyncio.sleep(0.1) + return {"score": random.random()} + + evaluators.append(eval_func) + + async def target(inputs, attachments): + return {"foo": "bar"} + + start_time = time.time() + await langchain_client.aevaluate( + target, + data=dataset_name, + evaluators=evaluators, + max_concurrency=8, + ) + + end_time = time.time() + # this should proceed in a 8-2 manner, taking around 20 seconds total + assert end_time - start_time < 30 + assert end_time - start_time > 20 + + start_time = time.time() + await langchain_client.aevaluate( + target, + data=dataset_name, + evaluators=evaluators, + max_concurrency=4, + ) + + end_time = time.time() + # this should proceed in a 4-4-2 manner, taking around 30 seconds total + assert end_time - start_time < 40 + assert end_time - start_time > 30 + + langchain_client.delete_dataset(dataset_id=dataset.id) diff --git a/python/tests/unit_tests/evaluation/test_runner.py b/python/tests/unit_tests/evaluation/test_runner.py index e33d07fd5..f6e7a4704 100644 --- a/python/tests/unit_tests/evaluation/test_runner.py +++ b/python/tests/unit_tests/evaluation/test_runner.py @@ -603,7 +603,7 @@ def summary_eval_outputs_reference(outputs, reference_outputs): now = time.time() if last is None: elapsed = now - start - assert elapsed < 3 + assert elapsed < 3.05 deltas.append((now - last) if last is not None else 0) # type: ignore last = now total = now - start # type: ignore @@ -617,7 +617,6 @@ def summary_eval_outputs_reference(outputs, reference_outputs): tolerance = 3 assert total_slow < tolerance assert total_quick > (SPLIT_SIZE * NUM_REPETITIONS - 1) - tolerance - assert any([d > 1 for d in deltas]) async for r in results: assert r["run"].outputs["output"] == r["example"].inputs["in"] + 1 # type: ignore