Skip to content

Commit

Permalink
[Python] 0.1.66
Browse files Browse the repository at this point in the history
## Update RunnableConfig <> RunTree Precedence (#747)

## Attempt deeper copy to avoid mutations (735)
For some code paths currently, if you mutate inputs/outputs within Xs of invocation, langsmith will log the mutated value, which is confusing and the wrong behavior.

We can't naively use deepcopy, as many valid python objects cannot be copied.

This PR creates a compromise solution that attempts to deepcopy and then falls back to copying up to depth 4 while handling errors.

Will hopefully resolve #706 . Placed before the hide_* as well so that if people want to mutate the dict when filtering it doesn't impact the execution flow.
  • Loading branch information
hinthornw authored May 30, 2024
1 parent 621fae6 commit edccdb1
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 23 deletions.
11 changes: 9 additions & 2 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1070,11 +1070,14 @@ def _run_transform(
self,
run: Union[ls_schemas.Run, dict, ls_schemas.RunLikeDict],
update: bool = False,
copy: bool = False,
) -> dict:
"""Transform the given run object into a dictionary representation.
Args:
run (Union[ls_schemas.Run, dict]): The run object to transform.
update (bool, optional): Whether to update the run. Defaults to False.
copy (bool, optional): Whether to copy the run. Defaults to False.
Returns:
dict: The transformed run object as a dictionary.
Expand All @@ -1088,8 +1091,12 @@ def _run_transform(
elif isinstance(run_create["id"], str):
run_create["id"] = uuid.UUID(run_create["id"])
if "inputs" in run_create and run_create["inputs"] is not None:
if copy:
run_create["inputs"] = ls_utils.deepish_copy(run_create["inputs"])
run_create["inputs"] = self._hide_run_inputs(run_create["inputs"])
if "outputs" in run_create and run_create["outputs"] is not None:
if copy:
run_create["outputs"] = ls_utils.deepish_copy(run_create["outputs"])
run_create["outputs"] = self._hide_run_outputs(run_create["outputs"])
if not update and not run_create.get("start_time"):
run_create["start_time"] = datetime.datetime.now(datetime.timezone.utc)
Expand Down Expand Up @@ -1177,9 +1184,8 @@ def create_run(
}
if not self._filter_for_sampling([run_create]):
return
run_create = self._run_transform(run_create)
run_create = self._run_transform(run_create, copy=True)
self._insert_runtime_env([run_create])

if revision_id is not None:
run_create["extra"]["metadata"]["revision_id"] = revision_id
if (
Expand Down Expand Up @@ -1413,6 +1419,7 @@ def update_run(
if inputs is not None:
data["inputs"] = self._hide_run_inputs(inputs)
if outputs is not None:
outputs = ls_utils.deepish_copy(outputs)
data["outputs"] = self._hide_run_outputs(outputs)
if events is not None:
data["events"] = events
Expand Down
21 changes: 19 additions & 2 deletions python/langsmith/run_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,10 +956,27 @@ def _get_parent_run(
run_tree = langsmith_extra.get("run_tree")
if run_tree:
return run_tree
crt = get_current_run_tree()
if _runtime_env.get_langchain_core_version() is not None:
if rt := run_trees.RunTree.from_runnable_config(config):
return rt
return get_current_run_tree()
# Still need to break ties when alternating between traceable and
# LanChain code.
# Nesting: LC -> LS -> LS, we want to still use LS as the parent
# Otherwise would look like LC -> {LS, LS} (siblings)
if (
not crt # Simple LC -> LS
# Let user override if manually passed in or invoked in a
# RunnableSequence. This is a naive check.
or (config is not None and config.get("callbacks"))
# If the LangChain dotted order is more nested than the LangSmith
# dotted order, use the LangChain run as the parent.
# Note that this condition shouldn't be triggered in later
# versions of core, since we also update the run_tree context
# vars when updating the RunnableConfig context var.
or rt.dotted_order > crt.dotted_order
):
return rt
return crt


def _setup_run(
Expand Down
25 changes: 11 additions & 14 deletions python/langsmith/run_trees.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,17 @@ def create_child(
return run

def _get_dicts_safe(self):
try:
return self.dict(exclude={"child_runs"}, exclude_none=True)
except TypeError:
# Things like generators cannot be copied
self_dict = self.dict(
exclude={"child_runs", "inputs", "outputs"}, exclude_none=True
)
if self.inputs:
# shallow copy
self_dict["inputs"] = self.inputs.copy()
if self.outputs:
# shallow copy
self_dict["outputs"] = self.outputs.copy()
return self_dict
# Things like generators cannot be copied
self_dict = self.dict(
exclude={"child_runs", "inputs", "outputs"}, exclude_none=True
)
if self.inputs is not None:
# shallow copy. deep copying will occur in the client
self_dict["inputs"] = self.inputs.copy()
if self.outputs is not None:
# shallow copy; deep copying will occur in the client
self_dict["outputs"] = self.outputs.copy()
return self_dict

def post(self, exclude_child_runs: bool = True) -> None:
"""Post the run tree to the API asynchronously."""
Expand Down
56 changes: 56 additions & 0 deletions python/langsmith/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Generic utility functions."""

import contextlib
import copy
import enum
import functools
import logging
Expand All @@ -20,6 +21,7 @@
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)

Expand Down Expand Up @@ -497,3 +499,57 @@ def _format_exc() -> str:
tb_lines = traceback.format_exception(*sys.exc_info())
filtered_lines = [line for line in tb_lines if "langsmith/" not in line]
return "".join(filtered_lines)


T = TypeVar("T")


def _middle_copy(
val: T, memo: Dict[int, Any], max_depth: int = 4, _depth: int = 0
) -> T:
cls = type(val)

copier = getattr(cls, "__deepcopy__", None)
if copier is not None:
try:
return copier(memo)
except (TypeError, ValueError, RecursionError):
pass
if _depth >= max_depth:
return val
if isinstance(val, dict):
return { # type: ignore[return-value]
_middle_copy(k, memo, max_depth, _depth + 1): _middle_copy(
v, memo, max_depth, _depth + 1
)
for k, v in val.items()
}
if isinstance(val, list):
return [_middle_copy(item, memo, max_depth, _depth + 1) for item in val] # type: ignore[return-value]
if isinstance(val, tuple):
return tuple(_middle_copy(item, memo, max_depth, _depth + 1) for item in val) # type: ignore[return-value]
if isinstance(val, set):
return {_middle_copy(item, memo, max_depth, _depth + 1) for item in val} # type: ignore[return-value]

return val


def deepish_copy(val: T) -> T:
"""Deep copy a value with a compromise for uncopyable objects.
Args:
val: The value to be deep copied.
Returns:
The deep copied value.
"""
memo: Dict[int, Any] = {}
try:
return copy.deepcopy(val, memo)
except (TypeError, ValueError, RecursionError) as e:
# Generators, locks, etc. cannot be copied
# and raise a TypeError (mentioning pickling, since the dunder methods)
# are re-used for copying. We'll try to do a compromise and copy
# what we can
_LOGGER.debug("Failed to deepcopy input: %s", repr(e))
return _middle_copy(val, memo)
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "langsmith"
version = "0.1.64"
version = "0.1.65"
description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform."
authors = ["LangChain <[email protected]>"]
license = "MIT"
Expand Down
77 changes: 73 additions & 4 deletions python/tests/unit_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import time
import uuid
import weakref
from datetime import datetime
from datetime import datetime, timezone
from enum import Enum
from io import BytesIO
from typing import Any, NamedTuple, Optional
Expand All @@ -28,6 +28,8 @@

import langsmith.env as ls_env
import langsmith.utils as ls_utils
from langsmith import run_trees
from langsmith import schemas as ls_schemas
from langsmith.client import (
Client,
_dumps_json,
Expand All @@ -37,7 +39,6 @@
_is_localhost,
_serialize_json,
)
from langsmith.schemas import Example

_CREATED_AT = datetime(2015, 1, 1, 0, 0, 0)

Expand Down Expand Up @@ -173,14 +174,14 @@ def test_headers(monkeypatch: pytest.MonkeyPatch) -> None:
@mock.patch("langsmith.client.requests.Session")
def test_upload_csv(mock_session_cls: mock.Mock) -> None:
dataset_id = str(uuid.uuid4())
example_1 = Example(
example_1 = ls_schemas.Example(
id=str(uuid.uuid4()),
created_at=_CREATED_AT,
inputs={"input": "1"},
outputs={"output": "2"},
dataset_id=dataset_id,
)
example_2 = Example(
example_2 = ls_schemas.Example(
id=str(uuid.uuid4()),
created_at=_CREATED_AT,
inputs={"input": "3"},
Expand Down Expand Up @@ -303,6 +304,74 @@ def test_create_run_unicode() -> None:
client.update_run(id_, status="completed")


def test_create_run_mutate() -> None:
inputs = {"messages": ["hi"], "mygen": (i for i in range(10))}
session = mock.Mock()
session.request = mock.Mock()
client = Client(
api_url="http://localhost:1984",
api_key="123",
session=session,
info=ls_schemas.LangSmithInfo(
batch_ingest_config=ls_schemas.BatchIngestConfig(
size_limit_bytes=None, # Note this field is not used here
size_limit=100,
scale_up_nthreads_limit=16,
scale_up_qsize_trigger=1000,
scale_down_nempty_trigger=4,
)
),
)
id_ = uuid.uuid4()
run_dict = dict(
id=id_,
name="my_run",
inputs=inputs,
run_type="llm",
trace_id=id_,
dotted_order=run_trees._create_current_dotted_order(
datetime.now(timezone.utc), id_
),
)
client.create_run(**run_dict) # type: ignore
inputs["messages"].append("there") # type: ignore
outputs = {"messages": ["hi", "there"]}
client.update_run(
id_,
outputs=outputs,
end_time=datetime.now(timezone.utc),
trace_id=id_,
dotted_order=run_dict["dotted_order"],
)
for _ in range(7):
time.sleep(0.1) # Give the background thread time to stop
payloads = [
json.loads(call[2]["data"])
for call in session.request.mock_calls
if call.args and call.args[1].endswith("runs/batch")
]
if payloads:
break
posts = [pr for payload in payloads for pr in payload.get("post", [])]
patches = [pr for payload in payloads for pr in payload.get("patch", [])]
inputs = next(
(pr["inputs"] for pr in itertools.chain(posts, patches) if pr.get("inputs")),
{},
)
outputs = next(
(pr["outputs"] for pr in itertools.chain(posts, patches) if pr.get("outputs")),
{},
)
# Check that the mutated value wasn't posted
assert "messages" in inputs
assert inputs["messages"] == ["hi"]
assert "mygen" in inputs
assert inputs["mygen"].startswith( # type: ignore
"<generator object test_create_run_mutate.<locals>."
)
assert outputs == {"messages": ["hi", "there"]}


class CallTracker:
def __init__(self) -> None:
self.counter = 0
Expand Down

0 comments on commit edccdb1

Please sign in to comment.