
add path as an option for uploading attachments #1331

Draft: wants to merge 19 commits into main
36 changes: 26 additions & 10 deletions python/langsmith/_internal/_operations.py
@@ -1,7 +1,8 @@
from __future__ import annotations

[CI annotation — GitHub Actions / benchmark] Benchmark results:

create_5_000_run_trees:                        Mean +- std dev: 701 ms +- 91 ms    (unstable: std dev is 13% of mean)
create_10_000_run_trees:                       Mean +- std dev: 1.45 sec +- 0.15 sec (unstable: std dev is 10% of mean)
create_20_000_run_trees:                       Mean +- std dev: 1.45 sec +- 0.21 sec (unstable: std dev is 14% of mean)
dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 692 us +- 14 us
dumps_class_nested_py_leaf_50x100:             Mean +- std dev: 25.1 ms +- 0.3 ms
dumps_class_nested_py_leaf_100x200:            Mean +- std dev: 104 ms +- 3 ms
dumps_dataclass_nested_50x100:                 Mean +- std dev: 25.4 ms +- 0.5 ms
dumps_pydantic_nested_50x100:                  Mean +- std dev: 72.2 ms +- 16.9 ms  (unstable: std dev is 23% of mean)
dumps_pydanticv1_nested_50x100:                Mean +- std dev: 200 ms +- 2 ms

(For the unstable results, pyperf suggests rerunning with more runs/values/loops, running `python -m pyperf system tune` to reduce system jitter, and using `pyperf stats`, `pyperf dump`, and `pyperf hist` to analyze.)

[CI annotation — GitHub Actions / benchmark] Comparison against main:

+-----------------------------------------------+----------+------------------------+
| Benchmark                                     | main     | changes                |
+===============================================+==========+========================+
| dumps_pydanticv1_nested_50x100                | 220 ms   | 200 ms: 1.10x faster   |
| create_5_000_run_trees                        | 731 ms   | 701 ms: 1.04x faster   |
| dumps_dataclass_nested_50x100                 | 25.5 ms  | 25.4 ms: 1.00x faster  |
| dumps_class_nested_py_leaf_50x100             | 25.1 ms  | 25.1 ms: 1.00x slower  |
| dumps_class_nested_py_branch_and_leaf_200x400 | 690 us   | 692 us: 1.00x slower   |
| dumps_class_nested_py_leaf_100x200            | 104 ms   | 104 ms: 1.00x slower   |
| create_10_000_run_trees                       | 1.41 sec | 1.45 sec: 1.02x slower |
| create_20_000_run_trees                       | 1.39 sec | 1.45 sec: 1.04x slower |
| dumps_pydantic_nested_50x100                  | 65.9 ms  | 72.2 ms: 1.09x slower  |
+-----------------------------------------------+----------+------------------------+
| Geometric mean                                | (ref)    | 1.00x slower           |
+-----------------------------------------------+----------+------------------------+

import itertools
import logging
import os
import uuid
from typing import Literal, Optional, Union, cast

@@ -246,7 +247,7 @@
),
)
if op.attachments:
            for n, (content_type, data) in op.attachments.items():
if "." in n:
logger.warning(
f"Skipping logging of attachment '{n}' "
@@ -256,17 +257,32 @@
)
continue

                if isinstance(data, bytes):
                    acc_parts.append(
                        (
                            f"attachment.{op.id}.{n}",
                            (
                                None,
                                data,
                                content_type,
                                {"Content-Length": str(len(data))},
                            ),
                        )
                    )
                else:
                    file_path = data
                    file_size = os.path.getsize(file_path)
                    acc_parts.append(
                        (
                            f"attachment.{op.id}.{n}",
                            (
                                None,
                                open(file_path, "rb"),  # type: ignore[arg-type]
[Review thread on the open(file_path, "rb") line]

Contributor: do we not need to read this?

Contributor Author: I don't think so; I believe the multipart endpoint allows a buffer to be passed (I also added a test that should cover this).

Member: This is pretty CVE-worthy, so we should be careful about cases where this is allowed/encouraged.

Member: My suggestion would be to require either an environment variable of LANGSMITH_DANGEROUSLY_ALLOW_FILESYSTEM or some client init variable saying this. Otherwise throw an error.

Contributor: We need to explicitly close the files that are opened here, using some try/finally pattern. I would structure the code such that the multipart form is assembled and sent within the same method where we open the file handles:

try:
    # Prepare the files for the multipart request
    # Make the POST request
finally:
    # Explicitly close the files

f"{content_type}; length={file_size}",
{},
),
)
)
)
return MultipartPartsAndContext(
acc_parts,
f"trace={op.trace_id},id={op.id}",
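A minimal sketch of the try/finally pattern suggested in the review thread above, assuming a post_fn callable that performs the multipart POST; the helper name and structure are illustrative, not the PR's actual implementation:

import os
from pathlib import Path


def send_parts_with_files(attachments, post_fn):
    """Assemble multipart parts, POST them, and always close any opened file handles."""
    opened = []
    try:
        parts = []
        for name, (content_type, data) in attachments.items():
            if isinstance(data, (str, Path)):
                # Filesystem path: open here, close in the finally block below.
                f = open(data, "rb")
                opened.append(f)
                size = os.path.getsize(data)
                parts.append((name, (None, f, f"{content_type}; length={size}", {})))
            else:
                # Raw bytes: nothing to close.
                parts.append((name, (None, data, content_type, {"Content-Length": str(len(data))})))
        return post_fn(parts)  # the POST happens while the handles are still open
    finally:
        for f in opened:
            f.close()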
128 changes: 91 additions & 37 deletions python/langsmith/client.py
@@ -1537,6 +1537,7 @@ def multipart_ingest(
] = None,
*,
pre_sampled: bool = False,
dangerously_allow_filesystem: bool = False,
) -> None:
"""Batch ingest/upsert multiple runs in the Langsmith system.

@@ -1621,6 +1622,18 @@
)
)

        for op in serialized_ops:
            if isinstance(op, SerializedRunOperation) and op.attachments:
                for attachment in op.attachments.values():
                    if (
                        isinstance(attachment, tuple)
                        and isinstance(attachment[1], Path)
                        and not dangerously_allow_filesystem
                    ):
                        raise ValueError(
                            "Must set dangerously_allow_filesystem to True to use filesystem paths in multipart ingest."
                        )

        # send the runs in multipart requests
self._multipart_ingest_ops(serialized_ops)
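For illustration, the new opt-in guard distilled into a standalone sketch; the helper name here is hypothetical, but the check mirrors the loop above:

from pathlib import Path

def check_filesystem_opt_in(attachments: dict, dangerously_allow_filesystem: bool) -> None:
    """Reject (mime_type, Path) attachments unless the caller explicitly opted in."""
    for attachment in attachments.values():
        if (
            isinstance(attachment, tuple)
            and isinstance(attachment[1], Path)
            and not dangerously_allow_filesystem
        ):
            raise ValueError("Must set dangerously_allow_filesystem to True to use filesystem paths.")

check_filesystem_opt_in({"img": ("image/png", b"...")}, False)           # bytes always pass
check_filesystem_opt_in({"img": ("image/png", Path("img.png"))}, True)   # Path passes with opt-in
# check_filesystem_opt_in({"img": ("image/png", Path("img.png"))}, False)  # would raise ValueError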

@@ -1684,6 +1697,7 @@ def update_run(
extra: Optional[Dict] = None,
tags: Optional[List[str]] = None,
attachments: Optional[ls_schemas.Attachments] = None,
dangerously_allow_filesystem: bool = False,
**kwargs: Any,
) -> None:
"""Update a run in the LangSmith API.
@@ -1726,6 +1740,15 @@
"session_name": kwargs.pop("session_name", None),
}
        if attachments:
            for _, attachment in attachments.items():
                if (
                    isinstance(attachment, tuple)
                    and isinstance(attachment[1], Path)
                    and not dangerously_allow_filesystem
                ):
                    raise ValueError(
                        "Must set dangerously_allow_filesystem=True to allow filesystem attachments."
                    )
            data["attachments"] = attachments
use_multipart = (
self.tracing_queue is not None
@@ -1835,7 +1858,12 @@ def read_run(
        response = self.request_with_retries(
            "GET", f"/runs/{_as_uuid(run_id, 'run_id')}"
        )
        attachments = _convert_stored_attachments_to_attachments_dict(
            response.json(), "s3_urls"
        )
        run = ls_schemas.Run(
            attachments=attachments, **response.json(), _host_url=self._host_url
        )
        if load_child_runs and run.child_run_ids:
            run = self._load_child_runs(run)
        return run
@@ -2031,7 +2059,13 @@ def list_runs(
        for i, run in enumerate(
            self._get_cursor_paginated_list("/runs/query", body=body_query)
        ):
            # Should this be behind a flag?
            attachments = _convert_stored_attachments_to_attachments_dict(
                run, "s3_urls"
            )
            yield ls_schemas.Run(
                attachments=attachments, **run, _host_url=self._host_url
            )
            if limit is not None and i + 1 >= limit:
                break
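With these changes, runs fetched from the API surface their stored attachments as pre-downloaded readers. A hedged usage sketch, where run_id stands in for the UUID of an existing run:

from langsmith import Client

client = Client()
run = client.read_run(run_id)  # run_id: hypothetical UUID of an existing run
for name, info in run.attachments.items():
    data = info["reader"].read()  # a BytesIO filled from the presigned S3 URL
    print(f"{name}: {len(data)} bytes via {info['presigned_url']}")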

@@ -3470,6 +3504,7 @@ def _prepare_multipart_data(
| List[ls_schemas.ExampleUpdateWithAttachments],
],
include_dataset_id: bool = False,
dangerously_allow_filesystem: bool = False,
) -> Tuple[Any, bytes]:
parts: List[MultipartPart] = []
if include_dataset_id:
@@ -3555,19 +3590,24 @@
            for name, attachment in example.attachments.items():
                if isinstance(attachment, tuple):
                    if isinstance(attachment[1], Path):
                        if dangerously_allow_filesystem:
                            mime_type, file_path = attachment
                            file_size = os.path.getsize(file_path)
                            parts.append(
                                (
                                    f"{example_id}.attachment.{name}",
                                    (
                                        None,
                                        open(file_path, "rb"),  # type: ignore[arg-type]
                                        f"{mime_type}; length={file_size}",
                                        {},
                                    ),
                                )
                            )
                        else:
                            raise ValueError(
                                "dangerously_allow_filesystem must be True to upload files from the filesystem"
                            )
                    else:
                        mime_type, data = attachment
                        parts.append(
@@ -3624,6 +3664,7 @@ def update_examples_multipart(
*,
dataset_id: ID_TYPE,
updates: Optional[List[ls_schemas.ExampleUpdateWithAttachments]] = None,
dangerously_allow_filesystem: bool = False,
) -> ls_schemas.UpsertExamplesResponse:
"""Upload examples."""
if not (self.info.instance_flags or {}).get(
@@ -3635,7 +3676,11 @@
if updates is None:
updates = []

        encoder, data = self._prepare_multipart_data(
            updates,
            include_dataset_id=False,
            dangerously_allow_filesystem=dangerously_allow_filesystem,
        )

response = self.request_with_retries(
"PATCH",
Expand All @@ -3656,6 +3701,7 @@ def upload_examples_multipart(
*,
dataset_id: ID_TYPE,
uploads: Optional[List[ls_schemas.ExampleUploadWithAttachments]] = None,
dangerously_allow_filesystem: bool = False,
) -> ls_schemas.UpsertExamplesResponse:
"""Upload examples."""
if not (self.info.instance_flags or {}).get(
@@ -3666,7 +3712,11 @@
)
if uploads is None:
uploads = []
        encoder, data = self._prepare_multipart_data(
            uploads,
            include_dataset_id=False,
            dangerously_allow_filesystem=dangerously_allow_filesystem,
        )

response = self.request_with_retries(
"POST",
Expand All @@ -3686,6 +3736,7 @@ def upsert_examples_multipart(
self,
*,
upserts: Optional[List[ls_schemas.ExampleUpsertWithAttachments]] = None,
dangerously_allow_filesystem: bool = False,
) -> ls_schemas.UpsertExamplesResponse:
"""Upsert examples.

@@ -3702,7 +3753,11 @@
if upserts is None:
upserts = []

        encoder, data = self._prepare_multipart_data(
            upserts,
            include_dataset_id=True,
            dangerously_allow_filesystem=dangerously_allow_filesystem,
        )

response = self.request_with_retries(
"POST",
@@ -3894,16 +3949,9 @@ def read_example(
)

        example = response.json()
        attachments = _convert_stored_attachments_to_attachments_dict(
            example, "attachment_urls"
        )

        return ls_schemas.Example(
            **{k: v for k, v in example.items() if k != "attachment_urls"},
@@ -3980,17 +4028,9 @@ def list_examples(
        for i, example in enumerate(
            self._get_paginated_list("/examples", params=params)
        ):
            attachments = _convert_stored_attachments_to_attachments_dict(
                example, "attachment_urls"
            )
            yield ls_schemas.Example(
                **{k: v for k, v in example.items() if k != "attachment_urls"},
                attachments=attachments,
@@ -6657,3 +6697,17 @@ def convert_prompt_to_anthropic_format(
return anthropic._get_request_payload(messages, stop=stop)
except Exception as e:
raise ls_utils.LangSmithError(f"Error converting to Anthropic format: {e}")


def _convert_stored_attachments_to_attachments_dict(data, attachments_key):
    attachments_dict = {}
    if attachments_key in data and data[attachments_key]:
        for key, value in data[attachments_key].items():
            response = requests.get(value["presigned_url"], stream=True)
            response.raise_for_status()
            reader = io.BytesIO(response.content)
            attachments_dict[key.removeprefix("attachment.")] = {
                "presigned_url": value["presigned_url"],
                "reader": reader,
            }
    return attachments_dict
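For reference, a sketch of the shape this helper produces, assuming the stored payload maps attachment.<name> keys to presigned URLs as the code above implies:

import io

stored = {"s3_urls": {"attachment.report": {"presigned_url": "https://example.com/r"}}}
# _convert_stored_attachments_to_attachments_dict(stored, "s3_urls") downloads each
# presigned URL and returns roughly:
expected = {
    "report": {
        "presigned_url": "https://example.com/r",
        "reader": io.BytesIO(b"<downloaded bytes>"),  # placeholder for the fetched content
    }
}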
4 changes: 3 additions & 1 deletion python/langsmith/run_trees.py
@@ -318,7 +318,9 @@ def patch(self) -> None:
"""Patch the run tree to the API in a background thread."""
if not self.end_time:
self.end()
attachments = self.attachments
attachments = {
a: v for a, v in self.attachments.items() if isinstance(v, tuple)
}
[Review thread on lines +321 to +323]

Contributor Author: This was done strictly for typing.

Contributor: is v ever not a tuple?

Member: If so, it might be worth doing in a for loop and raising an error if not; I think that would be unexpected. But mypy probably knows better than me, so there might be a case we're not handling here.

Contributor Author: v is never not a tuple here; this is just because of the union types, I believe. Mypy thinks something could be passed here that never is.

        try:
            # Avoid loading the same attachment twice
            if attachments:
17 changes: 11 additions & 6 deletions python/langsmith/schemas.py
@@ -39,6 +39,8 @@
StrictInt,
)

from pathlib import Path

from typing_extensions import Literal

SCORE_TYPE = Union[StrictBool, StrictInt, StrictFloat, None]
@@ -63,7 +65,7 @@ def my_function(bar: int, my_val: Attachment):
data: bytes


Attachments = Dict[str, Union[Tuple[str, bytes], Attachment, Tuple[str, Path]]]
"""Attachments associated with the run.
Each entry is a tuple of (mime_type, bytes), or (mime_type, file_path)"""

@@ -76,10 +78,6 @@ def read(self, size: int = -1) -> bytes:
"""Read function."""
...

def write(self, b: bytes) -> int:
"""Write function."""
...

def seek(self, offset: int, whence: int = 0) -> int:
"""Seek function."""
...
@@ -370,7 +368,9 @@ class RunBase(BaseModel):
    tags: Optional[List[str]] = None
    """Tags for categorizing or annotating the run."""

    attachments: Union[Attachments, Dict[str, AttachmentInfo]] = Field(
        default_factory=dict
    )
[Review thread on the attachments field]

Contributor: is this related to this PR?

Contributor Author: yeah, we need this for the changes in read_run.
"""Attachments associated with the run.
Each entry is a tuple of (mime_type, bytes)."""

@@ -390,6 +390,11 @@ def __repr__(self):
"""Return a string representation of the RunBase object."""
return f"{self.__class__}(id={self.id}, name='{self.name}', run_type='{self.run_type}')"

    class Config:
        """Configuration class for the schema."""

        arbitrary_types_allowed = True
[Review thread on lines +393 to +397]

Contributor Author: Needed to allow the AttachmentInfo.

class Run(RunBase):
"""Run schema when loading from the DB."""