diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index afafeba0e..d3966f597 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -1070,11 +1070,14 @@ def _run_transform(
         self,
         run: Union[ls_schemas.Run, dict, ls_schemas.RunLikeDict],
         update: bool = False,
+        copy: bool = False,
     ) -> dict:
         """Transform the given run object into a dictionary representation.
 
         Args:
             run (Union[ls_schemas.Run, dict]): The run object to transform.
+            update (bool, optional): Whether to update the run. Defaults to False.
+            copy (bool, optional): Whether to copy the run. Defaults to False.
 
         Returns:
             dict: The transformed run object as a dictionary.
@@ -1088,8 +1091,12 @@ def _run_transform(
         elif isinstance(run_create["id"], str):
             run_create["id"] = uuid.UUID(run_create["id"])
         if "inputs" in run_create and run_create["inputs"] is not None:
+            if copy:
+                run_create["inputs"] = ls_utils.deepish_copy(run_create["inputs"])
             run_create["inputs"] = self._hide_run_inputs(run_create["inputs"])
         if "outputs" in run_create and run_create["outputs"] is not None:
+            if copy:
+                run_create["outputs"] = ls_utils.deepish_copy(run_create["outputs"])
             run_create["outputs"] = self._hide_run_outputs(run_create["outputs"])
         if not update and not run_create.get("start_time"):
             run_create["start_time"] = datetime.datetime.now(datetime.timezone.utc)
@@ -1177,9 +1184,8 @@ def create_run(
         }
         if not self._filter_for_sampling([run_create]):
             return
-        run_create = self._run_transform(run_create)
+        run_create = self._run_transform(run_create, copy=True)
         self._insert_runtime_env([run_create])
-
         if revision_id is not None:
             run_create["extra"]["metadata"]["revision_id"] = revision_id
         if (
@@ -1413,6 +1419,7 @@ def update_run(
         if inputs is not None:
             data["inputs"] = self._hide_run_inputs(inputs)
         if outputs is not None:
+            outputs = ls_utils.deepish_copy(outputs)
             data["outputs"] = self._hide_run_outputs(outputs)
         if events is not None:
             data["events"] = events
diff --git a/python/langsmith/run_helpers.py b/python/langsmith/run_helpers.py
index 7a0cc278a..7167cd177 100644
--- a/python/langsmith/run_helpers.py
+++ b/python/langsmith/run_helpers.py
@@ -956,10 +956,27 @@ def _get_parent_run(
     run_tree = langsmith_extra.get("run_tree")
     if run_tree:
         return run_tree
+    crt = get_current_run_tree()
     if _runtime_env.get_langchain_core_version() is not None:
         if rt := run_trees.RunTree.from_runnable_config(config):
-            return rt
-    return get_current_run_tree()
+            # Still need to break ties when alternating between traceable and
+            # LangChain code.
+            # Nesting: LC -> LS -> LS, we want to still use LS as the parent
+            # Otherwise would look like LC -> {LS, LS} (siblings)
+            if (
+                not crt  # Simple LC -> LS
+                # Let user override if manually passed in or invoked in a
+                # RunnableSequence. This is a naive check.
+                or (config is not None and config.get("callbacks"))
+                # If the LangChain dotted order is more nested than the LangSmith
+                # dotted order, use the LangChain run as the parent.
+                # Note that this condition shouldn't be triggered in later
+                # versions of core, since we also update the run_tree context
+                # vars when updating the RunnableConfig context var.
+                or rt.dotted_order > crt.dotted_order
+            ):
+                return rt
+    return crt
 
 
 def _setup_run(
diff --git a/python/langsmith/run_trees.py b/python/langsmith/run_trees.py
index 675aee4ec..cb767108e 100644
--- a/python/langsmith/run_trees.py
+++ b/python/langsmith/run_trees.py
@@ -233,20 +233,17 @@ def create_child(
         return run
 
     def _get_dicts_safe(self):
-        try:
-            return self.dict(exclude={"child_runs"}, exclude_none=True)
-        except TypeError:
-            # Things like generators cannot be copied
-            self_dict = self.dict(
-                exclude={"child_runs", "inputs", "outputs"}, exclude_none=True
-            )
-            if self.inputs:
-                # shallow copy
-                self_dict["inputs"] = self.inputs.copy()
-            if self.outputs:
-                # shallow copy
-                self_dict["outputs"] = self.outputs.copy()
-            return self_dict
+        # Things like generators cannot be copied
+        self_dict = self.dict(
+            exclude={"child_runs", "inputs", "outputs"}, exclude_none=True
+        )
+        if self.inputs is not None:
+            # shallow copy. deep copying will occur in the client
+            self_dict["inputs"] = self.inputs.copy()
+        if self.outputs is not None:
+            # shallow copy; deep copying will occur in the client
+            self_dict["outputs"] = self.outputs.copy()
+        return self_dict
 
     def post(self, exclude_child_runs: bool = True) -> None:
         """Post the run tree to the API asynchronously."""
diff --git a/python/langsmith/utils.py b/python/langsmith/utils.py
index 0217ab4e4..b64904996 100644
--- a/python/langsmith/utils.py
+++ b/python/langsmith/utils.py
@@ -1,6 +1,7 @@
 """Generic utility functions."""
 
 import contextlib
+import copy
 import enum
 import functools
 import logging
@@ -20,6 +21,7 @@
     Optional,
     Sequence,
     Tuple,
+    TypeVar,
     Union,
 )
 
@@ -497,3 +499,64 @@ def _format_exc() -> str:
     tb_lines = traceback.format_exception(*sys.exc_info())
     filtered_lines = [line for line in tb_lines if "langsmith/" not in line]
     return "".join(filtered_lines)
+
+
+T = TypeVar("T")
+
+
+def _middle_copy(
+    val: T, memo: Dict[int, Any], max_depth: int = 4, _depth: int = 0
+) -> T:
+    """Best-effort copy used when ``copy.deepcopy`` fails on ``val``.
+
+    Honors a class-level ``__deepcopy__`` hook when one exists, otherwise
+    rebuilds dicts, lists, tuples and sets up to ``max_depth`` levels and
+    returns everything else (and anything deeper) by reference.
+    """
+    cls = type(val)
+
+    copier = getattr(cls, "__deepcopy__", None)
+    if copier is not None:
+        try:
+            # Looked up on the class, so the instance must be passed explicitly.
+            return copier(val, memo)
+        except (TypeError, ValueError, RecursionError):
+            pass
+    if _depth >= max_depth:
+        return val
+    if isinstance(val, dict):
+        return {  # type: ignore[return-value]
+            _middle_copy(k, memo, max_depth, _depth + 1): _middle_copy(
+                v, memo, max_depth, _depth + 1
+            )
+            for k, v in val.items()
+        }
+    if isinstance(val, list):
+        return [_middle_copy(item, memo, max_depth, _depth + 1) for item in val]  # type: ignore[return-value]
+    if isinstance(val, tuple):
+        return tuple(_middle_copy(item, memo, max_depth, _depth + 1) for item in val)  # type: ignore[return-value]
+    if isinstance(val, set):
+        return {_middle_copy(item, memo, max_depth, _depth + 1) for item in val}  # type: ignore[return-value]
+
+    return val
+
+
+def deepish_copy(val: T) -> T:
+    """Deep copy a value with a compromise for uncopyable objects.
+
+    Args:
+        val: The value to be deep copied.
+
+    Returns:
+        The deep copied value.
+    """
+    memo: Dict[int, Any] = {}
+    try:
+        return copy.deepcopy(val, memo)
+    except (TypeError, ValueError, RecursionError) as e:
+        # Generators, locks, etc. cannot be copied
+        # and raise a TypeError (mentioning pickling, since the dunder methods
+        # are re-used for copying). We'll try to do a compromise and copy
+        # what we can
+        _LOGGER.debug("Failed to deepcopy input: %s", repr(e))
+        return _middle_copy(val, memo)
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 5fab25448..b92ba21df 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "langsmith"
-version = "0.1.64"
+version = "0.1.65"
 description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform."
 authors = ["LangChain "]
 license = "MIT"
diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py
index 62dd302fd..5e07ac147 100644
--- a/python/tests/unit_tests/test_client.py
+++ b/python/tests/unit_tests/test_client.py
@@ -11,7 +11,7 @@
 import time
 import uuid
 import weakref
-from datetime import datetime
+from datetime import datetime, timezone
 from enum import Enum
 from io import BytesIO
 from typing import Any, NamedTuple, Optional
@@ -28,6 +28,8 @@
 
 import langsmith.env as ls_env
 import langsmith.utils as ls_utils
+from langsmith import run_trees
+from langsmith import schemas as ls_schemas
 from langsmith.client import (
     Client,
     _dumps_json,
@@ -37,7 +39,6 @@
     _is_localhost,
     _serialize_json,
 )
-from langsmith.schemas import Example
 
 _CREATED_AT = datetime(2015, 1, 1, 0, 0, 0)
 
@@ -173,14 +174,14 @@ def test_headers(monkeypatch: pytest.MonkeyPatch) -> None:
 @mock.patch("langsmith.client.requests.Session")
 def test_upload_csv(mock_session_cls: mock.Mock) -> None:
     dataset_id = str(uuid.uuid4())
-    example_1 = Example(
+    example_1 = ls_schemas.Example(
         id=str(uuid.uuid4()),
         created_at=_CREATED_AT,
         inputs={"input": "1"},
         outputs={"output": "2"},
         dataset_id=dataset_id,
     )
-    example_2 = Example(
+    example_2 = ls_schemas.Example(
         id=str(uuid.uuid4()),
         created_at=_CREATED_AT,
         inputs={"input": "3"},
@@ -303,6 +304,74 @@ def test_create_run_unicode() -> None:
     client.update_run(id_, status="completed")
 
 
+def test_create_run_mutate() -> None:
+    inputs = {"messages": ["hi"], "mygen": (i for i in range(10))}
+    session = mock.Mock()
+    session.request = mock.Mock()
+    client = Client(
+        api_url="http://localhost:1984",
+        api_key="123",
+        session=session,
+        info=ls_schemas.LangSmithInfo(
+            batch_ingest_config=ls_schemas.BatchIngestConfig(
+                size_limit_bytes=None,  # Note this field is not used here
+                size_limit=100,
+                scale_up_nthreads_limit=16,
+                scale_up_qsize_trigger=1000,
+                scale_down_nempty_trigger=4,
+            )
+        ),
+    )
+    id_ = uuid.uuid4()
+    run_dict = dict(
+        id=id_,
+        name="my_run",
+        inputs=inputs,
+        run_type="llm",
+        trace_id=id_,
+        dotted_order=run_trees._create_current_dotted_order(
+            datetime.now(timezone.utc), id_
+        ),
+    )
+    client.create_run(**run_dict)  # type: ignore
+    inputs["messages"].append("there")  # type: ignore
+    outputs = {"messages": ["hi", "there"]}
+    client.update_run(
+        id_,
+        outputs=outputs,
+        end_time=datetime.now(timezone.utc),
+        trace_id=id_,
+        dotted_order=run_dict["dotted_order"],
+    )
+    for _ in range(7):
+        time.sleep(0.1)  # Give the background thread time to stop
+        payloads = [
+            json.loads(call[2]["data"])
+            for call in session.request.mock_calls
+            if call.args and call.args[1].endswith("runs/batch")
+        ]
+        if payloads:
+            break
+    posts = [pr for payload in payloads for pr in payload.get("post", [])]
+    patches = [pr for payload in payloads for pr in payload.get("patch", [])]
+    inputs = next(
+        (pr["inputs"] for pr in itertools.chain(posts, patches) if pr.get("inputs")),
+        {},
+    )
+    outputs = next(
+        (pr["outputs"] for pr in itertools.chain(posts, patches) if pr.get("outputs")),
+        {},
+    )
+    # Check that the mutated value wasn't posted
+    assert "messages" in inputs
+    assert inputs["messages"] == ["hi"]
+    assert "mygen" in inputs
+    assert inputs["mygen"].startswith(  # type: ignore
+        "<generator object test_create_run_mutate.<locals>."
+    )
+    assert outputs == {"messages": ["hi", "there"]}
+
+
 class CallTracker:
     def __init__(self) -> None:
         self.counter = 0