Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add path as an option for uploading attachments #1331

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions python/langsmith/_internal/_operations.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

Check notice on line 1 in python/langsmith/_internal/_operations.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

........... WARNING: the benchmark result may be unstable * the standard deviation (102 ms) is 15% of the mean (704 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_5_000_run_trees: Mean +- std dev: 704 ms +- 102 ms ........... WARNING: the benchmark result may be unstable * the standard deviation (190 ms) is 13% of the mean (1.43 sec) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_10_000_run_trees: Mean +- std dev: 1.43 sec +- 0.19 sec ........... WARNING: the benchmark result may be unstable * the standard deviation (151 ms) is 11% of the mean (1.41 sec) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_20_000_run_trees: Mean +- std dev: 1.41 sec +- 0.15 sec ........... dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 705 us +- 29 us ........... dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.1 ms +- 0.2 ms ........... dumps_class_nested_py_leaf_100x200: Mean +- std dev: 104 ms +- 2 ms ........... dumps_dataclass_nested_50x100: Mean +- std dev: 25.4 ms +- 0.3 ms ........... WARNING: the benchmark result may be unstable * the standard deviation (17.0 ms) is 23% of the mean (72.5 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. dumps_pydantic_nested_50x100: Mean +- std dev: 72.5 ms +- 17.0 ms ........... dumps_pydanticv1_nested_50x100: Mean +- std dev: 197 ms +- 2 ms

Check notice on line 1 in python/langsmith/_internal/_operations.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------------+----------+------------------------+ | Benchmark | main | changes | +===============================================+==========+========================+ | dumps_pydanticv1_nested_50x100 | 218 ms | 197 ms: 1.10x faster | +-----------------------------------------------+----------+------------------------+ | create_5_000_run_trees | 724 ms | 704 ms: 1.03x faster | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_50x100 | 24.9 ms | 25.1 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_dataclass_nested_50x100 | 25.1 ms | 25.4 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_100x200 | 103 ms | 104 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_branch_and_leaf_200x400 | 695 us | 705 us: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | create_20_000_run_trees | 1.39 sec | 1.41 sec: 1.02x slower | +-----------------------------------------------+----------+------------------------+ | create_10_000_run_trees | 1.39 sec | 1.43 sec: 1.03x slower | +-----------------------------------------------+----------+------------------------+ | dumps_pydantic_nested_50x100 | 64.6 ms | 72.5 ms: 1.12x slower | +-----------------------------------------------+----------+------------------------+ | Geometric mean | (ref) | 1.01x slower | +-----------------------------------------------+----------+------------------------+

import itertools
import logging
import os
import uuid
from typing import Literal, Optional, Union, cast

Expand Down Expand Up @@ -246,7 +247,7 @@
),
)
if op.attachments:
for n, (content_type, valb) in op.attachments.items():
for n, (content_type, data) in op.attachments.items():
isahers1 marked this conversation as resolved.
Show resolved Hide resolved
if "." in n:
logger.warning(
f"Skipping logging of attachment '{n}' "
Expand All @@ -256,17 +257,32 @@
)
continue

acc_parts.append(
(
f"attachment.{op.id}.{n}",
if isinstance(data, bytes):
acc_parts.append(
(
None,
valb,
content_type,
{"Content-Length": str(len(valb))},
),
f"attachment.{op.id}.{n}",
(
None,
data,
content_type,
{"Content-Length": str(len(data))},
),
)
)
else:
file_path = data
file_size = os.path.getsize(file_path)
acc_parts.append(
(
f"attachment.{op.id}.{n}",
(
None,
open(file_path, "rb"), # type: ignore[arg-type]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we not need to read this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, I believe the multipart endpoint allows a buffer to be passed (I also added a test that should cover this)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty cve worthy so we should be careful about cases where this is allowed/encouraged

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion would be to require either an environment variable of LANGSMITH_DANGEROUSLY_ALLOW_FILESYSTEM or some client init variable saying this. Otherwise throw an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to explicitly close the files the are opened here, using some try, finally pattern. I would structure the code such that the multipart form is assembled and sent within the same method we are opening the file handles:

try:
    # Prepare the files for the multipart request
    # Make the POST request
finally:
    # Explicitly close the files

f"{content_type}; length={file_size}",
{},
),
)
)
)
return MultipartPartsAndContext(
acc_parts,
f"trace={op.trace_id},id={op.id}",
Expand Down
56 changes: 33 additions & 23 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1835,7 +1835,12 @@ def read_run(
response = self.request_with_retries(
"GET", f"/runs/{_as_uuid(run_id, 'run_id')}"
)
run = ls_schemas.Run(**response.json(), _host_url=self._host_url)
attachments = _convert_stored_attachments_to_attachments_dict(
response.json(), "s3_urls"
)
run = ls_schemas.Run(
attachments=attachments, **response.json(), _host_url=self._host_url
)
if load_child_runs and run.child_run_ids:
run = self._load_child_runs(run)
return run
Expand Down Expand Up @@ -2031,7 +2036,13 @@ def list_runs(
for i, run in enumerate(
self._get_cursor_paginated_list("/runs/query", body=body_query)
):
yield ls_schemas.Run(**run, _host_url=self._host_url)
# Should this be behind a flag?
attachments = _convert_stored_attachments_to_attachments_dict(
run, "s3_urls"
)
yield ls_schemas.Run(
attachments=attachments, **run, _host_url=self._host_url
)
if limit is not None and i + 1 >= limit:
break

Expand Down Expand Up @@ -3894,16 +3905,9 @@ def read_example(
)

example = response.json()
attachments = {}
if example["attachment_urls"]:
for key, value in example["attachment_urls"].items():
response = requests.get(value["presigned_url"], stream=True)
response.raise_for_status()
reader = io.BytesIO(response.content)
attachments[key.removeprefix("attachment.")] = {
"presigned_url": value["presigned_url"],
"reader": reader,
}
attachments = _convert_stored_attachments_to_attachments_dict(
example, "attachment_urls"
)

return ls_schemas.Example(
**{k: v for k, v in example.items() if k != "attachment_urls"},
Expand Down Expand Up @@ -3980,17 +3984,9 @@ def list_examples(
for i, example in enumerate(
self._get_paginated_list("/examples", params=params)
):
attachments = {}
if example["attachment_urls"]:
for key, value in example["attachment_urls"].items():
response = requests.get(value["presigned_url"], stream=True)
response.raise_for_status()
reader = io.BytesIO(response.content)
attachments[key.removeprefix("attachment.")] = {
"presigned_url": value["presigned_url"],
"reader": reader,
}

attachments = _convert_stored_attachments_to_attachments_dict(
example, "attachment_urls"
)
yield ls_schemas.Example(
**{k: v for k, v in example.items() if k != "attachment_urls"},
attachments=attachments,
Expand Down Expand Up @@ -6657,3 +6653,17 @@ def convert_prompt_to_anthropic_format(
return anthropic._get_request_payload(messages, stop=stop)
except Exception as e:
raise ls_utils.LangSmithError(f"Error converting to Anthropic format: {e}")


def _convert_stored_attachments_to_attachments_dict(data, attachments_key):
attachments_dict = {}
isahers1 marked this conversation as resolved.
Show resolved Hide resolved
if attachments_key in data and data[attachments_key]:
for key, value in data[attachments_key].items():
response = requests.get(value["presigned_url"], stream=True)
response.raise_for_status()
reader = io.BytesIO(response.content)
attachments_dict[key.removeprefix("attachment.")] = {
"presigned_url": value["presigned_url"],
"reader": reader,
}
return attachments_dict
4 changes: 3 additions & 1 deletion python/langsmith/run_trees.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,9 @@ def patch(self) -> None:
"""Patch the run tree to the API in a background thread."""
if not self.end_time:
self.end()
attachments = self.attachments
attachments = {
a: v for a, v in self.attachments.items() if isinstance(v, tuple)
}
Comment on lines +321 to +323
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done strictly for typing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is v ever not a tuple?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so might be worth doing in a for loop and raising an error if not. I think would be unexpected. But mypy probably knows better than me, so there might be a case we're not handling here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

v is never not a tuple here, this is just because of the union types I believe and it thinks that something could be passed here that is never passed

try:
# Avoid loading the same attachment twice
if attachments:
Expand Down
17 changes: 11 additions & 6 deletions python/langsmith/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
StrictInt,
)

from pathlib import Path

from typing_extensions import Literal

SCORE_TYPE = Union[StrictBool, StrictInt, StrictFloat, None]
Expand All @@ -63,7 +65,7 @@ def my_function(bar: int, my_val: Attachment):
data: bytes


Attachments = Dict[str, Union[Tuple[str, bytes], Attachment]]
Attachments = Dict[str, Union[Tuple[str, bytes], Attachment, Tuple[str, Path]]]
isahers1 marked this conversation as resolved.
Show resolved Hide resolved
"""Attachments associated with the run.
Each entry is a tuple of (mime_type, bytes), or (mime_type, file_path)"""

Expand All @@ -76,10 +78,6 @@ def read(self, size: int = -1) -> bytes:
"""Read function."""
...

def write(self, b: bytes) -> int:
"""Write function."""
...

def seek(self, offset: int, whence: int = 0) -> int:
"""Seek function."""
...
Expand Down Expand Up @@ -370,7 +368,9 @@ class RunBase(BaseModel):
tags: Optional[List[str]] = None
"""Tags for categorizing or annotating the run."""

attachments: Attachments = Field(default_factory=dict)
attachments: Union[Attachments, Dict[str, AttachmentInfo]] = Field(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this related to this pr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah need this for the changes in read_run

default_factory=dict
)
"""Attachments associated with the run.
Each entry is a tuple of (mime_type, bytes)."""

Expand All @@ -390,6 +390,11 @@ def __repr__(self):
"""Return a string representation of the RunBase object."""
return f"{self.__class__}(id={self.id}, name='{self.name}', run_type='{self.run_type}')"

class Config:
"""Configuration class for the schema."""

arbitrary_types_allowed = True

Comment on lines +393 to +397
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to allow the AttachmentInfo


class Run(RunBase):
"""Run schema when loading from the DB."""
Expand Down
Loading
Loading