diff --git a/python/langsmith/_internal/_operations.py b/python/langsmith/_internal/_operations.py
index 66decff0f..ccb046797 100644
--- a/python/langsmith/_internal/_operations.py
+++ b/python/langsmith/_internal/_operations.py
@@ -2,8 +2,10 @@
 
 import itertools
 import logging
+import os
 import uuid
-from typing import Literal, Optional, Union, cast
+from io import BufferedReader
+from typing import Dict, Literal, Optional, Union, cast
 
 from langsmith import schemas as ls_schemas
 from langsmith._internal import _orjson
@@ -211,9 +213,9 @@ def serialized_feedback_operation_to_multipart_parts_and_context(
 
 def serialized_run_operation_to_multipart_parts_and_context(
     op: SerializedRunOperation,
-) -> MultipartPartsAndContext:
+) -> tuple[MultipartPartsAndContext, Dict[str, BufferedReader]]:
     acc_parts: list[MultipartPart] = []
-
+    opened_files_dict: Dict[str, BufferedReader] = {}
     # this is main object, minus inputs/outputs/events/attachments
     acc_parts.append(
         (
@@ -246,7 +248,7 @@ def serialized_run_operation_to_multipart_parts_and_context(
             ),
         )
     if op.attachments:
-        for n, (content_type, valb) in op.attachments.items():
+        for n, (content_type, data) in op.attachments.items():
             if "." in n:
                 logger.warning(
                     f"Skipping logging of attachment '{n}' "
@@ -256,18 +258,37 @@ def serialized_run_operation_to_multipart_parts_and_context(
                 )
                 continue
 
-            acc_parts.append(
-                (
-                    f"attachment.{op.id}.{n}",
+            if isinstance(data, bytes):
+                acc_parts.append(
                     (
-                        None,
-                        valb,
-                        content_type,
-                        {"Content-Length": str(len(valb))},
-                    ),
+                        f"attachment.{op.id}.{n}",
+                        (
+                            None,
+                            data,
+                            content_type,
+                            {"Content-Length": str(len(data))},
+                        ),
+                    )
+                )
+            else:
+                file_size = os.path.getsize(data)
+                if str(data) in opened_files_dict:
+                    file = opened_files_dict[str(data)]
+                else:
+                    file = open(data, "rb")
+                    opened_files_dict[str(data)] = file
+                acc_parts.append(
+                    (
+                        f"attachment.{op.id}.{n}",
+                        (
+                            None,
+                            file,  # type: ignore[arg-type]
+                            f"{content_type}; length={file_size}",
+                            {},
+                        ),
+                    )
                 )
-            )
     return MultipartPartsAndContext(
         acc_parts,
         f"trace={op.trace_id},id={op.id}",
-    )
+    ), opened_files_dict
diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index f683a36f1..636f08396 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -1512,11 +1512,14 @@ class Client:
     def _multipart_ingest_ops(
         self, ops: list[Union[SerializedRunOperation, SerializedFeedbackOperation]]
     ) -> None:
         parts: list[MultipartPartsAndContext] = []
+        opened_files_dict: Dict[str, io.BufferedReader] = {}
         for op in ops:
             if isinstance(op, SerializedRunOperation):
-                parts.append(
+                part, opened_files = (
                     serialized_run_operation_to_multipart_parts_and_context(op)
                 )
+                parts.append(part)
+                opened_files_dict.update(opened_files)
             elif isinstance(op, SerializedFeedbackOperation):
                 parts.append(
                     serialized_feedback_operation_to_multipart_parts_and_context(op)
@@ -1525,7 +1528,10 @@ class Client:
                 logger.error("Unknown operation type in tracing queue: %s", type(op))
         acc_multipart = join_multipart_parts_and_context(parts)
         if acc_multipart:
-            self._send_multipart_req(acc_multipart)
+            try:
+                self._send_multipart_req(acc_multipart)
+            finally:
+                _close_files(list(opened_files_dict.values()))
 
     def multipart_ingest(
         self,
         create: Optional[
             Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]]
         ] = None,
         update: Optional[
             Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]]
         ] = None,
         *,
         pre_sampled: bool = False,
+        dangerously_allow_filesystem: bool = False,
     ) -> None:
         """Batch ingest/upsert multiple runs in the Langsmith system.
@@ -1621,6 +1628,18 @@ def multipart_ingest(
                 )
             )
 
+        for op in serialized_ops:
+            if isinstance(op, SerializedRunOperation) and op.attachments:
+                for attachment in op.attachments.values():
+                    if (
+                        isinstance(attachment, tuple)
+                        and isinstance(attachment[1], Path)
+                        and not dangerously_allow_filesystem
+                    ):
+                        raise ValueError(
+                            "Must set dangerously_allow_filesystem to True to use filesystem paths in multipart ingest."
+                        )
+
         # send the runs in multipart requests
         self._multipart_ingest_ops(serialized_ops)
 
@@ -1684,6 +1703,7 @@ def update_run(
         extra: Optional[Dict] = None,
         tags: Optional[List[str]] = None,
         attachments: Optional[ls_schemas.Attachments] = None,
+        dangerously_allow_filesystem: bool = False,
         **kwargs: Any,
     ) -> None:
         """Update a run in the LangSmith API.
@@ -1726,6 +1746,15 @@ def update_run(
             "session_name": kwargs.pop("session_name", None),
         }
         if attachments:
+            for attachment in attachments.values():
+                if (
+                    isinstance(attachment, tuple)
+                    and isinstance(attachment[1], Path)
+                    and not dangerously_allow_filesystem
+                ):
+                    raise ValueError(
+                        "Must set dangerously_allow_filesystem=True to allow filesystem attachments."
+                    )
             data["attachments"] = attachments
         use_multipart = (
             self.tracing_queue is not None
@@ -1835,7 +1864,13 @@ def read_run(
         response = self.request_with_retries(
             "GET", f"/runs/{_as_uuid(run_id, 'run_id')}"
         )
-        run = ls_schemas.Run(**response.json(), _host_url=self._host_url)
+        run_json = response.json()
+        attachments = _convert_stored_attachments_to_attachments_dict(
+            run_json, "s3_urls"
+        )
+        run = ls_schemas.Run(
+            attachments=attachments, **run_json, _host_url=self._host_url
+        )
         if load_child_runs and run.child_run_ids:
             run = self._load_child_runs(run)
         return run
@@ -2031,7 +2065,13 @@ def list_runs(
         for i, run in enumerate(
             self._get_cursor_paginated_list("/runs/query", body=body_query)
         ):
-            yield ls_schemas.Run(**run, _host_url=self._host_url)
+            # TODO: consider gating attachment fetching behind a flag
+            attachments = _convert_stored_attachments_to_attachments_dict(
+                run, "s3_urls"
+            )
+            yield ls_schemas.Run(
+                attachments=attachments, **run, _host_url=self._host_url
+            )
             if limit is not None and i + 1 >= limit:
                 break
@@ -3470,8 +3510,10 @@ def _prepare_multipart_data(
             | List[ls_schemas.ExampleUpdateWithAttachments],
         ],
         include_dataset_id: bool = False,
-    ) -> Tuple[Any, bytes]:
+        dangerously_allow_filesystem: bool = False,
+    ) -> tuple[Any, bytes, Dict[str, io.BufferedReader]]:
         parts: List[MultipartPart] = []
+        opened_files_dict: Dict[str, io.BufferedReader] = {}
         if include_dataset_id:
             if not isinstance(examples[0], ls_schemas.ExampleUpsertWithAttachments):
                 raise ValueError(
@@ -3552,34 +3594,30 @@ def _prepare_multipart_data(
             )
 
         if example.attachments:
-            for name, attachment in example.attachments.items():
-                if isinstance(attachment, tuple):
-                    if isinstance(attachment[1], Path):
-                        mime_type, file_path = attachment
-                        file_size = os.path.getsize(file_path)
+            for name, (mime_type, attachment_data) in example.attachments.items():
+                if isinstance(attachment_data, Path):
+                    if dangerously_allow_filesystem:
+                        file_size = os.path.getsize(attachment_data)
+                        if str(attachment_data) in opened_files_dict:
+                            file = opened_files_dict[str(attachment_data)]
+                        else:
+                            file = open(attachment_data, "rb")
+                            opened_files_dict[str(attachment_data)] = file
+
                         parts.append(
                             (
                                 f"{example_id}.attachment.{name}",
                                 (
                                     None,
-                                    open(file_path, "rb"),  # type: ignore[arg-type]
+                                    file,  # type: ignore[arg-type]
                                     f"{mime_type}; length={file_size}",
                                     {},
                                 ),
                             )
                         )
                     else:
-                        mime_type, data = attachment
-                        parts.append(
-                            (
-                                f"{example_id}.attachment.{name}",
-                                (
-                                    None,
-                                    data,
-                                    f"{mime_type}; length={len(data)}",
-                                    {},
-                                ),
-                            )
+                        raise ValueError(
+                            "dangerously_allow_filesystem must be True to upload files from the filesystem"
                         )
                 else:
                     parts.append(
                         (
@@ -3587,8 +3625,8 @@ def _prepare_multipart_data(
                             f"{example_id}.attachment.{name}",
                             (
                                 None,
-                                attachment.data,
-                                f"{attachment.mime_type}; length={len(attachment.data)}",
+                                attachment_data,
+                                f"{mime_type}; length={len(attachment_data)}",
                                 {},
                             ),
                         )
@@ -3617,13 +3655,14 @@ def _prepare_multipart_data(
         else:
             data = encoder
 
-        return encoder, data
+        return encoder, data, opened_files_dict
 
     def update_examples_multipart(
         self,
         *,
         dataset_id: ID_TYPE,
         updates: Optional[List[ls_schemas.ExampleUpdateWithAttachments]] = None,
+        dangerously_allow_filesystem: bool = False,
     ) -> ls_schemas.UpsertExamplesResponse:
         """Update examples."""
         if not (self.info.instance_flags or {}).get(
@@ -3635,20 +3674,27 @@ def update_examples_multipart(
         if updates is None:
             updates = []
 
-        encoder, data = self._prepare_multipart_data(updates, include_dataset_id=False)
+        encoder, data, opened_files_dict = self._prepare_multipart_data(
+            updates,
+            include_dataset_id=False,
+            dangerously_allow_filesystem=dangerously_allow_filesystem,
+        )
 
-        response = self.request_with_retries(
-            "PATCH",
-            f"/v1/platform/datasets/{dataset_id}/examples",
-            request_kwargs={
-                "data": data,
-                "headers": {
-                    **self._headers,
-                    "Content-Type": encoder.content_type,
+        try:
+            response = self.request_with_retries(
+                "PATCH",
+                f"/v1/platform/datasets/{dataset_id}/examples",
+                request_kwargs={
+                    "data": data,
+                    "headers": {
+                        **self._headers,
+                        "Content-Type": encoder.content_type,
+                    },
                 },
-            },
-        )
-        ls_utils.raise_for_status_with_text(response)
+            )
+            ls_utils.raise_for_status_with_text(response)
+        finally:
+            _close_files(list(opened_files_dict.values()))
         return response.json()
 
     def upload_examples_multipart(
@@ -3656,6 +3702,7 @@ def upload_examples_multipart(
         *,
         dataset_id: ID_TYPE,
         uploads: Optional[List[ls_schemas.ExampleUploadWithAttachments]] = None,
+        dangerously_allow_filesystem: bool = False,
     ) -> ls_schemas.UpsertExamplesResponse:
         """Upload examples."""
         if not (self.info.instance_flags or {}).get(
@@ -3666,26 +3713,34 @@ def upload_examples_multipart(
         )
         if uploads is None:
             uploads = []
-        encoder, data = self._prepare_multipart_data(uploads, include_dataset_id=False)
+        encoder, data, opened_files_dict = self._prepare_multipart_data(
+            uploads,
+            include_dataset_id=False,
+            dangerously_allow_filesystem=dangerously_allow_filesystem,
+        )
 
-        response = self.request_with_retries(
-            "POST",
-            f"/v1/platform/datasets/{dataset_id}/examples",
-            request_kwargs={
-                "data": data,
-                "headers": {
-                    **self._headers,
-                    "Content-Type": encoder.content_type,
+        try:
+            response = self.request_with_retries(
+                "POST",
+                f"/v1/platform/datasets/{dataset_id}/examples",
+                request_kwargs={
+                    "data": data,
+                    "headers": {
+                        **self._headers,
+                        "Content-Type": encoder.content_type,
+                    },
                 },
-            },
-        )
-        ls_utils.raise_for_status_with_text(response)
+            )
+            ls_utils.raise_for_status_with_text(response)
+        finally:
+            _close_files(list(opened_files_dict.values()))
         return response.json()
 
     def upsert_examples_multipart(
         self,
         *,
         upserts: Optional[List[ls_schemas.ExampleUpsertWithAttachments]] = None,
+        dangerously_allow_filesystem: bool = False,
     ) -> ls_schemas.UpsertExamplesResponse:
         """Upsert examples.
@@ -3702,20 +3757,27 @@ def upsert_examples_multipart(
         if upserts is None:
             upserts = []
 
-        encoder, data = self._prepare_multipart_data(upserts, include_dataset_id=True)
+        encoder, data, opened_files_dict = self._prepare_multipart_data(
+            upserts,
+            include_dataset_id=True,
+            dangerously_allow_filesystem=dangerously_allow_filesystem,
+        )
 
-        response = self.request_with_retries(
-            "POST",
-            "/v1/platform/examples/multipart",
-            request_kwargs={
-                "data": data,
-                "headers": {
-                    **self._headers,
-                    "Content-Type": encoder.content_type,
+        try:
+            response = self.request_with_retries(
+                "POST",
+                "/v1/platform/examples/multipart",
+                request_kwargs={
+                    "data": data,
+                    "headers": {
+                        **self._headers,
+                        "Content-Type": encoder.content_type,
+                    },
                 },
-            },
-        )
-        ls_utils.raise_for_status_with_text(response)
+            )
+            ls_utils.raise_for_status_with_text(response)
+        finally:
+            _close_files(list(opened_files_dict.values()))
         return response.json()
 
     def create_examples(
@@ -3894,16 +3956,9 @@ def read_example(
         )
         example = response.json()
-        attachments = {}
-        if example["attachment_urls"]:
-            for key, value in example["attachment_urls"].items():
-                response = requests.get(value["presigned_url"], stream=True)
-                response.raise_for_status()
-                reader = io.BytesIO(response.content)
-                attachments[key.removeprefix("attachment.")] = {
-                    "presigned_url": value["presigned_url"],
-                    "reader": reader,
-                }
+        attachments = _convert_stored_attachments_to_attachments_dict(
+            example, "attachment_urls"
+        )
 
         return ls_schemas.Example(
             **{k: v for k, v in example.items() if k != "attachment_urls"},
             attachments=attachments,
@@ -3980,17 +4035,9 @@ def list_examples(
         for i, example in enumerate(
             self._get_paginated_list("/examples", params=params)
         ):
-            attachments = {}
-            if example["attachment_urls"]:
-                for key, value in example["attachment_urls"].items():
-                    response = requests.get(value["presigned_url"], stream=True)
-                    response.raise_for_status()
-                    reader = io.BytesIO(response.content)
-                    attachments[key.removeprefix("attachment.")] = {
-                        "presigned_url": value["presigned_url"],
-                        "reader": reader,
-                    }
-
+            attachments = _convert_stored_attachments_to_attachments_dict(
+                example, "attachment_urls"
+            )
             yield ls_schemas.Example(
                 **{k: v for k, v in example.items() if k != "attachment_urls"},
                 attachments=attachments,
@@ -6657,3 +6704,29 @@ def convert_prompt_to_anthropic_format(
         return anthropic._get_request_payload(messages, stop=stop)
     except Exception as e:
         raise ls_utils.LangSmithError(f"Error converting to Anthropic format: {e}")
+
+
+def _convert_stored_attachments_to_attachments_dict(
+    data: dict, attachments_key: str
+) -> dict:
+    """Fetch stored attachments and convert them into AttachmentInfo-style dicts."""
+    attachments_dict = {}
+    if attachments_key in data and data[attachments_key]:
+        for key, value in data[attachments_key].items():
+            response = requests.get(value["presigned_url"], stream=True)
+            response.raise_for_status()
+            reader = io.BytesIO(response.content)
+            attachments_dict[key.removeprefix("attachment.")] = {
+                "presigned_url": value["presigned_url"],
+                "reader": reader,
+            }
+    return attachments_dict
+
+
+def _close_files(files: List[io.BufferedReader]) -> None:
+    """Close all opened files used in multipart requests."""
+    for file in files:
+        try:
+            file.close()
+        except Exception:
+            pass
diff --git a/python/langsmith/run_trees.py b/python/langsmith/run_trees.py
index 63f0cb4e5..8a44cc01c 100644
--- a/python/langsmith/run_trees.py
+++ b/python/langsmith/run_trees.py
@@ -318,7 +318,9 @@ def patch(self) -> None:
         """Patch the run tree to the API in a background thread."""
         if not self.end_time:
             self.end()
-        attachments = self.attachments
+        attachments = {
+            a: v for a, v in self.attachments.items() if isinstance(v, tuple)
+        }
         try:
             # Avoid loading the same attachment twice
             if attachments:
diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py
index acedaf177..c642dd428 100644
--- a/python/langsmith/schemas.py
+++ b/python/langsmith/schemas.py
@@ -39,6 +39,8 @@
     StrictInt,
 )
 
+from pathlib import Path
+
 from typing_extensions import Literal
 
 SCORE_TYPE = Union[StrictBool, StrictInt, StrictFloat, None]
@@ -60,10 +62,10 @@ def my_function(bar: int, my_val: Attachment):
     """
 
     mime_type: str
-    data: bytes
+    data: Union[bytes, Path]
 
 
-Attachments = Dict[str, Union[Tuple[str, bytes], Attachment]]
+Attachments = Dict[str, Union[Tuple[str, bytes], Attachment, Tuple[str, Path]]]
 """Attachments associated with the run.
 Each entry is a tuple of (mime_type, bytes) or (mime_type, file_path)."""
@@ -76,10 +78,6 @@ def read(self, size: int = -1) -> bytes:
         """Read function."""
         ...
 
-    def write(self, b: bytes) -> int:
-        """Write function."""
-        ...
-
     def seek(self, offset: int, whence: int = 0) -> int:
         """Seek function."""
         ...
@@ -370,7 +368,9 @@ class RunBase(BaseModel):
     tags: Optional[List[str]] = None
     """Tags for categorizing or annotating the run."""
 
-    attachments: Attachments = Field(default_factory=dict)
+    attachments: Union[Attachments, Dict[str, AttachmentInfo]] = Field(
+        default_factory=dict
+    )
     """Attachments associated with the run.
 
     Each entry is a tuple of (mime_type, bytes) or (mime_type, file_path)."""
 
@@ -390,6 +390,11 @@ def __repr__(self):
         """Return a string representation of the RunBase object."""
         return f"{self.__class__}(id={self.id}, name='{self.name}', run_type='{self.run_type}')"
 
+    class Config:
+        """Configuration class for the schema."""
+
+        arbitrary_types_allowed = True
+
 
 class Run(RunBase):
     """Run schema when loading from the DB."""
diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py
index 3bcd9d04c..786883a30 100644
--- a/python/tests/integration_tests/test_client.py
+++ b/python/tests/integration_tests/test_client.py
@@ -10,6 +10,7 @@
 import time
 import uuid
 from datetime import timedelta
+from pathlib import Path
 from typing import Any, Callable, Dict
 from unittest import mock
 from uuid import uuid4
@@ -64,7 +65,7 @@ def langchain_client() -> Client:
                 "dataset_examples_multipart_enabled": True,
                 "examples_multipart_enabled": True,
             }
-        },
+        }
     )
 
 
@@ -377,6 +378,41 @@ def test_persist_update_run(langchain_client: Client) -> None:
         langchain_client.delete_project(project_name=project_name)
 
 
+def test_update_run_attachments(langchain_client: Client) -> None:
+    """Test that creating and then updating a run persists the changes."""
+    project_name = "__test_update_run_attachments" + uuid4().hex[:4]
+    if langchain_client.has_project(project_name):
+        langchain_client.delete_project(project_name=project_name)
+    try:
+        trace_id = uuid4()
+        start_time = datetime.datetime.now(datetime.timezone.utc)
+        run: dict = dict(
+            id=str(trace_id),
+            name="test_run",
+            run_type="llm",
+            inputs={"text": "hello world"},
+            project_name=project_name,
+            api_url=os.getenv("LANGCHAIN_ENDPOINT"),
+            start_time=start_time,
+            extra={"extra": "extra"},
+            trace_id=str(trace_id),
+            dotted_order=f"{start_time.strftime('%Y%m%dT%H%M%S%fZ')}{str(trace_id)}",
+        )
+        langchain_client.create_run(**run)
+        run["outputs"] = {"output": ["Hi"]}
+        run["extra"]["foo"] = "bar"
+        run["name"] = "test_run_updated"
+        langchain_client.update_run(run["id"], **run)
+        wait_for(lambda: langchain_client.read_run(run["id"]).end_time is not None)
+        stored_run = langchain_client.read_run(run["id"])
+        assert stored_run.name == run["name"]
+        assert str(stored_run.id) == run["id"]
+        assert stored_run.outputs == run["outputs"]
+        assert stored_run.start_time == run["start_time"].replace(tzinfo=None)
+    finally:
+        langchain_client.delete_project(project_name=project_name)
+
+
 @pytest.mark.parametrize("uri", ["http://localhost:1981", "http://api.langchain.minus"])
 def test_error_surfaced_invalid_uri(uri: str) -> None:
     get_env_var.cache_clear()
@@ -936,6 +972,251 @@ def test_multipart_ingest_empty(
     assert not caplog.records
 
 
+def test_multipart_ingest_create_with_attachments_error(
+    langchain_client: Client, caplog: pytest.LogCaptureFixture
+) -> None:
+    _session = "__test_multipart_ingest_create_with_attachments"
+    trace_a_id = uuid4()
+    current_time = datetime.datetime.now(datetime.timezone.utc).strftime(
+        "%Y%m%dT%H%M%S%fZ"
+    )
+
+    runs_to_create: list[dict] = [
+        {
+            "id": str(trace_a_id),
+            "session_name": _session,
+            "name": "trace a root",
+            "run_type": "chain",
+            "dotted_order": f"{current_time}{str(trace_a_id)}",
+            "trace_id": str(trace_a_id),
+            "inputs": {"input1": 1, "input2": 2},
+            "attachments": {
+                "foo": ("text/plain", b"bar"),
+                "bar": (
+                    "image/png",
+                    Path(__file__).parent / "test_data/parrot-icon.png",
+                ),
+            },
+        }
+    ]
+
+    # filesystem attachments require explicit opt-in
+    with pytest.raises(ValueError, match="Must set dangerously_allow_filesystem"):
+        langchain_client.multipart_ingest(create=runs_to_create, update=[])
+
+
+def test_multipart_ingest_create_with_attachments(
+    langchain_client: Client, caplog: pytest.LogCaptureFixture
+) -> None:
+    _session = "__test_multipart_ingest_create_with_attachments"
+    trace_a_id = uuid4()
+    current_time = datetime.datetime.now(datetime.timezone.utc).strftime(
+        "%Y%m%dT%H%M%S%fZ"
+    )
+
+    runs_to_create: list[dict] = [
+        {
+            "id": str(trace_a_id),
+            "session_name": _session,
+            "name": "trace a root",
+            "run_type": "chain",
+            "dotted_order": f"{current_time}{str(trace_a_id)}",
+            "trace_id": str(trace_a_id),
+            "inputs": {"input1": 1, "input2": 2},
+            "attachments": {
+                "foo": ("text/plain", b"bar"),
+                "bar": (
+                    "image/png",
+                    Path(__file__).parent / "test_data/parrot-icon.png",
+                ),
+            },
+        }
+    ]
+
+    # make sure no warnings logged
+    with caplog.at_level(logging.WARNING, logger="langsmith.client"):
+        langchain_client.multipart_ingest(
+            create=runs_to_create, update=[], dangerously_allow_filesystem=True
+        )
+    assert not caplog.records
+    wait_for(lambda: _get_run(str(trace_a_id), langchain_client))
+    created_run = langchain_client.read_run(run_id=str(trace_a_id))
+    assert sorted(created_run.attachments.keys()) == sorted(["foo", "bar"])
+    assert created_run.attachments["foo"]["reader"].read() == b"bar"
+    assert (
+        created_run.attachments["bar"]["reader"].read()
+        == (Path(__file__).parent / "test_data/parrot-icon.png").read_bytes()
+    )
+
+
+def test_multipart_ingest_update_with_attachments_no_paths(
+    langchain_client: Client, caplog: pytest.LogCaptureFixture
+):
+    _session = "__test_multipart_ingest_update_with_attachments_no_paths"
+    trace_a_id = uuid4()
+    current_time = datetime.datetime.now(datetime.timezone.utc).strftime(
+        "%Y%m%dT%H%M%S%fZ"
+    )
+
+    runs_to_create: list[dict] = [
+        {
+            "id": str(trace_a_id),
+            "session_name": _session,
+            "name": "trace a root",
+            "run_type": "chain",
+            "dotted_order": f"{current_time}{str(trace_a_id)}",
+            "trace_id": str(trace_a_id),
+            "outputs": {"output1": 3, "output2": 4},
+            "attachments": {
+                "foo": ("text/plain", b"bar"),
+                "bar": ("image/png", b"bar"),
+            },
+        }
+    ]
+    with caplog.at_level(logging.WARNING, logger="langsmith.client"):
+        langchain_client.multipart_ingest(create=runs_to_create, update=[])
+
+    assert not caplog.records
+    wait_for(lambda: _get_run(str(trace_a_id), langchain_client))
+    created_run = langchain_client.read_run(run_id=str(trace_a_id))
+    assert created_run.attachments
+    assert sorted(created_run.attachments.keys()) == sorted(["foo", "bar"])
+    assert created_run.attachments["foo"]["reader"].read() == b"bar"
+    assert created_run.attachments["bar"]["reader"].read() == b"bar"
+
+    runs_to_update: list[dict] = [
+        {
+            "id": str(trace_a_id),
+            "dotted_order": f"{current_time}{str(trace_a_id)}",
+            "trace_id": str(trace_a_id),
+            "outputs": {"output1": 3, "output2": 4},
+            "attachments": {
+                "baz": ("text/plain", b"bar"),
+                "qux": ("image/png", b"bar"),
+            },
+        }
+    ]
+    with caplog.at_level(logging.WARNING, logger="langsmith.client"):
+        langchain_client.multipart_ingest(create=[], update=runs_to_update)
+
+    assert not caplog.records
+
+
+def _get_run(run_id: ID_TYPE, langchain_client: Client, has_end: bool = False) -> bool:
+    try:
+        r = langchain_client.read_run(run_id)  # type: ignore
+        if has_end:
+            return r.end_time is not None
+        return True
+    except LangSmithError:
+        return False
+
+
+def test_multipart_ingest_update_with_attachments_error(
+    langchain_client: Client, caplog: pytest.LogCaptureFixture
+) -> None:
+    _session = "__test_multipart_ingest_update_with_attachments"
+    trace_a_id = uuid4()
+    current_time = datetime.datetime.now(datetime.timezone.utc).strftime(
+        "%Y%m%dT%H%M%S%fZ"
+    )
+
+    runs_to_create: list[dict] = [
+        {
+            "id": str(trace_a_id),
+            "session_name": _session,
+            "name": "trace a root",
+            "run_type": "chain",
+            "dotted_order": f"{current_time}{str(trace_a_id)}",
+            "trace_id": str(trace_a_id),
+            "inputs": {"input1": 1, "input2": 2},
+        }
+    ]
+
+    # make sure no warnings logged
+    with caplog.at_level(logging.WARNING, logger="langsmith.client"):
+        langchain_client.multipart_ingest(create=runs_to_create, update=[])
+    assert not caplog.records
+    wait_for(lambda: _get_run(str(trace_a_id), langchain_client))
+
+    runs_to_update: list[dict] = [
+        {
+            "id": str(trace_a_id),
+            "dotted_order": f"{current_time}{str(trace_a_id)}",
+            "trace_id": str(trace_a_id),
+            "inputs": {"input1": 3, "input2": 4},
+            "attachments": {
+                "foo": ("text/plain", b"bar"),
+                "bar": (
+                    "image/png",
+                    Path(__file__).parent / "test_data/parrot-icon.png",
+                ),
+            },
+        }
+    ]
+    with pytest.raises(ValueError, match="Must set dangerously_allow_filesystem"):
+        langchain_client.multipart_ingest(create=[], update=runs_to_update)
+
+
+def test_multipart_ingest_update_with_attachments(
+    langchain_client: Client, caplog: pytest.LogCaptureFixture
+) -> None:
+    _session = "__test_multipart_ingest_update_with_attachments"
+    trace_a_id = uuid4()
+    current_time = datetime.datetime.now(datetime.timezone.utc).strftime(
+        "%Y%m%dT%H%M%S%fZ"
+    )
+
+    runs_to_create: list[dict] = [
+        {
+            "id": str(trace_a_id),
+            "session_name": _session,
+            "name": "trace a root",
+            "run_type": "chain",
+            "dotted_order": f"{current_time}{str(trace_a_id)}",
+            "trace_id": str(trace_a_id),
+            "inputs": {"input1": 1, "input2": 2},
+        }
+    ]
+
+    # make sure no warnings logged
+    with caplog.at_level(logging.WARNING, logger="langsmith.client"):
+        langchain_client.multipart_ingest(create=runs_to_create, update=[])
+    assert not caplog.records
+
+    runs_to_update: list[dict] = [
+        {
+            "id": str(trace_a_id),
+            "dotted_order": f"{current_time}{str(trace_a_id)}",
+            "trace_id": str(trace_a_id),
+            "inputs": {"input1": 3, "input2": 4},
+            "attachments": {
+                "foo": ("text/plain", b"bar"),
+                "bar": (
+                    "image/png",
+                    Path(__file__).parent / "test_data/parrot-icon.png",
+                ),
+            },
+        }
+    ]
+    langchain_client.multipart_ingest(
+        create=[], update=runs_to_update, dangerously_allow_filesystem=True
+    )
+    # the client closes any file handles it opened once the request completes
+
+    assert not caplog.records
+    wait_for(lambda: _get_run(str(trace_a_id), langchain_client))
+    created_run = langchain_client.read_run(run_id=str(trace_a_id))
+    assert created_run.inputs == {"input1": 3, "input2": 4}
+    assert sorted(created_run.attachments.keys()) == sorted(["foo", "bar"])
+    assert created_run.attachments["foo"]["reader"].read() == b"bar"
+    assert (
+        created_run.attachments["bar"]["reader"].read()
+        == (Path(__file__).parent / "test_data/parrot-icon.png").read_bytes()
+    )
+
+
 def test_multipart_ingest_create_then_update(
     langchain_client: Client, caplog: pytest.LogCaptureFixture
 ) -> None:
@@ -1089,16 +1370,7 @@ def test_update_run_extra(add_metadata: bool, do_batching: bool) -> None:
     revision_id = uuid4()
     langchain_client.create_run(**run, revision_id=revision_id)  # type: ignore
 
-    def _get_run(run_id: ID_TYPE, has_end: bool = False) -> bool:
-        try:
-            r = langchain_client.read_run(run_id)  # type: ignore
-            if has_end:
-                return r.end_time is not None
-            return True
-        except LangSmithError:
-            return False
-
-    wait_for(lambda: _get_run(run_id))
+    wait_for(lambda: _get_run(run_id, langchain_client))
     created_run = langchain_client.read_run(run_id)
     assert created_run.metadata["foo"] == "bar"
     assert created_run.metadata["revision_id"] == str(revision_id)
@@ -1107,7 +1379,7 @@ def _get_run(run_id: ID_TYPE, has_end: bool = False) -> bool:
     run["extra"]["metadata"]["foo2"] = "baz"  # type: ignore
     run["tags"] = ["tag3"]
     langchain_client.update_run(run_id, **run)  # type: ignore
-    wait_for(lambda: _get_run(run_id, has_end=True))
+    wait_for(lambda: _get_run(run_id, langchain_client, has_end=True))
     updated_run = langchain_client.read_run(run_id)
     assert updated_run.metadata["foo"] == "bar"  # type: ignore
     assert updated_run.revision_id == str(revision_id)
@@ -1826,6 +2098,94 @@ def test_bulk_update_examples_with_attachments_operations(
     langchain_client.delete_dataset(dataset_id=dataset.id)
 
 
+def test_examples_multipart_attachment_path(langchain_client: Client) -> None:
+    """Test uploading examples with attachments via multipart endpoint."""
+    dataset_name = "__test_upload_examples_multipart" + uuid4().hex[:4]
+    if langchain_client.has_dataset(dataset_name=dataset_name):
+        langchain_client.delete_dataset(dataset_name=dataset_name)
+
+    dataset = langchain_client.create_dataset(
+        dataset_name,
+        description="Test dataset for multipart example uploads",
+        data_type=DataType.kv,
+    )
+
+    example_id = uuid4()
+    example = ExampleUploadWithAttachments(
+        id=example_id,
+        inputs={"text": "hello world"},
+        attachments={
+            "file1": ("text/plain", b"original content 1"),
+            "file2": (
+                "image/png",
+                Path(__file__).parent / "test_data/parrot-icon.png",
+            ),
+        },
+    )
+
+    created_examples = langchain_client.upload_examples_multipart(
+        dataset_id=dataset.id, uploads=[example], dangerously_allow_filesystem=True
+    )
+    assert created_examples["count"] == 1
+    # the client closes any file handles it opened for the upload before returning
+
+    # Verify the upload
+    retrieved = langchain_client.read_example(example_id)
+
+    assert len(retrieved.attachments) == 2
+    assert "file1" in retrieved.attachments
+    assert "file2" in retrieved.attachments
+    assert retrieved.attachments["file1"]["reader"].read() == b"original content 1"
+    assert (
+        retrieved.attachments["file2"]["reader"].read()
+        == (Path(__file__).parent / "test_data/parrot-icon.png").read_bytes()
+    )
+
+    example_update = ExampleUpdateWithAttachments(
+        id=example_id,
+        attachments={
+            "new_file1": (
+                "image/png",
+                Path(__file__).parent / "test_data/parrot-icon.png",
+            ),
+        },
+    )
+
+    langchain_client.update_examples_multipart(
+        dataset_id=dataset.id,
+        updates=[example_update],
+        dangerously_allow_filesystem=True,
+    )
+
+    retrieved = langchain_client.read_example(example_id)
+
+    assert len(retrieved.attachments) == 1
+    assert "new_file1" in retrieved.attachments
+    assert (
+        retrieved.attachments["new_file1"]["reader"].read()
+        == (Path(__file__).parent / "test_data/parrot-icon.png").read_bytes()
+    )
+
+    example_wrong_path = ExampleUploadWithAttachments(
+        id=example_id,
+        inputs={"text": "hello world"},
+        attachments={
+            "file1": (
+                "text/plain",
+                Path(__file__).parent / "test_data/not-a-real-file.txt",
+            ),
+        },
+    )
+
+    with pytest.raises(FileNotFoundError) as exc_info:
+        langchain_client.upload_examples_multipart(
+            dataset_id=dataset.id,
+            uploads=[example_wrong_path],
+            dangerously_allow_filesystem=True,
+        )
+    assert "test_data/not-a-real-file.txt" in str(exc_info.value)
+
+    # Clean up
+    langchain_client.delete_dataset(dataset_id=dataset.id)
+
+
 def test_update_examples_multipart(langchain_client: Client) -> None:
     """Test updating examples with attachments via multipart endpoint."""
     dataset_name = "__test_update_examples_multipart" + uuid4().hex[:4]
diff --git a/python/tests/integration_tests/test_data/parrot-icon.png b/python/tests/integration_tests/test_data/parrot-icon.png
new file mode 100644
index 000000000..7fd3de1dc
Binary files /dev/null and b/python/tests/integration_tests/test_data/parrot-icon.png differ
diff --git a/python/tests/integration_tests/test_runs.py b/python/tests/integration_tests/test_runs.py
index 6ce943690..e0c9afd51 100644
--- a/python/tests/integration_tests/test_runs.py
+++ b/python/tests/integration_tests/test_runs.py
@@ -3,6 +3,7 @@
 import uuid
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
 from typing import AsyncGenerator, Generator, Optional, Sequence
 
 import pytest  # type: ignore
@@ -11,6 +12,7 @@
 from langsmith.client import Client
 from langsmith.run_helpers import trace, traceable
 from langsmith.run_trees import RunTree
+from langsmith.schemas import Attachment
 
 
 @pytest.fixture
@@ -479,3 +481,34 @@ async def test_end_metadata_with_run_tree(langchain_client: Client):
     assert run.run_type == "chain"
     assert run.metadata["final_metadata"] == run_id.hex
     assert run.outputs == {"result": "success"}
+
+
+def test_trace_file_path(langchain_client: Client) -> None:
+    """Test that you can trace attachments with file paths."""
+    project_name = "__test_trace_file_path"
+    run_meta = uuid.uuid4().hex
+
+    @traceable(run_type="chain")
+    def my_func(foo: Attachment):
+        return "foo"
+
+    my_func(
+        Attachment(
+            mime_type="image/png",
+            data=Path(__file__).parent / "test_data/parrot-icon.png",
+        ),
+        langsmith_extra=dict(
+            project_name=project_name, metadata={"test_run": run_meta}
+        ),
+    )
+
+    poll_runs_until_count(langchain_client, project_name, 1, max_retries=20)
+    _filter = f'and(eq(metadata_key, "test_run"), eq(metadata_value, "{run_meta}"))'
+    runs = list(langchain_client.list_runs(project_name=project_name, filter=_filter))
+    assert len(runs) == 1
+    run = runs[0]
+    assert run.attachments
+    assert (
+        run.attachments["foo"]["reader"].read()
+        == (Path(__file__).parent / "test_data/parrot-icon.png").read_bytes()
+    )
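
With this change, attachments can reference files on disk as (mime_type, Path) tuples in addition to (mime_type, bytes): the client streams each file into the multipart request, reuses a single handle per path via opened_files_dict, and closes every handle it opened once the request finishes. Reading from the filesystem is opt-in through dangerously_allow_filesystem=True. A minimal usage sketch (the dataset name and file path are illustrative, not part of this diff):

    from pathlib import Path

    from langsmith import Client
    from langsmith.schemas import ExampleUploadWithAttachments

    client = Client()
    dataset = client.create_dataset("attachment-demo")  # illustrative name

    client.upload_examples_multipart(
        dataset_id=dataset.id,
        uploads=[
            ExampleUploadWithAttachments(
                inputs={"text": "hello world"},
                attachments={
                    # streamed from disk; requires the opt-in flag below
                    "logo": ("image/png", Path("logo.png")),
                    # inline bytes still work without any flag
                    "note": ("text/plain", b"inline bytes"),
                },
            )
        ],
        # omitting this raises ValueError for Path-based attachments
        dangerously_allow_filesystem=True,
    )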