From 6cf7c9b1e7e8fb64e46c88fe070d80c364f6c057 Mon Sep 17 00:00:00 2001 From: Bagatur <22008038+baskaryan@users.noreply.github.com> Date: Fri, 22 Nov 2024 13:51:12 -0500 Subject: [PATCH] python[patch]: `evaluate` local mode (#1224) Adds an `upload_results` flag to avoid tracing any runs (target or evaluator) or creating an experiment in langsmith: ```python from langsmith import evaluate results = evaluate( lambda x: x, data="Sample Dataset 3", evaluators=[lambda inputs: {"score": 1, "key": "correct"}], upload_results=False ) ``` --------- Co-authored-by: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> --- python/langsmith/evaluation/_arunner.py | 53 ++++--- python/langsmith/evaluation/_runner.py | 109 ++++++++------ python/langsmith/run_helpers.py | 29 ++-- python/langsmith/utils.py | 3 +- .../unit_tests/evaluation/test_runner.py | 133 +++++++++++------- python/tests/unit_tests/test_run_helpers.py | 76 +++++++--- 6 files changed, 254 insertions(+), 149 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index f37901a4b..a8799b083 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -31,6 +31,7 @@ from langsmith import run_trees as rt from langsmith import utils as ls_utils from langsmith._internal import _aiter as aitertools +from langsmith._internal._beta_decorator import _warn_once from langsmith.evaluation._runner import ( AEVALUATOR_T, DATA_T, @@ -83,6 +84,7 @@ async def aevaluate( client: Optional[langsmith.Client] = None, blocking: bool = True, experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None, + upload_results: bool = True, ) -> AsyncExperimentResults: r"""Evaluate an async target system or function on a given dataset. @@ -241,6 +243,8 @@ async def aevaluate( ... ) # doctest: +ELLIPSIS View the evaluation results for experiment:... """ # noqa: E501 + if not upload_results: + _warn_once("'upload_results' parameter is in beta.") if experiment and experiment_prefix: raise ValueError( "Expected at most one of 'experiment' or 'experiment_prefix'," @@ -260,6 +264,7 @@ async def aevaluate( client=client, blocking=blocking, experiment=experiment, + upload_results=upload_results, ) @@ -379,6 +384,7 @@ async def _aevaluate( client: Optional[langsmith.Client] = None, blocking: bool = True, experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None, + upload_results: bool = True, ) -> AsyncExperimentResults: is_async_target = ( asyncio.iscoroutinefunction(target) @@ -401,6 +407,7 @@ async def _aevaluate( description=description, num_repetitions=num_repetitions, runs=runs, + upload_results=upload_results, ).astart() cache_dir = ls_utils.get_cache_dir(None) if cache_dir is not None: @@ -461,6 +468,7 @@ def __init__( summary_results: Optional[AsyncIterable[EvaluationResults]] = None, description: Optional[str] = None, num_repetitions: int = 1, + upload_results: bool = True, ): super().__init__( experiment=experiment, @@ -476,6 +484,7 @@ def __init__( self._evaluation_results = evaluation_results self._summary_results = summary_results self._num_repetitions = num_repetitions + self._upload_results = upload_results async def aget_examples(self) -> AsyncIterator[schemas.Example]: if self._examples is None: @@ -535,7 +544,7 @@ async def astart(self) -> _AsyncExperimentManager: "No examples found in the dataset." "Please ensure the data provided to aevaluate is not empty." ) - project = self._get_project(first_example) + project = self._get_project(first_example) if self._upload_results else None self._print_experiment_start(project, first_example) self._metadata["num_repetitions"] = self._num_repetitions return self.__class__( @@ -545,6 +554,7 @@ async def astart(self) -> _AsyncExperimentManager: client=self.client, runs=self._runs, evaluation_results=self._evaluation_results, + upload_results=self._upload_results, ) async def awith_predictions( @@ -561,6 +571,7 @@ async def awith_predictions( metadata=self._metadata, client=self.client, runs=(pred["run"] async for pred in r2), + upload_results=self._upload_results, ) async def awith_evaluators( @@ -580,6 +591,7 @@ async def awith_evaluators( runs=(result["run"] async for result in r2), evaluation_results=(result["evaluation_results"] async for result in r3), summary_results=self._summary_results, + upload_results=self._upload_results, ) async def awith_summary_evaluators( @@ -596,6 +608,7 @@ async def awith_summary_evaluators( runs=self.aget_runs(), evaluation_results=self._evaluation_results, summary_results=aggregate_feedback_gen, + upload_results=self._upload_results, ) async def aget_results(self) -> AsyncIterator[ExperimentResultRow]: @@ -675,7 +688,7 @@ async def _arun_evaluators( **current_context, "project_name": "evaluators", "metadata": metadata, - "enabled": True, + "enabled": "local" if not self._upload_results else True, "client": self.client, } ): @@ -689,10 +702,12 @@ async def _arun_evaluators( example=example, ) eval_results["results"].extend( + self.client._select_eval_results(evaluator_response) + ) + if self._upload_results: self.client._log_evaluation_feedback( evaluator_response, run=run, _executor=executor ) - ) except Exception as e: try: feedback_keys = _extract_feedback_keys(evaluator) @@ -709,11 +724,12 @@ async def _arun_evaluators( ] ) eval_results["results"].extend( - # TODO: This is a hack + self.client._select_eval_results(error_response) + ) + if self._upload_results: self.client._log_evaluation_feedback( error_response, run=run, _executor=executor ) - ) except Exception as e2: logger.debug(f"Error parsing feedback keys: {e2}") pass @@ -744,7 +760,7 @@ async def _aapply_summary_evaluators( runs.append(run) examples.append(example) aggregate_feedback = [] - project_id = self._get_experiment().id + project_id = self._get_experiment().id if self._upload_results else None current_context = rh.get_tracing_context() metadata = { **(current_context["metadata"] or {}), @@ -758,7 +774,7 @@ async def _aapply_summary_evaluators( **current_context, "project_name": "evaluators", "metadata": metadata, - "enabled": True, + "enabled": "local" if not self._upload_results else True, "client": self.client, } ): @@ -770,16 +786,17 @@ async def _aapply_summary_evaluators( fn_name=evaluator.__name__, ) aggregate_feedback.extend(flattened_results) - for result in flattened_results: - feedback = result.dict(exclude={"target_run_id"}) - evaluator_info = feedback.pop("evaluator_info", None) - await aitertools.aio_to_thread( - self.client.create_feedback, - **feedback, - run_id=None, - project_id=project_id, - source_info=evaluator_info, - ) + if self._upload_results: + for result in flattened_results: + feedback = result.dict(exclude={"target_run_id"}) + evaluator_info = feedback.pop("evaluator_info", None) + await aitertools.aio_to_thread( + self.client.create_feedback, + **feedback, + run_id=None, + project_id=project_id, + source_info=evaluator_info, + ) except Exception as e: logger.error( f"Error running summary evaluator {repr(evaluator)}: {e}", @@ -815,6 +832,8 @@ async def _get_dataset_splits(self) -> Optional[list[str]]: return list(splits) async def _aend(self) -> None: + if not self._upload_results: + return experiment = self._experiment if experiment is None: raise ValueError("Experiment not started yet.") diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index 556e697d4..2339601c6 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -44,6 +44,7 @@ from langsmith import run_trees as rt from langsmith import schemas from langsmith import utils as ls_utils +from langsmith._internal._beta_decorator import _warn_once from langsmith.evaluation.evaluator import ( SUMMARY_EVALUATOR_T, ComparisonEvaluationResult, @@ -103,6 +104,7 @@ def evaluate( client: Optional[langsmith.Client] = None, blocking: bool = True, experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None, + upload_results: bool = True, ) -> ExperimentResults: r"""Evaluate a target system or function on a given dataset. @@ -259,6 +261,8 @@ def evaluate( ... ) # doctest: +ELLIPSIS View the evaluation results for experiment:... """ # noqa: E501 + if not upload_results: + _warn_once("'upload_results' parameter is in beta.") if callable(target) and rh.is_async(target): raise ValueError( "Async functions are not supported by `evaluate`. " @@ -290,6 +294,7 @@ def evaluate( client=client, blocking=blocking, experiment=experiment, + upload_results=upload_results, ) @@ -898,6 +903,7 @@ def _evaluate( client: Optional[langsmith.Client] = None, blocking: bool = True, experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None, + upload_results: bool = True, ) -> ExperimentResults: # Initialize the experiment manager. client = client or rt.get_cached_client() @@ -918,6 +924,7 @@ def _evaluate( # If provided, we don't need to create a new experiment. runs=runs, # Create or resolve the experiment. + upload_results=upload_results, ).start() cache_dir = ls_utils.get_cache_dir(None) cache_path = ( @@ -1104,9 +1111,9 @@ def _get_project(self, first_example: schemas.Example) -> schemas.TracerSession: return project def _print_experiment_start( - self, project: schemas.TracerSession, first_example: schemas.Example + self, project: Optional[schemas.TracerSession], first_example: schemas.Example ) -> None: - if project.url: + if project and project.url: # TODO: Make this a public API project_url = project.url.split("?")[0] dataset_id = first_example.dataset_id @@ -1162,6 +1169,7 @@ def __init__( summary_results: Optional[Iterable[EvaluationResults]] = None, description: Optional[str] = None, num_repetitions: int = 1, + upload_results: bool = True, ): super().__init__( experiment=experiment, @@ -1175,6 +1183,7 @@ def __init__( self._evaluation_results = evaluation_results self._summary_results = summary_results self._num_repetitions = num_repetitions + self._upload_results = upload_results @property def examples(self) -> Iterable[schemas.Example]: @@ -1215,7 +1224,7 @@ def runs(self) -> Iterable[schemas.Run]: def start(self) -> _ExperimentManager: first_example = next(itertools.islice(self.examples, 1)) - project = self._get_project(first_example) + project = self._get_project(first_example) if self._upload_results else None self._print_experiment_start(project, first_example) self._metadata["num_repetitions"] = self._num_repetitions return self.__class__( @@ -1225,6 +1234,7 @@ def start(self) -> _ExperimentManager: client=self.client, runs=self._runs, evaluation_results=self._evaluation_results, + upload_results=self._upload_results, ) def with_predictions( @@ -1245,6 +1255,7 @@ def with_predictions( metadata=self._metadata, client=self.client, runs=(pred["run"] for pred in r2), + upload_results=self._upload_results, # TODO: Can't do multiple prediction rounds rn. ) @@ -1276,6 +1287,7 @@ def with_evaluators( runs=(result["run"] for result in r2), evaluation_results=(result["evaluation_results"] for result in r3), summary_results=self._summary_results, + upload_results=self._upload_results, ) def with_summary_evaluators( @@ -1296,6 +1308,7 @@ def with_summary_evaluators( runs=self.runs, evaluation_results=self._evaluation_results, summary_results=aggregate_feedback_gen, + upload_results=self._upload_results, ) def get_results(self) -> Iterable[ExperimentResultRow]: @@ -1332,7 +1345,12 @@ def _predict( if max_concurrency == 0: for example in self.examples: yield _forward( - fn, example, self.experiment_name, self._metadata, self.client + fn, + example, + self.experiment_name, + self._metadata, + self.client, + self._upload_results, ) else: @@ -1345,6 +1363,7 @@ def _predict( self.experiment_name, self._metadata, self.client, + self._upload_results, ) for example in self.examples ] @@ -1373,7 +1392,7 @@ def _run_evaluators( **current_context, "project_name": "evaluators", "metadata": metadata, - "enabled": True, + "enabled": "local" if not self._upload_results else True, "client": self.client, } ): @@ -1386,12 +1405,15 @@ def _run_evaluators( run=run, example=example, ) + eval_results["results"].extend( + self.client._select_eval_results(evaluator_response) + ) + if self._upload_results: # TODO: This is a hack self.client._log_evaluation_feedback( evaluator_response, run=run, _executor=executor ) - ) except Exception as e: try: feedback_keys = _extract_feedback_keys(evaluator) @@ -1408,17 +1430,19 @@ def _run_evaluators( ] ) eval_results["results"].extend( + self.client._select_eval_results(error_response) + ) + if self._upload_results: # TODO: This is a hack self.client._log_evaluation_feedback( error_response, run=run, _executor=executor ) - ) except Exception as e2: logger.debug(f"Error parsing feedback keys: {e2}") pass logger.error( f"Error running evaluator {repr(evaluator)} on" - f" run {run.id}: {repr(e)}", + f" run {run.id if run else ''}: {repr(e)}", exc_info=True, ) return ExperimentResultRow( @@ -1447,7 +1471,7 @@ def _score( self._run_evaluators, evaluators, current_results, - executor=executor, + executor, ) else: futures = set() @@ -1457,7 +1481,7 @@ def _score( self._run_evaluators, evaluators, current_results, - executor=executor, + executor, ) ) try: @@ -1481,7 +1505,7 @@ def _apply_summary_evaluators( examples.append(example) aggregate_feedback = [] with ls_utils.ContextThreadPoolExecutor() as executor: - project_id = self._get_experiment().id + project_id = self._get_experiment().id if self._upload_results else None current_context = rh.get_tracing_context() metadata = { **(current_context["metadata"] or {}), @@ -1496,7 +1520,7 @@ def _apply_summary_evaluators( "project_name": "evaluators", "metadata": metadata, "client": self.client, - "enabled": True, + "enabled": "local" if not self._upload_results else True, } ): for evaluator in summary_evaluators: @@ -1508,16 +1532,17 @@ def _apply_summary_evaluators( fn_name=evaluator.__name__, ) aggregate_feedback.extend(flattened_results) - for result in flattened_results: - feedback = result.dict(exclude={"target_run_id"}) - evaluator_info = feedback.pop("evaluator_info", None) - executor.submit( - self.client.create_feedback, - **feedback, - run_id=None, - project_id=project_id, - source_info=evaluator_info, - ) + if self._upload_results: + for result in flattened_results: + feedback = result.dict(exclude={"target_run_id"}) + evaluator_info = feedback.pop("evaluator_info", None) + executor.submit( + self.client.create_feedback, + **feedback, + run_id=None, + project_id=project_id, + source_info=evaluator_info, + ) except Exception as e: logger.error( f"Error running summary evaluator {repr(evaluator)}: {e}", @@ -1551,6 +1576,8 @@ def _get_dataset_splits(self) -> Optional[list[str]]: return list(splits) def _end(self) -> None: + if not self._upload_results: + return experiment = self._experiment if experiment is None: raise ValueError("Experiment not started yet.") @@ -1619,6 +1646,7 @@ def _forward( experiment_name: str, metadata: dict, client: langsmith.Client, + upload_results: bool, ) -> _ForwardResults: run: Optional[schemas.RunBase] = None @@ -1626,33 +1654,26 @@ def _get_run(r: rt.RunTree) -> None: nonlocal run run = r - with rh.tracing_context(enabled=True): + with rh.tracing_context(enabled="local" if not upload_results else True): + example_version = ( + example.modified_at.isoformat() + if example.modified_at + else example.created_at.isoformat() + ) + langsmith_extra = rh.LangSmithExtra( + reference_example_id=example.id, + on_end=_get_run, + project_name=experiment_name, + metadata={**metadata, "example_version": example_version}, + client=client, + ) try: - fn( - example.inputs, - langsmith_extra=rh.LangSmithExtra( - reference_example_id=example.id, - on_end=_get_run, - project_name=experiment_name, - metadata={ - **metadata, - "example_version": ( - example.modified_at.isoformat() - if example.modified_at - else example.created_at.isoformat() - ), - }, - client=client, - ), - ) + fn(example.inputs, langsmith_extra=langsmith_extra) except Exception as e: logger.error( f"Error running target function: {e}", exc_info=True, stacklevel=1 ) - return _ForwardResults( - run=cast(schemas.Run, run), - example=example, - ) + return _ForwardResults(run=cast(schemas.Run, run), example=example) def _resolve_data( diff --git a/python/langsmith/run_helpers.py b/python/langsmith/run_helpers.py index 7510b75ee..80be362c0 100644 --- a/python/langsmith/run_helpers.py +++ b/python/langsmith/run_helpers.py @@ -24,6 +24,7 @@ Generic, Iterator, List, + Literal, Mapping, Optional, Protocol, @@ -58,7 +59,9 @@ _PROJECT_NAME = contextvars.ContextVar[Optional[str]]("_PROJECT_NAME", default=None) _TAGS = contextvars.ContextVar[Optional[List[str]]]("_TAGS", default=None) _METADATA = contextvars.ContextVar[Optional[Dict[str, Any]]]("_METADATA", default=None) -_TRACING_ENABLED = contextvars.ContextVar[Optional[bool]]( + + +_TRACING_ENABLED = contextvars.ContextVar[Optional[Union[bool, Literal["local"]]]]( "_TRACING_ENABLED", default=None ) _CLIENT = contextvars.ContextVar[Optional[ls_client.Client]]("_CLIENT", default=None) @@ -100,7 +103,7 @@ def tracing_context( tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, parent: Optional[Union[run_trees.RunTree, Mapping, str]] = None, - enabled: Optional[bool] = None, + enabled: Optional[Union[bool, Literal["local"]]] = None, client: Optional[ls_client.Client] = None, **kwargs: Any, ) -> Generator[None, None, None]: @@ -935,8 +938,9 @@ def _setup(self) -> run_trees.RunTree: attachments=self.attachments or {}, ) - if enabled: + if enabled is True: self.new_run.post() + if enabled: _TAGS.set(tags_) _METADATA.set(metadata) _PARENT_RUN_TREE.set(self.new_run) @@ -974,7 +978,7 @@ def _teardown( self.new_run.end(error=tb) if self.old_ctx is not None: enabled = utils.tracing_is_enabled(self.old_ctx) - if enabled: + if enabled is True: self.new_run.patch() _set_tracing_context(self.old_ctx) @@ -1218,7 +1222,7 @@ def _container_end( """End the run.""" run_tree = container.get("new_run") if run_tree is None: - # Tracing enabled + # Tracing not enabled return outputs_ = outputs if isinstance(outputs, dict) else {"output": outputs} error_ = None @@ -1226,7 +1230,8 @@ def _container_end( stacktrace = utils._format_exc() error_ = f"{repr(error)}\n\n{stacktrace}" run_tree.end(outputs=outputs_, error=error_) - run_tree.patch() + if utils.tracing_is_enabled() is True: + run_tree.patch() on_end = container.get("on_end") if on_end is not None and callable(on_end): try: @@ -1328,7 +1333,8 @@ def _setup_run( id_ = langsmith_extra.get("run_id") if not parent_run_ and not utils.tracing_is_enabled(): utils.log_once( - logging.DEBUG, "LangSmith tracing is enabled, returning original function." + logging.DEBUG, + "LangSmith tracing is not enabled, returning original function.", ) return _TraceableContainer( new_run=None, @@ -1410,10 +1416,11 @@ def _setup_run( client=client_, # type: ignore attachments=attachments, ) - try: - new_run.post() - except BaseException as e: - LOGGER.error(f"Failed to post run {new_run.id}: {e}") + if utils.tracing_is_enabled() is True: + try: + new_run.post() + except BaseException as e: + LOGGER.error(f"Failed to post run {new_run.id}: {e}") response_container = _TraceableContainer( new_run=new_run, project_name=selected_project, diff --git a/python/langsmith/utils.py b/python/langsmith/utils.py index f7c257d1f..3e8956d1a 100644 --- a/python/langsmith/utils.py +++ b/python/langsmith/utils.py @@ -24,6 +24,7 @@ Iterable, Iterator, List, + Literal, Mapping, Optional, Sequence, @@ -91,7 +92,7 @@ class LangSmithMissingAPIKeyWarning(LangSmithWarning): """Warning for missing API key.""" -def tracing_is_enabled(ctx: Optional[dict] = None) -> bool: +def tracing_is_enabled(ctx: Optional[dict] = None) -> Union[bool, Literal["local"]]: """Return True if tracing is enabled.""" from langsmith.run_helpers import get_current_run_tree, get_tracing_context diff --git a/python/tests/unit_tests/evaluation/test_runner.py b/python/tests/unit_tests/evaluation/test_runner.py index 020d9724c..72dee2128 100644 --- a/python/tests/unit_tests/evaluation/test_runner.py +++ b/python/tests/unit_tests/evaluation/test_runner.py @@ -153,7 +153,10 @@ def _create_example(idx: int) -> ls_schemas.Example: @pytest.mark.skipif(sys.version_info < (3, 9), reason="requires python3.9 or higher") @pytest.mark.parametrize("blocking", [False, True]) @pytest.mark.parametrize("as_runnable", [False, True]) -def test_evaluate_results(blocking: bool, as_runnable: bool) -> None: +@pytest.mark.parametrize("upload_results", [False, True]) +def test_evaluate_results( + blocking: bool, as_runnable: bool, upload_results: bool +) -> None: session = mock.Mock() ds_name = "my-dataset" ds_id = "00886375-eb2a-4038-9032-efff60309896" @@ -272,6 +275,7 @@ def summary_eval_outputs_reference(outputs, reference_outputs): summary_evaluators=summary_evaluators, num_repetitions=NUM_REPETITIONS, blocking=blocking, + upload_results=upload_results, ) if not blocking: deltas = [] @@ -303,40 +307,45 @@ def summary_eval_outputs_reference(outputs, reference_outputs): ) assert len(results._summary_results["results"]) == len(summary_evaluators) - assert fake_request.created_session - _wait_until(lambda: fake_request.runs) N_PREDS = SPLIT_SIZE * NUM_REPETITIONS - _wait_until(lambda: len(ordering_of_stuff) == (N_PREDS * (len(evaluators) + 1))) - _wait_until(lambda: slow_index is not None) - # Want it to be interleaved - assert ordering_of_stuff[:N_PREDS] != ["predict"] * N_PREDS + if upload_results: + assert fake_request.created_session + _wait_until(lambda: fake_request.runs) + _wait_until(lambda: len(ordering_of_stuff) == (N_PREDS * (len(evaluators) + 1))) + _wait_until(lambda: slow_index is not None) + # Want it to be interleaved + assert ordering_of_stuff[:N_PREDS] != ["predict"] * N_PREDS + else: + assert not fake_request.created_session # It's delayed, so it'll be the penultimate event # Will run all other preds and evals, then this, then the last eval assert slow_index == (len(evaluators) + 1) * (N_PREDS - 1) - def score_value(run, example): - return {"score": 0.7} + if upload_results: - ex_results = evaluate_existing( - fake_request.created_session["name"], - evaluators=[score_value], - client=client, - blocking=blocking, - ) - second_item = next(itertools.islice(iter(ex_results), 1, 2)) - first_list = list(ex_results) - second_list = list(ex_results) - second_item_after = next(itertools.islice(iter(ex_results), 1, 2)) - assert len(first_list) == len(second_list) == SPLIT_SIZE * NUM_REPETITIONS - assert first_list == second_list - assert second_item == second_item_after - dev_xample_ids = [e.id for e in dev_split] - for r in ex_results: - assert r["example"].id in dev_xample_ids - assert r["evaluation_results"]["results"][0].score == 0.7 - assert r["run"].reference_example_id in dev_xample_ids - assert not fake_request.should_fail + def score_value(run, example): + return {"score": 0.7} + + ex_results = evaluate_existing( + fake_request.created_session["name"], + evaluators=[score_value], + client=client, + blocking=blocking, + ) + second_item = next(itertools.islice(iter(ex_results), 1, 2)) + first_list = list(ex_results) + second_list = list(ex_results) + second_item_after = next(itertools.islice(iter(ex_results), 1, 2)) + assert len(first_list) == len(second_list) == SPLIT_SIZE * NUM_REPETITIONS + assert first_list == second_list + assert second_item == second_item_after + dev_xample_ids = [e.id for e in dev_split] + for r in ex_results: + assert r["example"].id in dev_xample_ids + assert r["evaluation_results"]["results"][0].score == 0.7 + assert r["run"].reference_example_id in dev_xample_ids + assert not fake_request.should_fail # Returning list of non-dicts not supported. def bad_eval_list(run, example): @@ -405,7 +414,10 @@ async def my_other_func(inputs: dict, other_val: int): @pytest.mark.skipif(sys.version_info < (3, 9), reason="requires python3.9 or higher") @pytest.mark.parametrize("blocking", [False, True]) @pytest.mark.parametrize("as_runnable", [False, True]) -async def test_aevaluate_results(blocking: bool, as_runnable: bool) -> None: +@pytest.mark.parametrize("upload_results", [False, True]) +async def test_aevaluate_results( + blocking: bool, as_runnable: bool, upload_results: bool +) -> None: session = mock.Mock() ds_name = "my-dataset" ds_id = "00886375-eb2a-4038-9032-efff60309896" @@ -527,6 +539,7 @@ def summary_eval_outputs_reference(outputs, reference_outputs): summary_evaluators=summary_evaluators, num_repetitions=NUM_REPETITIONS, blocking=blocking, + upload_results=upload_results, ) if not blocking: deltas = [] @@ -562,36 +575,44 @@ def summary_eval_outputs_reference(outputs, reference_outputs): ) assert len(results._summary_results["results"]) == len(summary_evaluators) - assert fake_request.created_session - _wait_until(lambda: fake_request.runs) N_PREDS = SPLIT_SIZE * NUM_REPETITIONS - _wait_until(lambda: len(ordering_of_stuff) == N_PREDS * (len(evaluators) + 1)) - _wait_until(lambda: slow_index is not None) - # Want it to be interleaved - assert ordering_of_stuff[:N_PREDS] != ["predict"] * N_PREDS - assert slow_index is not None - # It's delayed, so it'll be the penultimate event - # Will run all other preds and evals, then this, then the last eval - assert slow_index == (N_PREDS - 1) * (len(evaluators) + 1) - assert fake_request.created_session["name"] + if upload_results: + assert fake_request.created_session + _wait_until(lambda: fake_request.runs) + _wait_until(lambda: len(ordering_of_stuff) == N_PREDS * (len(evaluators) + 1)) + _wait_until(lambda: slow_index is not None) + # Want it to be interleaved + assert ordering_of_stuff[:N_PREDS] != ["predict"] * N_PREDS + assert slow_index is not None + # It's delayed, so it'll be the penultimate event + # Will run all other preds and evals, then this, then the last eval + assert slow_index == (N_PREDS - 1) * (len(evaluators) + 1) + + assert fake_request.created_session["name"] + else: + assert not fake_request.created_session async def score_value(run, example): return {"score": 0.7} - ex_results = await aevaluate_existing( - fake_request.created_session["name"], evaluators=[score_value], client=client - ) - all_results = [r async for r in ex_results] - assert len(all_results) == SPLIT_SIZE * NUM_REPETITIONS - dev_xample_ids = [e.id for e in dev_split] - async for r in ex_results: - assert r["example"].id in dev_xample_ids - assert r["evaluation_results"]["results"][0].score == 0.7 - assert r["run"].reference_example_id in dev_xample_ids - assert not fake_request.should_fail - # Returning list of non-dicts not supported. + if upload_results: + ex_results = await aevaluate_existing( + fake_request.created_session["name"], + evaluators=[score_value], + client=client, + blocking=blocking, + ) + all_results = [r async for r in ex_results] + assert len(all_results) == SPLIT_SIZE * NUM_REPETITIONS + dev_xample_ids = [e.id for e in dev_split] + async for r in ex_results: + assert r["example"].id in dev_xample_ids + assert r["evaluation_results"]["results"][0].score == 0.7 + assert r["run"].reference_example_id in dev_xample_ids + assert not fake_request.should_fail + # Returning list of non-dicts not supported. async def bad_eval_list(run, example): ordering_of_stuff.append("evaluate") return ["foo", 1] @@ -603,6 +624,7 @@ async def bad_eval_list(run, example): evaluators=[bad_eval_list], num_repetitions=NUM_REPETITIONS, blocking=blocking, + upload_results=upload_results, ) async for r in results: assert r["evaluation_results"]["results"][0].extra == {"error": True} @@ -628,7 +650,12 @@ async def atarget(x): with pytest.raises(ValueError, match="Invalid evaluator function."): await aevaluate( - atarget, data=ds_examples, evaluators=[eval_], client=client + atarget, + data=ds_examples, + evaluators=[eval_], + client=client, + upload_results=upload_results, + blocking=blocking, ) diff --git a/python/tests/unit_tests/test_run_helpers.py b/python/tests/unit_tests/test_run_helpers.py index 34df400e7..2c5658a62 100644 --- a/python/tests/unit_tests/test_run_helpers.py +++ b/python/tests/unit_tests/test_run_helpers.py @@ -7,12 +7,22 @@ import time import uuid import warnings -from typing import Any, AsyncGenerator, Generator, List, Optional, Set, Tuple, cast +from typing import ( + Any, + AsyncGenerator, + Generator, + List, + Optional, + Set, + Tuple, + Union, + cast, +) from unittest.mock import MagicMock, patch import pytest from requests_toolbelt import MultipartEncoder -from typing_extensions import Annotated +from typing_extensions import Annotated, Literal import langsmith from langsmith import Client @@ -913,7 +923,8 @@ def _get_run(r: RunTree) -> None: assert len(child_runs[2].child_runs) == 1 # type: ignore -def test_traceable_regular(): +@pytest.mark.parametrize("enabled", [True, "local"]) +def test_traceable_regular(enabled: Union[bool, Literal["local"]]): @traceable def some_sync_func(query: str, **kwargs: Any) -> list: assert kwargs == {"a": 1, "b": 2} @@ -962,7 +973,7 @@ def _get_run(r: RunTree) -> None: run = r mock_client_ = _get_mock_client() - with tracing_context(enabled=True): + with tracing_context(enabled=enabled): all_chunks = my_answer( "some_query", langsmith_extra={"on_end": _get_run, "client": mock_client_} ) @@ -988,9 +999,12 @@ def _get_run(r: RunTree) -> None: "summarize_answers", ] assert len(child_runs[2].child_runs) == 1 # type: ignore + mock_calls = _get_calls(mock_client_) + assert len(mock_calls) == (0 if enabled == "local" else 1) -async def test_traceable_async(): +@pytest.mark.parametrize("enabled", [True, "local"]) +async def test_traceable_async(enabled: Union[bool, Literal["local"]]): @traceable def some_sync_func(query: str) -> list: return [query, query] @@ -1045,7 +1059,7 @@ def _get_run(r: RunTree) -> None: run = r mock_client_ = _get_mock_client() - with tracing_context(enabled=True): + with tracing_context(enabled=enabled): all_chunks = await my_answer( "some_query", langsmith_extra={"on_end": _get_run, "client": mock_client_} ) @@ -1071,9 +1085,12 @@ def _get_run(r: RunTree) -> None: "summarize_answers", ] assert len(child_runs[2].child_runs) == 1 # type: ignore + mock_calls = _get_calls(mock_client_) + assert len(mock_calls) == (0 if enabled == "local" else 1) -def test_traceable_to_trace(): +@pytest.mark.parametrize("enabled", [True, "local"]) +def test_traceable_to_trace(enabled: Union[bool, Literal["local"]]): @traceable def parent_fn(a: int, b: int) -> int: with langsmith.trace(name="child_fn", inputs={"a": a, "b": b}) as run_tree: @@ -1087,9 +1104,10 @@ def _get_run(r: RunTree) -> None: nonlocal run run = r - with tracing_context(enabled=True): + mock_client_ = _get_mock_client() + with tracing_context(enabled=enabled): result = parent_fn( - 1, 2, langsmith_extra={"on_end": _get_run, "client": _get_mock_client()} + 1, 2, langsmith_extra={"on_end": _get_run, "client": mock_client_} ) assert result == 3 @@ -1103,9 +1121,12 @@ def _get_run(r: RunTree) -> None: assert len(child_runs) == 1 assert child_runs[0].name == "child_fn" assert child_runs[0].inputs == {"a": 1, "b": 2} + mock_calls = _get_calls(mock_client_) + assert len(mock_calls) == (0 if enabled == "local" else 1) -async def test_traceable_to_atrace(): +@pytest.mark.parametrize("enabled", [True, "local"]) +async def test_traceable_to_atrace(enabled: Union[bool, Literal["local"]]): @traceable async def great_grandchild_fn(a: int, b: int) -> int: return a + b @@ -1134,9 +1155,10 @@ def _get_run(r: RunTree) -> None: nonlocal run run = r - with tracing_context(enabled=True): + mock_client_ = _get_mock_client() + with tracing_context(enabled=enabled): result = await parent_fn( - 1, 2, langsmith_extra={"on_end": _get_run, "client": _get_mock_client()} + 1, 2, langsmith_extra={"on_end": _get_run, "client": mock_client_} ) assert result == 3 @@ -1162,15 +1184,18 @@ def _get_run(r: RunTree) -> None: ggc = grandchild.child_runs[1] assert ggc.name == "great_grandchild_fn" assert ggc.inputs == {"a": 1, "b": 2} + mock_calls = _get_calls(mock_client_) + assert len(mock_calls) == (0 if enabled == "local" else 1) -def test_trace_to_traceable(): +@pytest.mark.parametrize("enabled", [True, "local"]) +def test_trace_to_traceable(enabled: Union[bool, Literal["local"]]): @traceable def child_fn(a: int, b: int) -> int: return a + b mock_client_ = _get_mock_client() - with tracing_context(enabled=True): + with tracing_context(enabled=enabled): rid = uuid.uuid4() with langsmith.trace( name="parent_fn", inputs={"a": 1, "b": 2}, client=mock_client_, run_id=rid @@ -1190,7 +1215,7 @@ def child_fn(a: int, b: int) -> int: assert child_runs[0].inputs == {"a": 1, "b": 2} -def test_client_passed_when_traceable_parent(): +def test_client_not_passed_when_traceable_parent(): mock_client = _get_mock_client() rt = RunTree(name="foo", client=mock_client) headers = rt.to_headers() @@ -1201,14 +1226,7 @@ def my_run(foo: str): my_run(foo="bar", langsmith_extra={"parent": headers, "client": mock_client}) mock_calls = _get_calls(mock_client) - assert len(mock_calls) == 1 - call = mock_client.session.request.call_args - assert call.args[0] == "POST" - assert call.args[1].startswith("https://api.smith.langchain.com") - body = json.loads(call.kwargs["data"]) - assert body["post"] - assert body["post"][0]["inputs"] == {"foo": "bar"} - assert body["post"][0]["outputs"] == {"baz": "buzz"} + assert len(mock_calls) == 0 def test_client_passed_when_trace_parent(): @@ -1231,6 +1249,18 @@ def test_client_passed_when_trace_parent(): assert body["post"][0]["outputs"] == {"bar": "baz"} +def test_client_not_called_when_enabled_local(): + mock_client = _get_mock_client() + headers = RunTree(name="foo", client=mock_client).to_headers() + with tracing_context(enabled="local"): + with trace( + name="foo", inputs={"foo": "bar"}, parent=headers, client=mock_client + ) as rt: + rt.outputs["bar"] = "baz" + calls = _get_calls(mock_client) + assert len(calls) == 0 + + def test_from_runnable_config(): try: from langchain_core.tools import tool # type: ignore