From edccdb17deaa8a79dd3dd2a491a7e4f713235938 Mon Sep 17 00:00:00 2001 From: William FH <13333726+hinthornw@users.noreply.github.com> Date: Thu, 30 May 2024 07:07:06 -0700 Subject: [PATCH] [Python] 0.1.66 ## Update RunnableConfig <> RunTree Precedence (#747) ## Attempt deeper copy to avoid mutations (735) For some code paths currently, if you mutate inputs/outputs within Xs of invocation, langsmith will log the mutated value, which is confusing and the wrong behavior. We can't naively use deepcopy, as many valid python objects cannot be copied. This PR creates a compromise solution that attempts to deepcopy and then falls back to copying up to depth 4 while handling errors. Will hopefully resolve #706 . Placed before the hide_* as well so that if people want to mutate the dict when filtering it doesn't impact the execution flow. --- python/langsmith/client.py | 11 +++- python/langsmith/run_helpers.py | 21 ++++++- python/langsmith/run_trees.py | 25 ++++----- python/langsmith/utils.py | 56 +++++++++++++++++++ python/pyproject.toml | 2 +- python/tests/unit_tests/test_client.py | 77 ++++++++++++++++++++++++-- 6 files changed, 169 insertions(+), 23 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index afafeba0e..d3966f597 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -1070,11 +1070,14 @@ def _run_transform( self, run: Union[ls_schemas.Run, dict, ls_schemas.RunLikeDict], update: bool = False, + copy: bool = False, ) -> dict: """Transform the given run object into a dictionary representation. Args: run (Union[ls_schemas.Run, dict]): The run object to transform. + update (bool, optional): Whether to update the run. Defaults to False. + copy (bool, optional): Whether to copy the run. Defaults to False. Returns: dict: The transformed run object as a dictionary. @@ -1088,8 +1091,12 @@ def _run_transform( elif isinstance(run_create["id"], str): run_create["id"] = uuid.UUID(run_create["id"]) if "inputs" in run_create and run_create["inputs"] is not None: + if copy: + run_create["inputs"] = ls_utils.deepish_copy(run_create["inputs"]) run_create["inputs"] = self._hide_run_inputs(run_create["inputs"]) if "outputs" in run_create and run_create["outputs"] is not None: + if copy: + run_create["outputs"] = ls_utils.deepish_copy(run_create["outputs"]) run_create["outputs"] = self._hide_run_outputs(run_create["outputs"]) if not update and not run_create.get("start_time"): run_create["start_time"] = datetime.datetime.now(datetime.timezone.utc) @@ -1177,9 +1184,8 @@ def create_run( } if not self._filter_for_sampling([run_create]): return - run_create = self._run_transform(run_create) + run_create = self._run_transform(run_create, copy=True) self._insert_runtime_env([run_create]) - if revision_id is not None: run_create["extra"]["metadata"]["revision_id"] = revision_id if ( @@ -1413,6 +1419,7 @@ def update_run( if inputs is not None: data["inputs"] = self._hide_run_inputs(inputs) if outputs is not None: + outputs = ls_utils.deepish_copy(outputs) data["outputs"] = self._hide_run_outputs(outputs) if events is not None: data["events"] = events diff --git a/python/langsmith/run_helpers.py b/python/langsmith/run_helpers.py index 7a0cc278a..7167cd177 100644 --- a/python/langsmith/run_helpers.py +++ b/python/langsmith/run_helpers.py @@ -956,10 +956,27 @@ def _get_parent_run( run_tree = langsmith_extra.get("run_tree") if run_tree: return run_tree + crt = get_current_run_tree() if _runtime_env.get_langchain_core_version() is not None: if rt := run_trees.RunTree.from_runnable_config(config): - return rt - return get_current_run_tree() + # Still need to break ties when alternating between traceable and + # LanChain code. + # Nesting: LC -> LS -> LS, we want to still use LS as the parent + # Otherwise would look like LC -> {LS, LS} (siblings) + if ( + not crt # Simple LC -> LS + # Let user override if manually passed in or invoked in a + # RunnableSequence. This is a naive check. + or (config is not None and config.get("callbacks")) + # If the LangChain dotted order is more nested than the LangSmith + # dotted order, use the LangChain run as the parent. + # Note that this condition shouldn't be triggered in later + # versions of core, since we also update the run_tree context + # vars when updating the RunnableConfig context var. + or rt.dotted_order > crt.dotted_order + ): + return rt + return crt def _setup_run( diff --git a/python/langsmith/run_trees.py b/python/langsmith/run_trees.py index 675aee4ec..cb767108e 100644 --- a/python/langsmith/run_trees.py +++ b/python/langsmith/run_trees.py @@ -233,20 +233,17 @@ def create_child( return run def _get_dicts_safe(self): - try: - return self.dict(exclude={"child_runs"}, exclude_none=True) - except TypeError: - # Things like generators cannot be copied - self_dict = self.dict( - exclude={"child_runs", "inputs", "outputs"}, exclude_none=True - ) - if self.inputs: - # shallow copy - self_dict["inputs"] = self.inputs.copy() - if self.outputs: - # shallow copy - self_dict["outputs"] = self.outputs.copy() - return self_dict + # Things like generators cannot be copied + self_dict = self.dict( + exclude={"child_runs", "inputs", "outputs"}, exclude_none=True + ) + if self.inputs is not None: + # shallow copy. deep copying will occur in the client + self_dict["inputs"] = self.inputs.copy() + if self.outputs is not None: + # shallow copy; deep copying will occur in the client + self_dict["outputs"] = self.outputs.copy() + return self_dict def post(self, exclude_child_runs: bool = True) -> None: """Post the run tree to the API asynchronously.""" diff --git a/python/langsmith/utils.py b/python/langsmith/utils.py index 0217ab4e4..b64904996 100644 --- a/python/langsmith/utils.py +++ b/python/langsmith/utils.py @@ -1,6 +1,7 @@ """Generic utility functions.""" import contextlib +import copy import enum import functools import logging @@ -20,6 +21,7 @@ Optional, Sequence, Tuple, + TypeVar, Union, ) @@ -497,3 +499,57 @@ def _format_exc() -> str: tb_lines = traceback.format_exception(*sys.exc_info()) filtered_lines = [line for line in tb_lines if "langsmith/" not in line] return "".join(filtered_lines) + + +T = TypeVar("T") + + +def _middle_copy( + val: T, memo: Dict[int, Any], max_depth: int = 4, _depth: int = 0 +) -> T: + cls = type(val) + + copier = getattr(cls, "__deepcopy__", None) + if copier is not None: + try: + return copier(memo) + except (TypeError, ValueError, RecursionError): + pass + if _depth >= max_depth: + return val + if isinstance(val, dict): + return { # type: ignore[return-value] + _middle_copy(k, memo, max_depth, _depth + 1): _middle_copy( + v, memo, max_depth, _depth + 1 + ) + for k, v in val.items() + } + if isinstance(val, list): + return [_middle_copy(item, memo, max_depth, _depth + 1) for item in val] # type: ignore[return-value] + if isinstance(val, tuple): + return tuple(_middle_copy(item, memo, max_depth, _depth + 1) for item in val) # type: ignore[return-value] + if isinstance(val, set): + return {_middle_copy(item, memo, max_depth, _depth + 1) for item in val} # type: ignore[return-value] + + return val + + +def deepish_copy(val: T) -> T: + """Deep copy a value with a compromise for uncopyable objects. + + Args: + val: The value to be deep copied. + + Returns: + The deep copied value. + """ + memo: Dict[int, Any] = {} + try: + return copy.deepcopy(val, memo) + except (TypeError, ValueError, RecursionError) as e: + # Generators, locks, etc. cannot be copied + # and raise a TypeError (mentioning pickling, since the dunder methods) + # are re-used for copying. We'll try to do a compromise and copy + # what we can + _LOGGER.debug("Failed to deepcopy input: %s", repr(e)) + return _middle_copy(val, memo) diff --git a/python/pyproject.toml b/python/pyproject.toml index 5fab25448..b92ba21df 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "langsmith" -version = "0.1.64" +version = "0.1.65" description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform." authors = ["LangChain "] license = "MIT" diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 62dd302fd..5e07ac147 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -11,7 +11,7 @@ import time import uuid import weakref -from datetime import datetime +from datetime import datetime, timezone from enum import Enum from io import BytesIO from typing import Any, NamedTuple, Optional @@ -28,6 +28,8 @@ import langsmith.env as ls_env import langsmith.utils as ls_utils +from langsmith import run_trees +from langsmith import schemas as ls_schemas from langsmith.client import ( Client, _dumps_json, @@ -37,7 +39,6 @@ _is_localhost, _serialize_json, ) -from langsmith.schemas import Example _CREATED_AT = datetime(2015, 1, 1, 0, 0, 0) @@ -173,14 +174,14 @@ def test_headers(monkeypatch: pytest.MonkeyPatch) -> None: @mock.patch("langsmith.client.requests.Session") def test_upload_csv(mock_session_cls: mock.Mock) -> None: dataset_id = str(uuid.uuid4()) - example_1 = Example( + example_1 = ls_schemas.Example( id=str(uuid.uuid4()), created_at=_CREATED_AT, inputs={"input": "1"}, outputs={"output": "2"}, dataset_id=dataset_id, ) - example_2 = Example( + example_2 = ls_schemas.Example( id=str(uuid.uuid4()), created_at=_CREATED_AT, inputs={"input": "3"}, @@ -303,6 +304,74 @@ def test_create_run_unicode() -> None: client.update_run(id_, status="completed") +def test_create_run_mutate() -> None: + inputs = {"messages": ["hi"], "mygen": (i for i in range(10))} + session = mock.Mock() + session.request = mock.Mock() + client = Client( + api_url="http://localhost:1984", + api_key="123", + session=session, + info=ls_schemas.LangSmithInfo( + batch_ingest_config=ls_schemas.BatchIngestConfig( + size_limit_bytes=None, # Note this field is not used here + size_limit=100, + scale_up_nthreads_limit=16, + scale_up_qsize_trigger=1000, + scale_down_nempty_trigger=4, + ) + ), + ) + id_ = uuid.uuid4() + run_dict = dict( + id=id_, + name="my_run", + inputs=inputs, + run_type="llm", + trace_id=id_, + dotted_order=run_trees._create_current_dotted_order( + datetime.now(timezone.utc), id_ + ), + ) + client.create_run(**run_dict) # type: ignore + inputs["messages"].append("there") # type: ignore + outputs = {"messages": ["hi", "there"]} + client.update_run( + id_, + outputs=outputs, + end_time=datetime.now(timezone.utc), + trace_id=id_, + dotted_order=run_dict["dotted_order"], + ) + for _ in range(7): + time.sleep(0.1) # Give the background thread time to stop + payloads = [ + json.loads(call[2]["data"]) + for call in session.request.mock_calls + if call.args and call.args[1].endswith("runs/batch") + ] + if payloads: + break + posts = [pr for payload in payloads for pr in payload.get("post", [])] + patches = [pr for payload in payloads for pr in payload.get("patch", [])] + inputs = next( + (pr["inputs"] for pr in itertools.chain(posts, patches) if pr.get("inputs")), + {}, + ) + outputs = next( + (pr["outputs"] for pr in itertools.chain(posts, patches) if pr.get("outputs")), + {}, + ) + # Check that the mutated value wasn't posted + assert "messages" in inputs + assert inputs["messages"] == ["hi"] + assert "mygen" in inputs + assert inputs["mygen"].startswith( # type: ignore + "." + ) + assert outputs == {"messages": ["hi", "there"]} + + class CallTracker: def __init__(self) -> None: self.counter = 0