Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add path as an option for uploading attachments #1331

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
36 changes: 26 additions & 10 deletions python/langsmith/_internal/_operations.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

Check notice on line 1 in python/langsmith/_internal/_operations.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

........... WARNING: the benchmark result may be unstable * the standard deviation (113 ms) is 16% of the mean (693 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_5_000_run_trees: Mean +- std dev: 693 ms +- 113 ms ........... WARNING: the benchmark result may be unstable * the standard deviation (161 ms) is 11% of the mean (1.44 sec) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_10_000_run_trees: Mean +- std dev: 1.44 sec +- 0.16 sec ........... WARNING: the benchmark result may be unstable * the standard deviation (219 ms) is 15% of the mean (1.49 sec) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_20_000_run_trees: Mean +- std dev: 1.49 sec +- 0.22 sec ........... dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 687 us +- 7 us ........... dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.2 ms +- 0.3 ms ........... dumps_class_nested_py_leaf_100x200: Mean +- std dev: 106 ms +- 3 ms ........... dumps_dataclass_nested_50x100: Mean +- std dev: 25.8 ms +- 0.5 ms ........... WARNING: the benchmark result may be unstable * the standard deviation (18.9 ms) is 25% of the mean (74.5 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. dumps_pydantic_nested_50x100: Mean +- std dev: 74.5 ms +- 18.9 ms ........... dumps_pydanticv1_nested_50x100: Mean +- std dev: 202 ms +- 2 ms

Check notice on line 1 in python/langsmith/_internal/_operations.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------------+----------+------------------------+ | Benchmark | main | changes | +===============================================+==========+========================+ | dumps_pydanticv1_nested_50x100 | 220 ms | 202 ms: 1.09x faster | +-----------------------------------------------+----------+------------------------+ | create_5_000_run_trees | 731 ms | 693 ms: 1.05x faster | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_branch_and_leaf_200x400 | 690 us | 687 us: 1.00x faster | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_50x100 | 25.1 ms | 25.2 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_dataclass_nested_50x100 | 25.5 ms | 25.8 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_100x200 | 104 ms | 106 ms: 1.02x slower | +-----------------------------------------------+----------+------------------------+ | create_10_000_run_trees | 1.41 sec | 1.44 sec: 1.02x slower | +-----------------------------------------------+----------+------------------------+ | create_20_000_run_trees | 1.39 sec | 1.49 sec: 1.07x slower | +-----------------------------------------------+----------+------------------------+ | dumps_pydantic_nested_50x100 | 65.9 ms | 74.5 ms: 1.13x slower | +-----------------------------------------------+----------+------------------------+ | Geometric mean | (ref) | 1.01x slower | +-----------------------------------------------+----------+------------------------+

import itertools
import logging
import os
import uuid
from typing import Literal, Optional, Union, cast

Expand Down Expand Up @@ -246,7 +247,7 @@
),
)
if op.attachments:
for n, (content_type, valb) in op.attachments.items():
for n, (content_type, data) in op.attachments.items():
if "." in n:
logger.warning(
f"Skipping logging of attachment '{n}' "
Expand All @@ -256,17 +257,32 @@
)
continue

acc_parts.append(
(
f"attachment.{op.id}.{n}",
if isinstance(data, bytes):
acc_parts.append(
(
None,
valb,
content_type,
{"Content-Length": str(len(valb))},
),
f"attachment.{op.id}.{n}",
(
None,
data,
content_type,
{"Content-Length": str(len(data))},
),
)
)
else:
file_path = data
file_size = os.path.getsize(file_path)
acc_parts.append(
(
f"attachment.{op.id}.{n}",
(
None,
open(file_path, "rb"), # type: ignore[arg-type]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we not need to read this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, I believe the multipart endpoint allows a buffer to be passed (I also added a test that should cover this)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty cve worthy so we should be careful about cases where this is allowed/encouraged

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion would be to require either an environment variable of LANGSMITH_DANGEROUSLY_ALLOW_FILESYSTEM or some client init variable saying this. Otherwise throw an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to explicitly close the files the are opened here, using some try, finally pattern. I would structure the code such that the multipart form is assembled and sent within the same method we are opening the file handles:

try:
    # Prepare the files for the multipart request
    # Make the POST request
finally:
    # Explicitly close the files

f"{content_type}; length={file_size}",
{},
),
)
)
)
return MultipartPartsAndContext(
acc_parts,
f"trace={op.trace_id},id={op.id}",
Expand Down
93 changes: 56 additions & 37 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1835,7 +1835,12 @@ def read_run(
response = self.request_with_retries(
"GET", f"/runs/{_as_uuid(run_id, 'run_id')}"
)
run = ls_schemas.Run(**response.json(), _host_url=self._host_url)
attachments = _convert_stored_attachments_to_attachments_dict(
response.json(), "s3_urls"
)
run = ls_schemas.Run(
attachments=attachments, **response.json(), _host_url=self._host_url
)
if load_child_runs and run.child_run_ids:
run = self._load_child_runs(run)
return run
Expand Down Expand Up @@ -2031,7 +2036,13 @@ def list_runs(
for i, run in enumerate(
self._get_cursor_paginated_list("/runs/query", body=body_query)
):
yield ls_schemas.Run(**run, _host_url=self._host_url)
# Should this be behind a flag?
attachments = _convert_stored_attachments_to_attachments_dict(
run, "s3_urls"
)
yield ls_schemas.Run(
attachments=attachments, **run, _host_url=self._host_url
)
if limit is not None and i + 1 >= limit:
break

Expand Down Expand Up @@ -3470,6 +3481,7 @@ def _prepare_multipart_data(
| List[ls_schemas.ExampleUpdateWithAttachments],
],
include_dataset_id: bool = False,
dangerously_allow_filesystem: Optional[bool] = False,
) -> Tuple[Any, bytes]:
parts: List[MultipartPart] = []
if include_dataset_id:
Expand Down Expand Up @@ -3555,19 +3567,24 @@ def _prepare_multipart_data(
for name, attachment in example.attachments.items():
if isinstance(attachment, tuple):
if isinstance(attachment[1], Path):
mime_type, file_path = attachment
file_size = os.path.getsize(file_path)
parts.append(
(
f"{example_id}.attachment.{name}",
if dangerously_allow_filesystem == True:
mime_type, file_path = attachment
file_size = os.path.getsize(file_path)
parts.append(
(
None,
open(file_path, "rb"), # type: ignore[arg-type]
f"{mime_type}; length={file_size}",
{},
),
f"{example_id}.attachment.{name}",
(
None,
open(file_path, "rb"), # type: ignore[arg-type]
f"{mime_type}; length={file_size}",
{},
),
)
)
else:
raise ValueError(
"dangerously_allow_filesystem must be True to upload files from the filesystem"
)
)
else:
mime_type, data = attachment
parts.append(
Expand Down Expand Up @@ -3624,6 +3641,7 @@ def update_examples_multipart(
*,
dataset_id: ID_TYPE,
updates: Optional[List[ls_schemas.ExampleUpdateWithAttachments]] = None,
dangerously_allow_filesystem: Optional[bool] = False,
) -> ls_schemas.UpsertExamplesResponse:
"""Upload examples."""
if not (self.info.instance_flags or {}).get(
Expand All @@ -3635,7 +3653,7 @@ def update_examples_multipart(
if updates is None:
updates = []

encoder, data = self._prepare_multipart_data(updates, include_dataset_id=False)
encoder, data = self._prepare_multipart_data(updates, include_dataset_id=False, dangerously_allow_filesystem=dangerously_allow_filesystem)

response = self.request_with_retries(
"PATCH",
Expand All @@ -3656,6 +3674,7 @@ def upload_examples_multipart(
*,
dataset_id: ID_TYPE,
uploads: Optional[List[ls_schemas.ExampleUploadWithAttachments]] = None,
dangerously_allow_filesystem: Optional[bool] = False,
) -> ls_schemas.UpsertExamplesResponse:
"""Upload examples."""
if not (self.info.instance_flags or {}).get(
Expand All @@ -3666,7 +3685,7 @@ def upload_examples_multipart(
)
if uploads is None:
uploads = []
encoder, data = self._prepare_multipart_data(uploads, include_dataset_id=False)
encoder, data = self._prepare_multipart_data(uploads, include_dataset_id=False, dangerously_allow_filesystem=dangerously_allow_filesystem)

response = self.request_with_retries(
"POST",
Expand All @@ -3686,6 +3705,7 @@ def upsert_examples_multipart(
self,
*,
upserts: Optional[List[ls_schemas.ExampleUpsertWithAttachments]] = None,
dangerously_allow_filesystem: Optional[bool] = False,
) -> ls_schemas.UpsertExamplesResponse:
"""Upsert examples.

Expand All @@ -3702,7 +3722,7 @@ def upsert_examples_multipart(
if upserts is None:
upserts = []

encoder, data = self._prepare_multipart_data(upserts, include_dataset_id=True)
encoder, data = self._prepare_multipart_data(upserts, include_dataset_id=True, dangerously_allow_filesystem=dangerously_allow_filesystem)

response = self.request_with_retries(
"POST",
Expand Down Expand Up @@ -3894,16 +3914,9 @@ def read_example(
)

example = response.json()
attachments = {}
if example["attachment_urls"]:
for key, value in example["attachment_urls"].items():
response = requests.get(value["presigned_url"], stream=True)
response.raise_for_status()
reader = io.BytesIO(response.content)
attachments[key.removeprefix("attachment.")] = {
"presigned_url": value["presigned_url"],
"reader": reader,
}
attachments = _convert_stored_attachments_to_attachments_dict(
example, "attachment_urls"
)

return ls_schemas.Example(
**{k: v for k, v in example.items() if k != "attachment_urls"},
Expand Down Expand Up @@ -3980,17 +3993,9 @@ def list_examples(
for i, example in enumerate(
self._get_paginated_list("/examples", params=params)
):
attachments = {}
if example["attachment_urls"]:
for key, value in example["attachment_urls"].items():
response = requests.get(value["presigned_url"], stream=True)
response.raise_for_status()
reader = io.BytesIO(response.content)
attachments[key.removeprefix("attachment.")] = {
"presigned_url": value["presigned_url"],
"reader": reader,
}

attachments = _convert_stored_attachments_to_attachments_dict(
example, "attachment_urls"
)
yield ls_schemas.Example(
**{k: v for k, v in example.items() if k != "attachment_urls"},
attachments=attachments,
Expand Down Expand Up @@ -6657,3 +6662,17 @@ def convert_prompt_to_anthropic_format(
return anthropic._get_request_payload(messages, stop=stop)
except Exception as e:
raise ls_utils.LangSmithError(f"Error converting to Anthropic format: {e}")


def _convert_stored_attachments_to_attachments_dict(data, attachments_key):
attachments_dict = {}
if attachments_key in data and data[attachments_key]:
for key, value in data[attachments_key].items():
response = requests.get(value["presigned_url"], stream=True)
response.raise_for_status()
reader = io.BytesIO(response.content)
attachments_dict[key.removeprefix("attachment.")] = {
"presigned_url": value["presigned_url"],
"reader": reader,
}
return attachments_dict
4 changes: 3 additions & 1 deletion python/langsmith/run_trees.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,9 @@ def patch(self) -> None:
"""Patch the run tree to the API in a background thread."""
if not self.end_time:
self.end()
attachments = self.attachments
attachments = {
a: v for a, v in self.attachments.items() if isinstance(v, tuple)
}
Comment on lines +321 to +323
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done strictly for typing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is v ever not a tuple?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so might be worth doing in a for loop and raising an error if not. I think would be unexpected. But mypy probably knows better than me, so there might be a case we're not handling here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

v is never not a tuple here, this is just because of the union types I believe and it thinks that something could be passed here that is never passed

try:
# Avoid loading the same attachment twice
if attachments:
Expand Down
17 changes: 11 additions & 6 deletions python/langsmith/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
StrictInt,
)

from pathlib import Path

from typing_extensions import Literal

SCORE_TYPE = Union[StrictBool, StrictInt, StrictFloat, None]
Expand All @@ -63,7 +65,7 @@ def my_function(bar: int, my_val: Attachment):
data: bytes


Attachments = Dict[str, Union[Tuple[str, bytes], Attachment]]
Attachments = Dict[str, Union[Tuple[str, bytes], Attachment, Tuple[str, Path]]]
isahers1 marked this conversation as resolved.
Show resolved Hide resolved
"""Attachments associated with the run.
Each entry is a tuple of (mime_type, bytes), or (mime_type, file_path)"""

Expand All @@ -76,10 +78,6 @@ def read(self, size: int = -1) -> bytes:
"""Read function."""
...

def write(self, b: bytes) -> int:
"""Write function."""
...

def seek(self, offset: int, whence: int = 0) -> int:
"""Seek function."""
...
Expand Down Expand Up @@ -370,7 +368,9 @@ class RunBase(BaseModel):
tags: Optional[List[str]] = None
"""Tags for categorizing or annotating the run."""

attachments: Attachments = Field(default_factory=dict)
attachments: Union[Attachments, Dict[str, AttachmentInfo]] = Field(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this related to this pr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah need this for the changes in read_run

default_factory=dict
)
"""Attachments associated with the run.
Each entry is a tuple of (mime_type, bytes)."""

Expand All @@ -390,6 +390,11 @@ def __repr__(self):
"""Return a string representation of the RunBase object."""
return f"{self.__class__}(id={self.id}, name='{self.name}', run_type='{self.run_type}')"

class Config:
"""Configuration class for the schema."""

arbitrary_types_allowed = True

Comment on lines +393 to +397
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to allow the AttachmentInfo


class Run(RunBase):
"""Run schema when loading from the DB."""
Expand Down
Loading
Loading